Partial inbox handler

This commit is contained in:
Riley Apeldoorn 2024-05-03 23:44:01 +02:00
parent 288c181cc9
commit 8f2ea89301
12 changed files with 361 additions and 70 deletions

View file

@ -1,7 +1,12 @@
//! Control program for the ActivityPub federated social media server. //! Control program for the ActivityPub federated social media server.
#![feature(iterator_try_collect)] #![feature(iterator_try_collect)]
use puppy::{actor::Actor, config::Config, Context}; use puppy::{
actor::Actor,
config::Config,
data::{FollowRequest, Object, Profile},
Context,
};
#[tokio::main] #[tokio::main]
async fn main() -> puppy::Result<()> { async fn main() -> puppy::Result<()> {
@ -14,8 +19,19 @@ async fn main() -> puppy::Result<()> {
}; };
let cx = Context::load(config)?; let cx = Context::load(config)?;
let riley = get_or_create_actor(&cx, "riley")?; let riley = get_or_create_actor(&cx, "riley")?;
let post = puppy::post::create_post(&cx, riley.key, "i like boys")?; cx.run(|tx| {
puppy::post::federate_post(&cx, post).await println!("\nRiley's following:");
for FollowRequest { id, origin, .. } in
riley.pending_requests(&tx).try_collect::<Vec<_>>()?
{
let Profile { account_name, .. } = tx.get_mixin(origin)?.unwrap();
let Object { id, .. } = tx.get_mixin(id)?.unwrap();
println!("- @{account_name} ({origin}) (request url = {id})");
}
Ok(())
})
// let post = puppy::post::create_post(&cx, riley.key, "i like boys")?;
// puppy::post::federate_post(&cx, post).await
// let linen = get_or_create_actor(&cx, "linen")?; // let linen = get_or_create_actor(&cx, "linen")?;
// if true { // if true {

View file

@ -1,6 +1,6 @@
//! API endpoints and request handlers. //! API endpoints and request handlers.
use std::convert::Infallible; use std::{convert::Infallible, future::Future};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -164,6 +164,7 @@ async fn handle(req: Request, verifier: &Verifier, cx: Context) -> Result<Respon
}; };
// Simplified representation of a request, so we can pattern match on it more easily in the dispatchers. // Simplified representation of a request, so we can pattern match on it more easily in the dispatchers.
let req = Req::simplify(&request); let req = Req::simplify(&request);
println!("{} {}", request.method(), request.uri());
// We'll use the path to pick where specifically to send the request. // We'll use the path to pick where specifically to send the request.
// Check request signature at the door. Even if it isn't needed for a particular endpoint, failing fast // Check request signature at the door. Even if it isn't needed for a particular endpoint, failing fast
// with a clear error message will save anyone trying to get *their* signatures implementation a major // with a clear error message will save anyone trying to get *their* signatures implementation a major
@ -185,7 +186,17 @@ async fn handle(req: Request, verifier: &Verifier, cx: Context) -> Result<Respon
}; };
// If one of the endpoints gave us an error message, we convert that into a response and then // If one of the endpoints gave us an error message, we convert that into a response and then
// serve it to the client. In either case, we just serve a response. // serve it to the client. In either case, we just serve a response.
Ok(res.unwrap_or_else(|msg| req.error(msg))) let response = res.unwrap_or_else(|msg| {
println!("{} {}: [error] {msg}", request.method(), request.uri());
req.error(msg)
});
println! {
"{} {}: {}",
request.method(),
request.uri(),
response.status()
};
Ok(response)
} }
const POST: &Method = &Method::POST; const POST: &Method = &Method::POST;
@ -208,11 +219,9 @@ async fn dispatch_signed(
// verification actor lives. // verification actor lives.
(GET, ["o", ulid]) => ap::serve_object(&cx, ulid), (GET, ["o", ulid]) => ap::serve_object(&cx, ulid),
// POSTs to an actor's inbox need to be signed to prevent impersonation. // POSTs to an actor's inbox need to be signed to prevent impersonation.
(POST, ["o", ulid, "inbox"]) => with_json(&req.body, |json| try { (POST, ["o", ulid, "inbox"]) => {
// We only handle the intermediate parsing of the json, full resolution of the with_json(&req.body, |json| ap::inbox(&cx, ulid, sig, json)).await
// activity object will happen inside the inbox handler itself. }
ap::inbox(&cx, ulid, sig, json)
}),
// Try the resources for which no signature is required as well. // Try the resources for which no signature is required as well.
_ => dispatch_public(cx, verifier, req).await, _ => dispatch_public(cx, verifier, req).await,
} }
@ -236,12 +245,12 @@ async fn dispatch_public(
} }
} }
fn with_json( async fn with_json<F>(body: &[u8], f: impl FnOnce(Value) -> F) -> Result<Response, Message>
body: &[u8], where
f: impl FnOnce(Value) -> Result<Response, Message>, F: Future<Output = Result<Response, Message>>,
) -> Result<Response, Message> { {
match from_slice(body) { match from_slice(body) {
Ok(json) => f(json), Ok(json) => f(json).await,
Err(e) => fuck!(400: "could not decode json: {e}"), Err(e) => fuck!(400: "could not decode json: {e}"),
} }
} }
@ -262,6 +271,12 @@ mod error {
pub status: u16, pub status: u16,
} }
impl std::fmt::Display for Message {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.error.fmt(f)
}
}
impl super::Req<'_> { impl super::Req<'_> {
/// Generate an error response for the request. /// Generate an error response for the request.
pub fn error(&self, err: Message) -> Response { pub fn error(&self, err: Message) -> Response {

View file

@ -4,6 +4,7 @@ use http_body_util::Full;
use hyper::body::Bytes; use hyper::body::Bytes;
use puppy::{ use puppy::{
actor::{get_signing_key, Actor}, actor::{get_signing_key, Actor},
fetch::object::Activity,
get_local_ap_object, Context, Error, Key, get_local_ap_object, Context, Error, Key,
}; };
use serde_json::Value; use serde_json::Value;
@ -62,8 +63,23 @@ pub async fn outbox(cx: &Context, params: &[(&str, &str)]) -> Result<Response, M
} }
/// Handle POSTs to actor inboxes. Requires request signature. /// Handle POSTs to actor inboxes. Requires request signature.
pub fn inbox(cx: &Context, actor_id: &str, sig: Signer, body: Value) -> Response { pub async fn inbox(
todo!() cx: &Context,
actor_id: &str,
sig: Signer,
body: Value,
) -> Result<Response, Message> {
let receiver = actor_id.parse::<Key>().unwrap();
match Activity::from_json(body) {
Ok(activity) => {
puppy::ingest(&cx, receiver, &activity).await.unwrap();
match puppy::interpret(&cx, activity) {
Ok(_) => Ok(respond!(code: 202)),
Err(err) => fuck!(400: "error interpreting activity: {err}"),
}
}
Err(err) => fuck!(400: "invalid payload: {err}"),
}
} }
/// Serve an ActivityPub object as json-ld. /// Serve an ActivityPub object as json-ld.

View file

@ -23,6 +23,7 @@ pub const VERSION: &str = "0.0.1-dev";
pub const ACTIVITYPUB_TYPE: &str = "application/activity+json"; pub const ACTIVITYPUB_TYPE: &str = "application/activity+json";
/// A client for sending ActivityPub and WebFinger requests with. /// A client for sending ActivityPub and WebFinger requests with.
#[derive(Clone)]
pub struct Client { pub struct Client {
inner: reqwest::Client, inner: reqwest::Client,
} }
@ -53,12 +54,8 @@ impl Client {
.expect("signature generation to work") .expect("signature generation to work")
.commit(&mut req); .commit(&mut req);
let req = dbg!(req);
let request = req.map(Body::from).try_into().unwrap(); let request = req.map(Body::from).try_into().unwrap();
let response = self.inner.execute(request).await.unwrap(); self.inner.execute(request).await.unwrap();
let body = dbg!(response).text().await.unwrap();
dbg!(body);
} }
/// A high-level function to resolve a single ActivityPub ID using a signed request. /// A high-level function to resolve a single ActivityPub ID using a signed request.
pub async fn resolve(&self, key: &SigningKey, url: &str) -> Result<Value, FetchError> { pub async fn resolve(&self, key: &SigningKey, url: &str) -> Result<Value, FetchError> {
@ -103,7 +100,6 @@ impl Client {
.body(()) .body(())
.unwrap(); .unwrap();
println!("[{system}]: using modern config");
key.sign(Options::LEGACY, &req) key.sign(Options::LEGACY, &req)
.expect("signing error") .expect("signing error")
.commit(&mut req); .commit(&mut req);
@ -130,13 +126,13 @@ enum Subsystem {
/// The subsystem that dereferences ActivityPub URLs to JSON values. /// The subsystem that dereferences ActivityPub URLs to JSON values.
/// ///
/// In addition, the resolver is used for resolving webfinger handles to ActivityPub actors. /// In addition, the resolver is used for resolving webfinger handles to ActivityPub actors.
#[display = "resolver"] #[display(fmt = "resolver")]
Resolver, Resolver,
/// The subsystem responsible for delivering activities to inboxes. /// The subsystem responsible for delivering activities to inboxes.
#[display = "delivery"] #[display(fmt = "delivery")]
Delivery, Delivery,
/// For testing the resolver and signatures. /// For testing the resolver and signatures.
#[display = "devproxy"] #[display(fmt = "devproxy")]
DevProxy, DevProxy,
} }

View file

@ -16,7 +16,7 @@ pub use client::Client;
mod client; mod client;
/// Deliver an activity to an inbox. /// Deliver an activity to an inbox.
pub async fn deliver(key: &SigningKey, activity: Activity, inbox: &str) { pub async fn deliver(key: &SigningKey, activity: &Activity, inbox: &str) {
Client::new().deliver(key, &activity, inbox).await Client::new().deliver(key, &activity, inbox).await
} }
@ -55,6 +55,9 @@ pub enum FetchError {
/// message produced by the JSON deserializer. /// message produced by the JSON deserializer.
#[display(fmt = "deserialization error: {}", self.0)] #[display(fmt = "deserialization error: {}", self.0)]
BadJson(String), BadJson(String),
/// A JSON-LD document could not be deserialized because it does not conform to our expectations.
#[display(fmt = "parsing error: {}", self.0)]
BadObject(String),
/// An error that occurred while generating a signature for a a request. /// An error that occurred while generating a signature for a a request.
#[display(fmt = "signing error: {}", self.0)] #[display(fmt = "signing error: {}", self.0)]
Sig(String), Sig(String),

View file

@ -1,6 +1,7 @@
//! ActivityPub vocabulary as interpreted by ActivityPuppy. //! ActivityPub vocabulary as interpreted by ActivityPuppy.
use serde_json::{json, Value}; use serde_json::{json, Value};
use derive_more::From;
pub use crate::signatures::Key as PublicKey; pub use crate::signatures::Key as PublicKey;
@ -11,11 +12,11 @@ pub struct Activity<T = String> {
pub kind: T, pub kind: T,
} }
impl<K> Activity<K> impl<K> Activity<K> {
where pub fn to_json_ld(&self) -> Value
K: ToString, where
{ K: ToString,
pub fn to_json_ld(&self) -> Value { {
json!({ json!({
"@context": [ "@context": [
"https://www.w3.org/ns/activitystreams", "https://www.w3.org/ns/activitystreams",
@ -28,6 +29,33 @@ where
}) })
} }
} }
impl Activity {
pub fn from_json(mut json: Value) -> Result<Activity, String> {
let Some(map) = json.as_object() else {
do yeet "expected an object"
};
let Some(id) = map.get("id").and_then(|s| s.as_str()).map(str::to_owned) else {
do yeet "missing `id` property"
};
let Some(actor) = map.get("actor").and_then(|s| s.as_str()).map(str::to_owned) else {
do yeet format!("missing `actor` property for activity {id}")
};
let Some(kind) = map.get("type").and_then(|s| s.as_str()).map(str::to_owned) else {
do yeet format!("missing `type` property for activity {id}")
};
// TODO: make this behave gracefully when we only get an ID.
let Some(object) = json
.get_mut("object")
.map(Value::take)
.map(Object::from_json)
.transpose()?
.map(Box::new)
else {
do yeet format!("missing or invalid `object` property for activity {id}")
};
Ok(Activity { id, actor, object, kind })
}
}
/// An actor is an entity capable of producing Takes. /// An actor is an entity capable of producing Takes.
pub struct Actor { pub struct Actor {
@ -64,11 +92,48 @@ impl Actor {
} }
}) })
} }
pub fn from_json(json: Value) -> Result<Actor, String> {
let Value::Object(map) = json else {
do yeet format!("expected json object")
};
Ok(Actor {
id: map
.get("id")
.ok_or("id is required")?
.as_str()
.ok_or("id must be a str")?
.to_string(),
inbox: map
.get("inbox")
.ok_or("inbox is required")?
.as_str()
.ok_or("inbox must be a str")?
.to_string(),
account_name: map
.get("preferredUsername")
.ok_or("preferredUsername is required")?
.as_str()
.ok_or("preferredUsername must be a str")?
.to_string(),
display_name: map.get("name").and_then(|v| v.as_str()).map(str::to_owned),
public_key: map
.get("publicKey")
.cloned()
.and_then(PublicKey::from_json)
.ok_or("publicKey property could not be parsed")?,
})
}
} }
#[derive(From)]
pub enum Object { pub enum Object {
#[from(ignore)]
Id {
id: String,
},
Activity(Activity), Activity(Activity),
Actor(Actor), Actor(Actor),
#[from(ignore)]
Other { Other {
id: String, id: String,
kind: String, kind: String,
@ -84,10 +149,52 @@ impl Object {
Object::Activity(a) => &a.id, Object::Activity(a) => &a.id,
Object::Actor(a) => &a.id, Object::Actor(a) => &a.id,
Object::Other { id, .. } => id, Object::Other { id, .. } => id,
Object::Id { id } => id,
}
}
pub fn from_json(json: Value) -> Result<Object, String> {
if let Value::String(id) = json {
Ok(Object::Id { id })
} else if let Value::Object(ref map) = json {
match map.get("type").and_then(Value::as_str) {
Some("System" | "Application" | "Person" | "Service") => {
Actor::from_json(json).map(Object::Actor)
}
Some("Create" | "Follow" | "Accept" | "Reject" | "Bite") => {
Activity::from_json(json).map(Object::Activity)
}
Some(kind) => Ok(Object::Other {
id: map
.get("id")
.ok_or("id is required")?
.as_str()
.ok_or("id must be a str")?
.to_string(),
kind: kind.to_string(),
author: map
.get("attributedTo")
.ok_or("attributedTo is required")?
.as_str()
.ok_or("attributedTo must be a str")?
.to_string(),
content: map
.get("content")
.and_then(|v| v.as_str())
.map(str::to_owned),
summary: map
.get("summary")
.and_then(|v| v.as_str())
.map(str::to_owned),
}),
None => do yeet "could not determine type of object",
}
} else {
Err(format!("expected a json object or an id, got {json:#?}"))
} }
} }
pub fn to_json_ld(&self) -> Value { pub fn to_json_ld(&self) -> Value {
match self { match self {
Object::Id { id } => json!(id),
Object::Activity(a) => a.to_json_ld(), Object::Activity(a) => a.to_json_ld(),
Object::Actor(a) => a.to_json_ld(), Object::Actor(a) => a.to_json_ld(),
Object::Other { Object::Other {

View file

@ -649,8 +649,6 @@ mod new {
.filter_map(|(k, v)| v.strip_prefix('"')?.strip_suffix('"').map(|v| (k, v))) .filter_map(|(k, v)| v.strip_prefix('"')?.strip_suffix('"').map(|v| (k, v)))
.collect(); .collect();
let table = dbg!(table);
let Some(headers) = get(&table, "headers") else { let Some(headers) = get(&table, "headers") else {
do yeet "Missing `headers` field"; do yeet "Missing `headers` field";
}; };

View file

@ -1,3 +1,4 @@
use fetch::Client;
use store::{Key, Store, Transaction}; use store::{Key, Store, Transaction};
use crate::{config::Config, Result}; use crate::{config::Config, Result};
@ -8,17 +9,16 @@ use crate::{config::Config, Result};
#[derive(Clone)] #[derive(Clone)]
pub struct Context { pub struct Context {
config: Config, config: Config,
client: Client,
store: Store, store: Store,
} }
impl Context { impl Context {
fn new(config: Config, store: Store) -> Context {
Context { config, store }
}
/// Load the server context from the configuration. /// Load the server context from the configuration.
pub fn load(config: Config) -> Result<Context> { pub fn load(config: Config) -> Result<Context> {
let store = Store::open(&config.state_dir, crate::data::schema())?; let store = Store::open(&config.state_dir, crate::data::schema())?;
Ok(Context { config, store }) let client = Client::new();
Ok(Context { config, store, client })
} }
/// Do a data store [transaction][store::Transaction]. /// Do a data store [transaction][store::Transaction].
pub fn run<T>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T>) -> Result<T> { pub fn run<T>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T>) -> Result<T> {
@ -36,6 +36,10 @@ impl Context {
pub fn mk_url(&self, key: Key) -> String { pub fn mk_url(&self, key: Key) -> String {
format!("https://{}/o/{key}", self.config.ap_domain) format!("https://{}/o/{key}", self.config.ap_domain)
} }
/// Access the federation client.
pub fn resolver(&self) -> &Client {
&self.client
}
} }
/// Load a context for running tests in. /// Load a context for running tests in.
@ -45,5 +49,6 @@ pub fn test_context<T>(
schema: store::types::Schema, schema: store::types::Schema,
test: impl FnOnce(Context) -> Result<T>, test: impl FnOnce(Context) -> Result<T>,
) -> Result<T> { ) -> Result<T> {
Store::test(schema, |store| test(Context { config, store })) let client = Client::new();
Store::test(schema, |store| test(Context { config, store, client }))
} }

View file

@ -11,7 +11,7 @@ use crate::{
/// Interactions with other objects. /// Interactions with other objects.
impl Actor { impl Actor {
/// Create a [`Bite`]. /// Create a [`Bite`].
pub fn bite(&self, victim: &Actor) -> Bite { pub fn bite(&self, victim: Actor) -> Bite {
Bite { Bite {
victim: victim.key, victim: victim.key,
biter: self.key, biter: self.key,
@ -19,7 +19,7 @@ impl Actor {
} }
} }
/// Construct a [`FollowRequest`]. /// Construct a [`FollowRequest`].
pub fn follow_request(&self, target: &Actor) -> FollowRequest { pub fn follow_request(&self, target: Actor) -> FollowRequest {
FollowRequest { FollowRequest {
origin: self.key, origin: self.key,
target: target.key, target: target.key,
@ -27,13 +27,13 @@ impl Actor {
} }
} }
/// Makes `biter` bite `victim` and inserts the records into the database. /// Makes `biter` bite `victim` and inserts the records into the database.
pub fn do_bite(&self, cx: &Context, victim: &Actor) -> Result<Bite> { pub fn do_bite(&self, cx: &Context, victim: Actor) -> Result<Bite> {
let bite = self.bite(victim); let bite = self.bite(victim);
cx.run(|tx| try { tx.create(bite) })?; cx.run(|tx| tx.create(bite).map_err(Error::Store))?;
Ok(bite) Ok(bite)
} }
/// Creates a follow request from `self` to `target`. /// Creates a follow request from `self` to `target`.
pub fn do_follow_request(&self, cx: &Context, target: &Actor) -> Result<FollowRequest> { pub fn do_follow_request(&self, cx: &Context, target: Actor) -> Result<FollowRequest> {
let req = self.follow_request(target); let req = self.follow_request(target);
cx.run(|tx| { cx.run(|tx| {
tx.create(req)?; tx.create(req)?;
@ -70,7 +70,7 @@ impl Actor {
self.key == req.target, self.key == req.target,
"only the target of a follow request may accept it" "only the target of a follow request may accept it"
}; };
cx.run(|tx| try { tx.update(req.id, |_| Status::Rejected) })?; cx.run(|tx| try { tx.update(req.id, |_| Status::Rejected)? })?;
Ok(()) Ok(())
} }
/// Get all pending follow request for `self`. /// Get all pending follow request for `self`.
@ -156,7 +156,7 @@ mod tests {
fn create_fr() -> Result<()> { fn create_fr() -> Result<()> {
test_context(test_config(), schema(), |cx| { test_context(test_config(), schema(), |cx| {
let (alice, bob) = make_test_actors(&cx)?; let (alice, bob) = make_test_actors(&cx)?;
alice.do_follow_request(&cx, &bob)?; alice.do_follow_request(&cx, bob)?;
assert!( assert!(
cx.store().exists::<FollowRequest>(alice.key, bob.key)?, cx.store().exists::<FollowRequest>(alice.key, bob.key)?,
"(alice -> bob) ∈ follow-requested" "(alice -> bob) ∈ follow-requested"
@ -180,7 +180,7 @@ mod tests {
test_context(test_config(), schema(), |cx| { test_context(test_config(), schema(), |cx| {
let db = cx.store(); let db = cx.store();
let (alice, bob) = make_test_actors(&cx)?; let (alice, bob) = make_test_actors(&cx)?;
let req = alice.do_follow_request(&cx, &bob)?; let req = alice.do_follow_request(&cx, bob)?;
bob.do_accept_request(&cx, req)?; bob.do_accept_request(&cx, req)?;
assert!( assert!(
@ -210,7 +210,7 @@ mod tests {
fn listing_follow_relations() -> Result<()> { fn listing_follow_relations() -> Result<()> {
test_context(test_config(), schema(), |cx| try { test_context(test_config(), schema(), |cx| try {
let (alice, bob) = make_test_actors(&cx)?; let (alice, bob) = make_test_actors(&cx)?;
let req = alice.do_follow_request(&cx, &bob)?; let req = alice.do_follow_request(&cx, bob)?;
bob.do_accept_request(&cx, req)?; bob.do_accept_request(&cx, req)?;
cx.run(|tx| try { cx.run(|tx| try {

View file

@ -12,14 +12,16 @@
// but that would make every type signature ever 100x more complicated, so we're not doing it. // but that would make every type signature ever 100x more complicated, so we're not doing it.
#![deny(clippy::disallowed_methods, clippy::disallowed_types)] #![deny(clippy::disallowed_methods, clippy::disallowed_types)]
use std::hint::unreachable_unchecked;
use actor::{get_signing_key, Actor};
pub use context::Context; pub use context::Context;
#[cfg(test)] #[cfg(test)]
pub use context::test_context; pub use context::test_context;
use data::{ use data::{ActivityKind, AuthorOf, Channel, Content, Create, Id, ObjectKind, Profile, PublicKey};
ActivityKind, AuthorOf, Channel, Content, Create, Id, Object, ObjectKind, Profile, PublicKey,
};
use fetch::object::{Activity, Object};
use store::Transaction; use store::Transaction;
pub use store::{self, Key, StoreError}; pub use store::{self, Key, StoreError};
pub use fetch::{self, FetchError}; pub use fetch::{self, FetchError};
@ -30,11 +32,13 @@ pub mod data;
pub mod post; pub mod post;
mod interact; mod interact;
use derive_more::{From, Display};
/// Retrieve an ActivityPub object from the database. /// Retrieve an ActivityPub object from the database.
/// ///
/// Fails with `Error::Missing` if the required properties are not present. /// Fails with `Error::Missing` if the required properties are not present.
pub fn get_local_ap_object(tx: &Transaction<'_>, key: Key) -> Result<fetch::object::Object> { pub fn get_local_ap_object(tx: &Transaction<'_>, key: Key) -> Result<fetch::object::Object> {
let Some(obj) = tx.get_mixin::<Object>(key)? else { let Some(obj) = tx.get_mixin::<data::Object>(key)? else {
// We need this data in order to determine the object type. If the passed key does not // We need this data in order to determine the object type. If the passed key does not
// have this data, it must not be an ActivityPub object. // have this data, it must not be an ActivityPub object.
return Err(Error::MissingData { node: key, prop: "Object" }); return Err(Error::MissingData { node: key, prop: "Object" });
@ -97,7 +101,10 @@ pub fn get_local_ap_object(tx: &Transaction<'_>, key: Key) -> Result<fetch::obje
} }
pub mod actor { pub mod actor {
use fetch::signatures::{Private, SigningKey}; use fetch::{
object,
signatures::{Private, SigningKey},
};
use store::{Key, StoreError, Transaction}; use store::{Key, StoreError, Transaction};
use crate::{ use crate::{
@ -106,7 +113,7 @@ pub mod actor {
}; };
/// A reference to an actor. /// A reference to an actor.
#[derive(Clone, Eq, PartialEq, Debug)] #[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Actor { pub struct Actor {
/// The key identifying the actor in the data store. /// The key identifying the actor in the data store.
pub key: Key, pub key: Key,
@ -118,7 +125,6 @@ pub mod actor {
let maybe_key = tx let maybe_key = tx
.lookup(Username(username.to_string())) .lookup(Username(username.to_string()))
.map_err(Error::Store)?; .map_err(Error::Store)?;
// For now, we only have local actors.
Ok(maybe_key.map(|key| Actor { key })) Ok(maybe_key.map(|key| Actor { key }))
} }
} }
@ -144,7 +150,33 @@ pub mod actor {
}) })
} }
/// Add properties related to ActivityPub actors to a vertex. /// Register an actor from another server.
pub fn create_remote(cx: &Context, object: object::Actor) -> Result<Actor> {
let key = Key::gen();
cx.run(|tx| {
tx.add_alias(key, Id(object.id.clone()))?;
tx.add_mixin(key, Channel { inbox: object.inbox })?;
tx.add_mixin(key, Object {
kind: ObjectKind::Actor,
id: Id(object.id),
local: false,
})?;
tx.add_mixin(key, Profile {
post_count: 0,
account_name: Username(object.account_name),
display_name: object.display_name,
about_string: None,
about_fields: Vec::new(),
})?;
tx.add_mixin(key, PublicKey {
key_id: object.public_key.id,
key_pem: object.public_key.inner,
})?;
Ok(Actor { key })
})
}
/// Add properties related to local ActivityPub actors to a vertex.
pub fn mixin_ap_actor( pub fn mixin_ap_actor(
tx: &Transaction<'_>, tx: &Transaction<'_>,
vertex: Key, vertex: Key,
@ -189,13 +221,16 @@ pub mod actor {
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(derive_more::From, Debug)] #[derive(From, Debug, Display)]
pub enum Error { pub enum Error {
/// An error internal to the store. /// An error internal to the store.
#[display(fmt = "{}", self.0)]
Store(StoreError), Store(StoreError),
/// An error generated by the [fetch] subsystem. /// An error generated by the [fetch] subsystem.
#[display(fmt = "{}", self.0)]
Fetch(FetchError), Fetch(FetchError),
/// Expected `node` to have some property that it doesn't have. /// Expected `node` to have some property that it doesn't have.
#[display(fmt = "missing data: {node} is missing {prop}")]
MissingData { MissingData {
/// The node that is missing the data. /// The node that is missing the data.
node: Key, node: Key,
@ -213,3 +248,99 @@ pub mod config {
pub port: u16, pub port: u16,
} }
} }
/// Interpret an *incoming* activity. Outgoing activities are *never* interpreted through this function,
/// because their changes are already in the database.
// TODO: figure out if that is the behavior we actually want
pub fn interpret(cx: &Context, activity: Activity) -> Result<()> {
// Fetch our actor from the database
let Some(actor) = cx.store().lookup(Id(activity.actor.clone()))? else {
panic!(
"actor {} does not exist in the database (id={})",
activity.actor, activity.id
)
};
// Fetch our object from the database. The object must already exist in the database.
let id = activity.object.id();
let Some(object) = cx.store().lookup(Id(id.to_owned()))? else {
panic!(
"object {} does not exist in the database (id={})",
activity.object.id(),
activity.id
)
};
let actor = actor::Actor { key: actor };
let (key, tag) = match activity.kind.as_str() {
"Bite" => {
let object = actor::Actor { key: object };
(actor.do_bite(&cx, object)?.id, ActivityKind::Bite)
}
"Create" => {
todo!()
}
"Follow" => {
let object = actor::Actor { key: object };
let req = actor.do_follow_request(&cx, object)?;
(req.id, ActivityKind::Follow)
}
tag @ ("Accept" | "Reject") => {
// Follow requests are multi-arrows in our graph, and they have their own activitypub id.
let Some(req) = cx.store().get_arrow(object)? else {
panic!(
"follow request does not exist: {object} (id={})",
activity.id
)
};
// Dispatch to the actual method based on the tag
let tag = match tag {
"Accept" => actor
.do_accept_request(&cx, req)
.map(|_| ActivityKind::Accept)?,
"Reject" => actor
.do_reject_request(&cx, req)
.map(|_| ActivityKind::Reject)?,
_ => unsafe {
// SAFETY: this branch of the outer match only matches if the tag is either "Accept" or "Reject",
// so this inner branch is truly unreachable.
unreachable_unchecked()
},
};
(Key::gen(), tag)
}
k => panic!("unsupported activity type {k} (id={})", activity.id),
};
cx.run(|tx| {
tx.add_alias(key, Id(activity.id.clone()))?;
tx.add_mixin(key, data::Object {
id: Id(activity.id.clone()),
kind: ObjectKind::Activity(tag),
local: false,
})?;
Ok(())
})
}
/// Make sure all the interesting bits of an activity are here.
pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()> {
let key = cx.run(|tx| get_signing_key(tx, actor::Actor { key: auth }).map_err(Error::Store))?;
for id in [activity.actor.as_str(), activity.object.id()] {
if cx.store().lookup(Id(id.to_owned()))?.is_some() {
// Skip ingesting if we already know this ID.
continue;
}
let json = cx.resolver().resolve(&key, &id).await?;
let object = Object::from_json(json).unwrap();
match object {
Object::Activity(a) => interpret(&cx, a)?,
Object::Actor(a) => actor::create_remote(cx, a).map(void)?,
_ => todo!(),
}
}
Ok(())
}
/// Discard the argument.
fn void<T>(_: T) -> () {}

View file

@ -13,8 +13,8 @@ use store::{util::IterExt as _, Key, Store, StoreError, Transaction};
use crate::{ use crate::{
actor::{get_signing_key, Actor}, actor::{get_signing_key, Actor},
data::{ data::{
self, ActivityKind, AuthorOf, Content, Create, Id, ObjectKind, PrivateKey, Profile, self, ActivityKind, AuthorOf, Channel, Content, Create, Follows, Id, ObjectKind,
PublicKey, PrivateKey, Profile, PublicKey,
}, },
Context, Error, Context, Error,
}; };
@ -153,13 +153,14 @@ pub fn create_post(cx: &Context, author: Key, content: impl Into<Content>) -> cr
pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> { pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> {
// Obtain all the data we need to construct our activity // Obtain all the data we need to construct our activity
let (Content { content, warning }, url, author, signing_key) = cx.run(|tx| try { let (Content { content, warning }, url, author, signing_key, followers) = cx.run(|tx| try {
let Some(AuthorOf { author, .. }) = tx.incoming(post.key).next().transpose()? else { let Some(AuthorOf { author, .. }) = tx.incoming(post.key).next().transpose()? else {
panic!() panic!("can't federate post without author: {post:?}")
}; };
let signing_key = get_signing_key(tx, Actor { key: author })?; let signing_key = get_signing_key(tx, Actor { key: author })?;
let (c, data::Object { id, .. }) = tx.get_mixin_many(post.key)?; let (c, data::Object { id, .. }) = tx.get_mixin_many(post.key)?;
(c, id, author, signing_key) let targets = tx.join_on::<Channel, _>(|a| a.follower, tx.incoming::<Follows>(author))?;
(c, id, author, signing_key, targets)
})?; })?;
let activity_key = Key::gen(); let activity_key = Key::gen();
@ -194,12 +195,15 @@ pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> {
kind: "Create".to_string(), kind: "Create".to_string(),
}; };
fetch::deliver( for inbox in followers
&signing_key, .into_iter()
activity, .filter_map(|(_, c)| c.map(|t| t.inbox))
"https://crimew.gay/users/ezri/inbox", // FIXME: remove this when im done testing
) .chain(["https://crimew.gay/users/riley/inbox".to_string()])
.await; {
fetch::deliver(&signing_key, &activity, &inbox).await;
}
Ok(()) Ok(())
} }

View file

@ -15,7 +15,7 @@
use std::{cell::RefCell, path::Path, sync::Arc}; use std::{cell::RefCell, path::Path, sync::Arc};
use derive_more::From; use derive_more::{From, Display};
use rocksdb::{Options, TransactionDBOptions, WriteBatchWithTransaction}; use rocksdb::{Options, TransactionDBOptions, WriteBatchWithTransaction};
use types::Schema; use types::Schema;
@ -139,7 +139,7 @@ pub const OK: Result<()> = Ok(());
pub type Result<T, E = StoreError> = std::result::Result<T, E>; pub type Result<T, E = StoreError> = std::result::Result<T, E>;
/// Errors from the data store. /// Errors from the data store.
#[derive(From, Debug)] #[derive(From, Display, Debug)]
pub enum StoreError { pub enum StoreError {
/// The requested value was expected to exist in a particular keyspace, but does not actually /// The requested value was expected to exist in a particular keyspace, but does not actually
/// exist there. This can occur on updates for example. /// exist there. This can occur on updates for example.