From 8f2ea8930182817511c614dc1fba689d0707ecd4 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Fri, 3 May 2024 23:44:01 +0200 Subject: [PATCH] Partial inbox handler --- bin/pupctl/src/main.rs | 22 +++++- bin/server/src/api.rs | 39 +++++++--- bin/server/src/api/ap.rs | 20 ++++- lib/fetch/src/client.rs | 14 ++-- lib/fetch/src/lib.rs | 5 +- lib/fetch/src/object.rs | 117 ++++++++++++++++++++++++++-- lib/fetch/src/signatures.rs | 2 - lib/puppy/src/context.rs | 15 ++-- lib/puppy/src/interact.rs | 18 ++--- lib/puppy/src/lib.rs | 149 +++++++++++++++++++++++++++++++++--- lib/puppy/src/post.rs | 26 ++++--- lib/store/src/lib.rs | 4 +- 12 files changed, 361 insertions(+), 70 deletions(-) diff --git a/bin/pupctl/src/main.rs b/bin/pupctl/src/main.rs index 9813536..f103664 100644 --- a/bin/pupctl/src/main.rs +++ b/bin/pupctl/src/main.rs @@ -1,7 +1,12 @@ //! Control program for the ActivityPub federated social media server. #![feature(iterator_try_collect)] -use puppy::{actor::Actor, config::Config, Context}; +use puppy::{ + actor::Actor, + config::Config, + data::{FollowRequest, Object, Profile}, + Context, +}; #[tokio::main] async fn main() -> puppy::Result<()> { @@ -14,8 +19,19 @@ async fn main() -> puppy::Result<()> { }; let cx = Context::load(config)?; let riley = get_or_create_actor(&cx, "riley")?; - let post = puppy::post::create_post(&cx, riley.key, "i like boys")?; - puppy::post::federate_post(&cx, post).await + cx.run(|tx| { + println!("\nRiley's following:"); + for FollowRequest { id, origin, .. } in + riley.pending_requests(&tx).try_collect::>()? + { + let Profile { account_name, .. } = tx.get_mixin(origin)?.unwrap(); + let Object { id, .. } = tx.get_mixin(id)?.unwrap(); + println!("- @{account_name} ({origin}) (request url = {id})"); + } + Ok(()) + }) + // let post = puppy::post::create_post(&cx, riley.key, "i like boys")?; + // puppy::post::federate_post(&cx, post).await // let linen = get_or_create_actor(&cx, "linen")?; // if true { diff --git a/bin/server/src/api.rs b/bin/server/src/api.rs index 744e8a0..7b68d42 100644 --- a/bin/server/src/api.rs +++ b/bin/server/src/api.rs @@ -1,6 +1,6 @@ //! API endpoints and request handlers. -use std::convert::Infallible; +use std::{convert::Infallible, future::Future}; use std::net::SocketAddr; use std::sync::Arc; @@ -164,6 +164,7 @@ async fn handle(req: Request, verifier: &Verifier, cx: Context) -> Result Result ap::serve_object(&cx, ulid), // POSTs to an actor's inbox need to be signed to prevent impersonation. - (POST, ["o", ulid, "inbox"]) => with_json(&req.body, |json| try { - // We only handle the intermediate parsing of the json, full resolution of the - // activity object will happen inside the inbox handler itself. - ap::inbox(&cx, ulid, sig, json) - }), + (POST, ["o", ulid, "inbox"]) => { + with_json(&req.body, |json| ap::inbox(&cx, ulid, sig, json)).await + } // Try the resources for which no signature is required as well. _ => dispatch_public(cx, verifier, req).await, } @@ -236,12 +245,12 @@ async fn dispatch_public( } } -fn with_json( - body: &[u8], - f: impl FnOnce(Value) -> Result, -) -> Result { +async fn with_json(body: &[u8], f: impl FnOnce(Value) -> F) -> Result +where + F: Future>, +{ match from_slice(body) { - Ok(json) => f(json), + Ok(json) => f(json).await, Err(e) => fuck!(400: "could not decode json: {e}"), } } @@ -262,6 +271,12 @@ mod error { pub status: u16, } + impl std::fmt::Display for Message { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.error.fmt(f) + } + } + impl super::Req<'_> { /// Generate an error response for the request. pub fn error(&self, err: Message) -> Response { diff --git a/bin/server/src/api/ap.rs b/bin/server/src/api/ap.rs index 9575fe3..cb35236 100644 --- a/bin/server/src/api/ap.rs +++ b/bin/server/src/api/ap.rs @@ -4,6 +4,7 @@ use http_body_util::Full; use hyper::body::Bytes; use puppy::{ actor::{get_signing_key, Actor}, + fetch::object::Activity, get_local_ap_object, Context, Error, Key, }; use serde_json::Value; @@ -62,8 +63,23 @@ pub async fn outbox(cx: &Context, params: &[(&str, &str)]) -> Result Response { - todo!() +pub async fn inbox( + cx: &Context, + actor_id: &str, + sig: Signer, + body: Value, +) -> Result { + let receiver = actor_id.parse::().unwrap(); + match Activity::from_json(body) { + Ok(activity) => { + puppy::ingest(&cx, receiver, &activity).await.unwrap(); + match puppy::interpret(&cx, activity) { + Ok(_) => Ok(respond!(code: 202)), + Err(err) => fuck!(400: "error interpreting activity: {err}"), + } + } + Err(err) => fuck!(400: "invalid payload: {err}"), + } } /// Serve an ActivityPub object as json-ld. diff --git a/lib/fetch/src/client.rs b/lib/fetch/src/client.rs index e209e1d..d7e2e5c 100644 --- a/lib/fetch/src/client.rs +++ b/lib/fetch/src/client.rs @@ -23,6 +23,7 @@ pub const VERSION: &str = "0.0.1-dev"; pub const ACTIVITYPUB_TYPE: &str = "application/activity+json"; /// A client for sending ActivityPub and WebFinger requests with. +#[derive(Clone)] pub struct Client { inner: reqwest::Client, } @@ -53,12 +54,8 @@ impl Client { .expect("signature generation to work") .commit(&mut req); - let req = dbg!(req); - let request = req.map(Body::from).try_into().unwrap(); - let response = self.inner.execute(request).await.unwrap(); - let body = dbg!(response).text().await.unwrap(); - dbg!(body); + self.inner.execute(request).await.unwrap(); } /// A high-level function to resolve a single ActivityPub ID using a signed request. pub async fn resolve(&self, key: &SigningKey, url: &str) -> Result { @@ -103,7 +100,6 @@ impl Client { .body(()) .unwrap(); - println!("[{system}]: using modern config"); key.sign(Options::LEGACY, &req) .expect("signing error") .commit(&mut req); @@ -130,13 +126,13 @@ enum Subsystem { /// The subsystem that dereferences ActivityPub URLs to JSON values. /// /// In addition, the resolver is used for resolving webfinger handles to ActivityPub actors. - #[display = "resolver"] + #[display(fmt = "resolver")] Resolver, /// The subsystem responsible for delivering activities to inboxes. - #[display = "delivery"] + #[display(fmt = "delivery")] Delivery, /// For testing the resolver and signatures. - #[display = "devproxy"] + #[display(fmt = "devproxy")] DevProxy, } diff --git a/lib/fetch/src/lib.rs b/lib/fetch/src/lib.rs index 0a129c8..8c2bed6 100644 --- a/lib/fetch/src/lib.rs +++ b/lib/fetch/src/lib.rs @@ -16,7 +16,7 @@ pub use client::Client; mod client; /// Deliver an activity to an inbox. -pub async fn deliver(key: &SigningKey, activity: Activity, inbox: &str) { +pub async fn deliver(key: &SigningKey, activity: &Activity, inbox: &str) { Client::new().deliver(key, &activity, inbox).await } @@ -55,6 +55,9 @@ pub enum FetchError { /// message produced by the JSON deserializer. #[display(fmt = "deserialization error: {}", self.0)] BadJson(String), + /// A JSON-LD document could not be deserialized because it does not conform to our expectations. + #[display(fmt = "parsing error: {}", self.0)] + BadObject(String), /// An error that occurred while generating a signature for a a request. #[display(fmt = "signing error: {}", self.0)] Sig(String), diff --git a/lib/fetch/src/object.rs b/lib/fetch/src/object.rs index 7b7898a..bfed81b 100644 --- a/lib/fetch/src/object.rs +++ b/lib/fetch/src/object.rs @@ -1,6 +1,7 @@ //! ActivityPub vocabulary as interpreted by ActivityPuppy. use serde_json::{json, Value}; +use derive_more::From; pub use crate::signatures::Key as PublicKey; @@ -11,11 +12,11 @@ pub struct Activity { pub kind: T, } -impl Activity -where - K: ToString, -{ - pub fn to_json_ld(&self) -> Value { +impl Activity { + pub fn to_json_ld(&self) -> Value + where + K: ToString, + { json!({ "@context": [ "https://www.w3.org/ns/activitystreams", @@ -28,6 +29,33 @@ where }) } } +impl Activity { + pub fn from_json(mut json: Value) -> Result { + let Some(map) = json.as_object() else { + do yeet "expected an object" + }; + let Some(id) = map.get("id").and_then(|s| s.as_str()).map(str::to_owned) else { + do yeet "missing `id` property" + }; + let Some(actor) = map.get("actor").and_then(|s| s.as_str()).map(str::to_owned) else { + do yeet format!("missing `actor` property for activity {id}") + }; + let Some(kind) = map.get("type").and_then(|s| s.as_str()).map(str::to_owned) else { + do yeet format!("missing `type` property for activity {id}") + }; + // TODO: make this behave gracefully when we only get an ID. + let Some(object) = json + .get_mut("object") + .map(Value::take) + .map(Object::from_json) + .transpose()? + .map(Box::new) + else { + do yeet format!("missing or invalid `object` property for activity {id}") + }; + Ok(Activity { id, actor, object, kind }) + } +} /// An actor is an entity capable of producing Takes. pub struct Actor { @@ -64,11 +92,48 @@ impl Actor { } }) } + pub fn from_json(json: Value) -> Result { + let Value::Object(map) = json else { + do yeet format!("expected json object") + }; + Ok(Actor { + id: map + .get("id") + .ok_or("id is required")? + .as_str() + .ok_or("id must be a str")? + .to_string(), + inbox: map + .get("inbox") + .ok_or("inbox is required")? + .as_str() + .ok_or("inbox must be a str")? + .to_string(), + account_name: map + .get("preferredUsername") + .ok_or("preferredUsername is required")? + .as_str() + .ok_or("preferredUsername must be a str")? + .to_string(), + display_name: map.get("name").and_then(|v| v.as_str()).map(str::to_owned), + public_key: map + .get("publicKey") + .cloned() + .and_then(PublicKey::from_json) + .ok_or("publicKey property could not be parsed")?, + }) + } } +#[derive(From)] pub enum Object { + #[from(ignore)] + Id { + id: String, + }, Activity(Activity), Actor(Actor), + #[from(ignore)] Other { id: String, kind: String, @@ -84,10 +149,52 @@ impl Object { Object::Activity(a) => &a.id, Object::Actor(a) => &a.id, Object::Other { id, .. } => id, + Object::Id { id } => id, + } + } + pub fn from_json(json: Value) -> Result { + if let Value::String(id) = json { + Ok(Object::Id { id }) + } else if let Value::Object(ref map) = json { + match map.get("type").and_then(Value::as_str) { + Some("System" | "Application" | "Person" | "Service") => { + Actor::from_json(json).map(Object::Actor) + } + Some("Create" | "Follow" | "Accept" | "Reject" | "Bite") => { + Activity::from_json(json).map(Object::Activity) + } + Some(kind) => Ok(Object::Other { + id: map + .get("id") + .ok_or("id is required")? + .as_str() + .ok_or("id must be a str")? + .to_string(), + kind: kind.to_string(), + author: map + .get("attributedTo") + .ok_or("attributedTo is required")? + .as_str() + .ok_or("attributedTo must be a str")? + .to_string(), + content: map + .get("content") + .and_then(|v| v.as_str()) + .map(str::to_owned), + summary: map + .get("summary") + .and_then(|v| v.as_str()) + .map(str::to_owned), + }), + None => do yeet "could not determine type of object", + } + } else { + Err(format!("expected a json object or an id, got {json:#?}")) } } pub fn to_json_ld(&self) -> Value { match self { + Object::Id { id } => json!(id), Object::Activity(a) => a.to_json_ld(), Object::Actor(a) => a.to_json_ld(), Object::Other { diff --git a/lib/fetch/src/signatures.rs b/lib/fetch/src/signatures.rs index 32b907d..d00d594 100644 --- a/lib/fetch/src/signatures.rs +++ b/lib/fetch/src/signatures.rs @@ -649,8 +649,6 @@ mod new { .filter_map(|(k, v)| v.strip_prefix('"')?.strip_suffix('"').map(|v| (k, v))) .collect(); - let table = dbg!(table); - let Some(headers) = get(&table, "headers") else { do yeet "Missing `headers` field"; }; diff --git a/lib/puppy/src/context.rs b/lib/puppy/src/context.rs index 9fe481a..67956e7 100644 --- a/lib/puppy/src/context.rs +++ b/lib/puppy/src/context.rs @@ -1,3 +1,4 @@ +use fetch::Client; use store::{Key, Store, Transaction}; use crate::{config::Config, Result}; @@ -8,17 +9,16 @@ use crate::{config::Config, Result}; #[derive(Clone)] pub struct Context { config: Config, + client: Client, store: Store, } impl Context { - fn new(config: Config, store: Store) -> Context { - Context { config, store } - } /// Load the server context from the configuration. pub fn load(config: Config) -> Result { let store = Store::open(&config.state_dir, crate::data::schema())?; - Ok(Context { config, store }) + let client = Client::new(); + Ok(Context { config, store, client }) } /// Do a data store [transaction][store::Transaction]. pub fn run(&self, f: impl FnOnce(&Transaction<'_>) -> Result) -> Result { @@ -36,6 +36,10 @@ impl Context { pub fn mk_url(&self, key: Key) -> String { format!("https://{}/o/{key}", self.config.ap_domain) } + /// Access the federation client. + pub fn resolver(&self) -> &Client { + &self.client + } } /// Load a context for running tests in. @@ -45,5 +49,6 @@ pub fn test_context( schema: store::types::Schema, test: impl FnOnce(Context) -> Result, ) -> Result { - Store::test(schema, |store| test(Context { config, store })) + let client = Client::new(); + Store::test(schema, |store| test(Context { config, store, client })) } diff --git a/lib/puppy/src/interact.rs b/lib/puppy/src/interact.rs index 133bafd..0cac04e 100644 --- a/lib/puppy/src/interact.rs +++ b/lib/puppy/src/interact.rs @@ -11,7 +11,7 @@ use crate::{ /// Interactions with other objects. impl Actor { /// Create a [`Bite`]. - pub fn bite(&self, victim: &Actor) -> Bite { + pub fn bite(&self, victim: Actor) -> Bite { Bite { victim: victim.key, biter: self.key, @@ -19,7 +19,7 @@ impl Actor { } } /// Construct a [`FollowRequest`]. - pub fn follow_request(&self, target: &Actor) -> FollowRequest { + pub fn follow_request(&self, target: Actor) -> FollowRequest { FollowRequest { origin: self.key, target: target.key, @@ -27,13 +27,13 @@ impl Actor { } } /// Makes `biter` bite `victim` and inserts the records into the database. - pub fn do_bite(&self, cx: &Context, victim: &Actor) -> Result { + pub fn do_bite(&self, cx: &Context, victim: Actor) -> Result { let bite = self.bite(victim); - cx.run(|tx| try { tx.create(bite) })?; + cx.run(|tx| tx.create(bite).map_err(Error::Store))?; Ok(bite) } /// Creates a follow request from `self` to `target`. - pub fn do_follow_request(&self, cx: &Context, target: &Actor) -> Result { + pub fn do_follow_request(&self, cx: &Context, target: Actor) -> Result { let req = self.follow_request(target); cx.run(|tx| { tx.create(req)?; @@ -70,7 +70,7 @@ impl Actor { self.key == req.target, "only the target of a follow request may accept it" }; - cx.run(|tx| try { tx.update(req.id, |_| Status::Rejected) })?; + cx.run(|tx| try { tx.update(req.id, |_| Status::Rejected)? })?; Ok(()) } /// Get all pending follow request for `self`. @@ -156,7 +156,7 @@ mod tests { fn create_fr() -> Result<()> { test_context(test_config(), schema(), |cx| { let (alice, bob) = make_test_actors(&cx)?; - alice.do_follow_request(&cx, &bob)?; + alice.do_follow_request(&cx, bob)?; assert!( cx.store().exists::(alice.key, bob.key)?, "(alice -> bob) ∈ follow-requested" @@ -180,7 +180,7 @@ mod tests { test_context(test_config(), schema(), |cx| { let db = cx.store(); let (alice, bob) = make_test_actors(&cx)?; - let req = alice.do_follow_request(&cx, &bob)?; + let req = alice.do_follow_request(&cx, bob)?; bob.do_accept_request(&cx, req)?; assert!( @@ -210,7 +210,7 @@ mod tests { fn listing_follow_relations() -> Result<()> { test_context(test_config(), schema(), |cx| try { let (alice, bob) = make_test_actors(&cx)?; - let req = alice.do_follow_request(&cx, &bob)?; + let req = alice.do_follow_request(&cx, bob)?; bob.do_accept_request(&cx, req)?; cx.run(|tx| try { diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index 90dc137..559ca7f 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -12,14 +12,16 @@ // but that would make every type signature ever 100x more complicated, so we're not doing it. #![deny(clippy::disallowed_methods, clippy::disallowed_types)] +use std::hint::unreachable_unchecked; + +use actor::{get_signing_key, Actor}; pub use context::Context; #[cfg(test)] pub use context::test_context; -use data::{ - ActivityKind, AuthorOf, Channel, Content, Create, Id, Object, ObjectKind, Profile, PublicKey, -}; +use data::{ActivityKind, AuthorOf, Channel, Content, Create, Id, ObjectKind, Profile, PublicKey}; +use fetch::object::{Activity, Object}; use store::Transaction; pub use store::{self, Key, StoreError}; pub use fetch::{self, FetchError}; @@ -30,11 +32,13 @@ pub mod data; pub mod post; mod interact; +use derive_more::{From, Display}; + /// Retrieve an ActivityPub object from the database. /// /// Fails with `Error::Missing` if the required properties are not present. pub fn get_local_ap_object(tx: &Transaction<'_>, key: Key) -> Result { - let Some(obj) = tx.get_mixin::(key)? else { + let Some(obj) = tx.get_mixin::(key)? else { // We need this data in order to determine the object type. If the passed key does not // have this data, it must not be an ActivityPub object. return Err(Error::MissingData { node: key, prop: "Object" }); @@ -97,7 +101,10 @@ pub fn get_local_ap_object(tx: &Transaction<'_>, key: Key) -> Result Result { + let key = Key::gen(); + cx.run(|tx| { + tx.add_alias(key, Id(object.id.clone()))?; + tx.add_mixin(key, Channel { inbox: object.inbox })?; + tx.add_mixin(key, Object { + kind: ObjectKind::Actor, + id: Id(object.id), + local: false, + })?; + tx.add_mixin(key, Profile { + post_count: 0, + account_name: Username(object.account_name), + display_name: object.display_name, + about_string: None, + about_fields: Vec::new(), + })?; + tx.add_mixin(key, PublicKey { + key_id: object.public_key.id, + key_pem: object.public_key.inner, + })?; + Ok(Actor { key }) + }) + } + + /// Add properties related to local ActivityPub actors to a vertex. pub fn mixin_ap_actor( tx: &Transaction<'_>, vertex: Key, @@ -189,13 +221,16 @@ pub mod actor { pub type Result = std::result::Result; -#[derive(derive_more::From, Debug)] +#[derive(From, Debug, Display)] pub enum Error { /// An error internal to the store. + #[display(fmt = "{}", self.0)] Store(StoreError), /// An error generated by the [fetch] subsystem. + #[display(fmt = "{}", self.0)] Fetch(FetchError), /// Expected `node` to have some property that it doesn't have. + #[display(fmt = "missing data: {node} is missing {prop}")] MissingData { /// The node that is missing the data. node: Key, @@ -213,3 +248,99 @@ pub mod config { pub port: u16, } } + +/// Interpret an *incoming* activity. Outgoing activities are *never* interpreted through this function, +/// because their changes are already in the database. +// TODO: figure out if that is the behavior we actually want +pub fn interpret(cx: &Context, activity: Activity) -> Result<()> { + // Fetch our actor from the database + let Some(actor) = cx.store().lookup(Id(activity.actor.clone()))? else { + panic!( + "actor {} does not exist in the database (id={})", + activity.actor, activity.id + ) + }; + + // Fetch our object from the database. The object must already exist in the database. + let id = activity.object.id(); + let Some(object) = cx.store().lookup(Id(id.to_owned()))? else { + panic!( + "object {} does not exist in the database (id={})", + activity.object.id(), + activity.id + ) + }; + + let actor = actor::Actor { key: actor }; + let (key, tag) = match activity.kind.as_str() { + "Bite" => { + let object = actor::Actor { key: object }; + (actor.do_bite(&cx, object)?.id, ActivityKind::Bite) + } + "Create" => { + todo!() + } + "Follow" => { + let object = actor::Actor { key: object }; + + let req = actor.do_follow_request(&cx, object)?; + (req.id, ActivityKind::Follow) + } + tag @ ("Accept" | "Reject") => { + // Follow requests are multi-arrows in our graph, and they have their own activitypub id. + let Some(req) = cx.store().get_arrow(object)? else { + panic!( + "follow request does not exist: {object} (id={})", + activity.id + ) + }; + // Dispatch to the actual method based on the tag + let tag = match tag { + "Accept" => actor + .do_accept_request(&cx, req) + .map(|_| ActivityKind::Accept)?, + "Reject" => actor + .do_reject_request(&cx, req) + .map(|_| ActivityKind::Reject)?, + _ => unsafe { + // SAFETY: this branch of the outer match only matches if the tag is either "Accept" or "Reject", + // so this inner branch is truly unreachable. + unreachable_unchecked() + }, + }; + (Key::gen(), tag) + } + k => panic!("unsupported activity type {k} (id={})", activity.id), + }; + cx.run(|tx| { + tx.add_alias(key, Id(activity.id.clone()))?; + tx.add_mixin(key, data::Object { + id: Id(activity.id.clone()), + kind: ObjectKind::Activity(tag), + local: false, + })?; + Ok(()) + }) +} + +/// Make sure all the interesting bits of an activity are here. +pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()> { + let key = cx.run(|tx| get_signing_key(tx, actor::Actor { key: auth }).map_err(Error::Store))?; + for id in [activity.actor.as_str(), activity.object.id()] { + if cx.store().lookup(Id(id.to_owned()))?.is_some() { + // Skip ingesting if we already know this ID. + continue; + } + let json = cx.resolver().resolve(&key, &id).await?; + let object = Object::from_json(json).unwrap(); + match object { + Object::Activity(a) => interpret(&cx, a)?, + Object::Actor(a) => actor::create_remote(cx, a).map(void)?, + _ => todo!(), + } + } + Ok(()) +} + +/// Discard the argument. +fn void(_: T) -> () {} diff --git a/lib/puppy/src/post.rs b/lib/puppy/src/post.rs index a52b5a2..403dae1 100644 --- a/lib/puppy/src/post.rs +++ b/lib/puppy/src/post.rs @@ -13,8 +13,8 @@ use store::{util::IterExt as _, Key, Store, StoreError, Transaction}; use crate::{ actor::{get_signing_key, Actor}, data::{ - self, ActivityKind, AuthorOf, Content, Create, Id, ObjectKind, PrivateKey, Profile, - PublicKey, + self, ActivityKind, AuthorOf, Channel, Content, Create, Follows, Id, ObjectKind, + PrivateKey, Profile, PublicKey, }, Context, Error, }; @@ -153,13 +153,14 @@ pub fn create_post(cx: &Context, author: Key, content: impl Into) -> cr pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> { // Obtain all the data we need to construct our activity - let (Content { content, warning }, url, author, signing_key) = cx.run(|tx| try { + let (Content { content, warning }, url, author, signing_key, followers) = cx.run(|tx| try { let Some(AuthorOf { author, .. }) = tx.incoming(post.key).next().transpose()? else { - panic!() + panic!("can't federate post without author: {post:?}") }; let signing_key = get_signing_key(tx, Actor { key: author })?; let (c, data::Object { id, .. }) = tx.get_mixin_many(post.key)?; - (c, id, author, signing_key) + let targets = tx.join_on::(|a| a.follower, tx.incoming::(author))?; + (c, id, author, signing_key, targets) })?; let activity_key = Key::gen(); @@ -194,12 +195,15 @@ pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> { kind: "Create".to_string(), }; - fetch::deliver( - &signing_key, - activity, - "https://crimew.gay/users/ezri/inbox", - ) - .await; + for inbox in followers + .into_iter() + .filter_map(|(_, c)| c.map(|t| t.inbox)) + // FIXME: remove this when im done testing + .chain(["https://crimew.gay/users/riley/inbox".to_string()]) + { + fetch::deliver(&signing_key, &activity, &inbox).await; + } + Ok(()) } diff --git a/lib/store/src/lib.rs b/lib/store/src/lib.rs index 910323a..786c57f 100644 --- a/lib/store/src/lib.rs +++ b/lib/store/src/lib.rs @@ -15,7 +15,7 @@ use std::{cell::RefCell, path::Path, sync::Arc}; -use derive_more::From; +use derive_more::{From, Display}; use rocksdb::{Options, TransactionDBOptions, WriteBatchWithTransaction}; use types::Schema; @@ -139,7 +139,7 @@ pub const OK: Result<()> = Ok(()); pub type Result = std::result::Result; /// Errors from the data store. -#[derive(From, Debug)] +#[derive(From, Display, Debug)] pub enum StoreError { /// The requested value was expected to exist in a particular keyspace, but does not actually /// exist there. This can occur on updates for example.