From 288c181cc9f21df78495620f7533d01b658bda81 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Fri, 3 May 2024 18:35:05 +0200 Subject: [PATCH] make post federation work --- Cargo.lock | 1 + bin/pupctl/Cargo.toml | 1 + bin/pupctl/src/main.rs | 125 +++++++++++++++++------------------- bin/server/src/api.rs | 1 + bin/server/src/api/ap.rs | 23 +++++++ lib/fetch/src/client.rs | 24 ++++++- lib/fetch/src/object.rs | 13 +++- lib/fetch/src/signatures.rs | 2 +- lib/puppy/src/lib.rs | 24 ++++--- lib/puppy/src/post.rs | 71 ++++++++++++++++++-- 10 files changed, 202 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 802d527..c8e842f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1236,6 +1236,7 @@ dependencies = [ "clap", "cli-table", "puppy", + "tokio", ] [[package]] diff --git a/bin/pupctl/Cargo.toml b/bin/pupctl/Cargo.toml index 9819730..b9c5591 100644 --- a/bin/pupctl/Cargo.toml +++ b/bin/pupctl/Cargo.toml @@ -6,3 +6,4 @@ edition = "2021" puppy = { path = "../../lib/puppy" } clap = { version = "*", features = ["derive"] } cli-table = "*" +tokio = { version = "*", features = ["full"] } diff --git a/bin/pupctl/src/main.rs b/bin/pupctl/src/main.rs index 7ba95ba..9813536 100644 --- a/bin/pupctl/src/main.rs +++ b/bin/pupctl/src/main.rs @@ -1,16 +1,10 @@ //! Control program for the ActivityPub federated social media server. #![feature(iterator_try_collect)] -use puppy::{ - actor::Actor, - config::Config, - data::{Bite, Profile}, - post::Author, - store::util::IterExt as _, - Context, -}; +use puppy::{actor::Actor, config::Config, Context}; -fn main() -> puppy::Result<()> { +#[tokio::main] +async fn main() -> puppy::Result<()> { // puppy::store::Store::nuke(".state")?; let config = Config { ap_domain: "test.piss-on.me".to_string(), @@ -19,68 +13,69 @@ fn main() -> puppy::Result<()> { port: 1312, }; let cx = Context::load(config)?; - let db = cx.store(); - println!("creating actors"); let riley = get_or_create_actor(&cx, "riley")?; - let linen = get_or_create_actor(&cx, "linen")?; - if true { - println!("creating posts"); - puppy::post::create_post(&cx, riley.key, "@linen <3")?; - puppy::post::create_post(&cx, linen.key, "@riley <3")?; - } + let post = puppy::post::create_post(&cx, riley.key, "i like boys")?; + puppy::post::federate_post(&cx, post).await - if true { - println!("making riley follow linen"); - cx.run(|tx| { - if !riley.follows(&tx, &linen)? { - println!("follow relation does not exist yet"); - if let Some(req) = linen - .pending_requests(&tx) - .find_ok(|r| r.origin == riley.key)? - { - println!("accepting the pending follow request"); - linen.do_accept_request(&cx, req) - } else { - println!("no pending follow request; creating"); - riley.do_follow_request(&cx, &linen).map(|_| ()) - } - } else { - println!("riley already follows linen"); - Ok(()) - } - })?; - } + // let linen = get_or_create_actor(&cx, "linen")?; + // if true { + // println!("creating posts"); + // puppy::post::create_post(&cx, riley.key, "@linen <3")?; + // puppy::post::create_post(&cx, linen.key, "@riley <3")?; + // } - println!("\nPosts on the instance:"); - for post in puppy::post::fetch_timeline(&db, .., None)?.posts() { - let Author { ref handle, .. } = post.author; - let content = post.content.content.as_ref().unwrap(); - println!("- {:?} by {handle}:\n{content}", post.id) - } + // if true { + // println!("making riley follow linen"); + // cx.run(|tx| { + // if !riley.follows(&tx, &linen)? { + // println!("follow relation does not exist yet"); + // if let Some(req) = linen + // .pending_requests(&tx) + // .find_ok(|r| r.origin == riley.key)? + // { + // println!("accepting the pending follow request"); + // linen.do_accept_request(&cx, req) + // } else { + // println!("no pending follow request; creating"); + // riley.do_follow_request(&cx, &linen).map(|_| ()) + // } + // } else { + // println!("riley already follows linen"); + // Ok(()) + // } + // })?; + // } - cx.run(|tx| { - println!("\nLinen's followers:"); - for id in linen.followers(&tx).try_collect::>()? { - let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); - println!("- @{account_name} ({id})"); - } + // println!("\nPosts on the instance:"); + // for post in puppy::post::fetch_timeline(&db, .., None)?.posts() { + // let Author { ref handle, .. } = post.author; + // let content = post.content.content.as_ref().unwrap(); + // println!("- {:?} by {handle}:\n{content}", post.id) + // } - println!("\nRiley's following:"); - for id in riley.following(&tx).try_collect::>()? { - let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); - println!("- @{account_name} ({id})"); - } + // cx.run(|tx| { + // println!("\nLinen's followers:"); + // for id in linen.followers(&tx).try_collect::>()? { + // let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); + // println!("- @{account_name} ({id})"); + // } - if false { - println!("Biting riley"); - linen.do_bite(&cx, &riley)?; - for Bite { id, biter, .. } in riley.bites_suffered(&tx).try_collect::>()? { - let Profile { account_name, .. } = db.get_mixin(biter)?.unwrap(); - println!("riley was bitten by @{account_name} at {}", id.timestamp()); - } - } - Ok(()) - }) + // println!("\nRiley's following:"); + // for id in riley.following(&tx).try_collect::>()? { + // let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); + // println!("- @{account_name} ({id})"); + // } + + // if false { + // println!("Biting riley"); + // linen.do_bite(&cx, &riley)?; + // for Bite { id, biter, .. } in riley.bites_suffered(&tx).try_collect::>()? { + // let Profile { account_name, .. } = db.get_mixin(biter)?.unwrap(); + // println!("riley was bitten by @{account_name} at {}", id.timestamp()); + // } + // } + // Ok(()) + // }) } fn get_or_create_actor(cx: &Context, username: &str) -> puppy::Result { diff --git a/bin/server/src/api.rs b/bin/server/src/api.rs index 3a9a898..744e8a0 100644 --- a/bin/server/src/api.rs +++ b/bin/server/src/api.rs @@ -228,6 +228,7 @@ async fn dispatch_public( ) -> Result { match (req.method, req.path()) { (GET, ["proxy"]) => ap::proxy(&cx, &req.params).await, + (GET, ["outbox"]) => ap::outbox(&cx, &req.params).await, (GET, [".well-known", "webfinger"]) => wf::resolve(&cx, &req.params), // TODO: nicer solution for this (GET, VERIFIER_MOUNT) => Ok(ap::serve_verifier_actor(&verifier)), diff --git a/bin/server/src/api/ap.rs b/bin/server/src/api/ap.rs index 913c43b..9575fe3 100644 --- a/bin/server/src/api/ap.rs +++ b/bin/server/src/api/ap.rs @@ -38,6 +38,29 @@ pub async fn proxy(cx: &Context, params: &[(&str, &str)]) -> Result Result { + // Extract our query parameters. + let Some(user) = params.iter().find_map(|(k, v)| (*k == "user").then_some(v)) else { + fuck!(400: "expected `user` query param"); + }; + let Some(content) = params + .iter() + .find_map(|(k, v)| (*k == "content").then_some(v)) + else { + fuck!(400: "expected `url` query param"); + }; + + let Ok(Some(actor)) = cx.run(|tx| Actor::by_username(&tx, user)) else { + fuck!(500: "failed actor by name {user}"); + }; + + let post = puppy::post::create_post(&cx, actor.key, content.to_string()).unwrap(); + puppy::post::federate_post(&cx, post).await.unwrap(); + Ok(respond! { + code: 200 + }) +} + /// Handle POSTs to actor inboxes. Requires request signature. pub fn inbox(cx: &Context, actor_id: &str, sig: Signer, body: Value) -> Response { todo!() diff --git a/lib/fetch/src/client.rs b/lib/fetch/src/client.rs index aef41c6..e209e1d 100644 --- a/lib/fetch/src/client.rs +++ b/lib/fetch/src/client.rs @@ -1,4 +1,5 @@ use chrono::Utc; +use http::Method; use http_body_util::BodyExt as _; use reqwest::Body; use serde_json::Value; @@ -36,7 +37,28 @@ impl Client { /// Note that in order for the request to be considered valid by most implementations, `key.owner` /// must equal `payload.actor`. pub async fn deliver(&self, key: &SigningKey, payload: &Activity, inbox: &str) { - todo!() + let system = Subsystem::Delivery; + + let body = serde_json::to_string(&payload.to_json_ld()).unwrap(); + let mut req = system + .new_request(inbox) + .unwrap() + .method(Method::POST) + .header("content-type", ACTIVITYPUB_TYPE) + .body(body) + .unwrap(); + + key.sign_with_digest(Options::LEGACY, &mut req) + .map_err(FetchError::Sig) + .expect("signature generation to work") + .commit(&mut req); + + let req = dbg!(req); + + let request = req.map(Body::from).try_into().unwrap(); + let response = self.inner.execute(request).await.unwrap(); + let body = dbg!(response).text().await.unwrap(); + dbg!(body); } /// A high-level function to resolve a single ActivityPub ID using a signed request. pub async fn resolve(&self, key: &SigningKey, url: &str) -> Result { diff --git a/lib/fetch/src/object.rs b/lib/fetch/src/object.rs index caffc69..7b7898a 100644 --- a/lib/fetch/src/object.rs +++ b/lib/fetch/src/object.rs @@ -72,6 +72,7 @@ pub enum Object { Other { id: String, kind: String, + author: String, content: Option, summary: Option, }, @@ -89,9 +90,19 @@ impl Object { match self { Object::Activity(a) => a.to_json_ld(), Object::Actor(a) => a.to_json_ld(), - Object::Other { id, kind, content, summary } => json!({ + Object::Other { + id, + kind, + content, + summary, + author, + } => json!({ + "to": [ + "https://www.w3.org/ns/activitystreams#Public", + ], "id": id.to_string(), "type": kind, + "attributedTo": author, "content": content, "summary": summary, }), diff --git a/lib/fetch/src/signatures.rs b/lib/fetch/src/signatures.rs index 8f659f0..32b907d 100644 --- a/lib/fetch/src/signatures.rs +++ b/lib/fetch/src/signatures.rs @@ -184,7 +184,7 @@ impl SigningKey { T: AsRef<[u8]>, { // Calculate and insert digest if it isn't there yet, otherwise do nothing. - let digest = encode(sha256(req.body())); + let digest = format!("sha-256={}", encode(sha256(req.body()))); req.headers_mut() .entry("digest") .or_insert_with(|| digest.try_into().unwrap()); diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index fc5a12d..90dc137 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -16,11 +16,13 @@ pub use context::Context; #[cfg(test)] pub use context::test_context; -use data::{ActivityKind, Channel, Content, Create, Id, Object, ObjectKind, Profile, PublicKey}; +use data::{ + ActivityKind, AuthorOf, Channel, Content, Create, Id, Object, ObjectKind, Profile, PublicKey, +}; use store::Transaction; pub use store::{self, Key, StoreError}; -pub use fetch; +pub use fetch::{self, FetchError}; mod context; pub mod data; @@ -76,10 +78,17 @@ pub fn get_local_ap_object(tx: &Transaction<'_>, key: Key) -> Result bool { - self.local + Ok(maybe_key.map(|key| Actor { key })) } } @@ -136,7 +140,7 @@ pub mod actor { about_string: None, about_fields: Vec::new(), })?; - Ok(Actor { key, local: true }) + Ok(Actor { key }) }) } @@ -189,6 +193,8 @@ pub type Result = std::result::Result; pub enum Error { /// An error internal to the store. Store(StoreError), + /// An error generated by the [fetch] subsystem. + Fetch(FetchError), /// Expected `node` to have some property that it doesn't have. MissingData { /// The node that is missing the data. diff --git a/lib/puppy/src/post.rs b/lib/puppy/src/post.rs index 006a827..a52b5a2 100644 --- a/lib/puppy/src/post.rs +++ b/lib/puppy/src/post.rs @@ -4,15 +4,22 @@ use std::ops::RangeBounds; use chrono::{DateTime, Utc}; use either::Either::{Left, Right}; -use fetch::signatures::Private; +use fetch::{ + object::{Activity, Object}, + signatures::Private, +}; use store::{util::IterExt as _, Key, Store, StoreError, Transaction}; use crate::{ - data::{self, AuthorOf, Content, Id, Object, ObjectKind, PrivateKey, Profile, PublicKey}, - Context, + actor::{get_signing_key, Actor}, + data::{ + self, ActivityKind, AuthorOf, Content, Create, Id, ObjectKind, PrivateKey, Profile, + PublicKey, + }, + Context, Error, }; -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub struct Post { pub key: Key, } @@ -125,7 +132,7 @@ pub fn fetch_timeline( Ok(Timeline { items: posts }) } -/// Create a new post. +/// Create a new post entity. pub fn create_post(cx: &Context, author: Key, content: impl Into) -> crate::Result { let content = content.into(); cx.run(|tx| { @@ -135,7 +142,7 @@ pub fn create_post(cx: &Context, author: Key, content: impl Into) -> cr // Federation stuff let id = Id(cx.mk_url(key)); tx.add_alias(key, id.clone())?; - tx.add_mixin(key, Object { + tx.add_mixin(key, data::Object { kind: ObjectKind::Notelike("Note".to_string()), local: true, id, @@ -144,6 +151,58 @@ pub fn create_post(cx: &Context, author: Key, content: impl Into) -> cr }) } +pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> { + // Obtain all the data we need to construct our activity + let (Content { content, warning }, url, author, signing_key) = cx.run(|tx| try { + let Some(AuthorOf { author, .. }) = tx.incoming(post.key).next().transpose()? else { + panic!() + }; + let signing_key = get_signing_key(tx, Actor { key: author })?; + let (c, data::Object { id, .. }) = tx.get_mixin_many(post.key)?; + (c, id, author, signing_key) + })?; + + let activity_key = Key::gen(); + + // Insert a create activity into the database so we can serve it later + cx.run(|tx| try { + let id = Id(cx.mk_url(activity_key)); + tx.add_alias(activity_key, id.clone())?; + tx.add_mixin(activity_key, data::Object { + kind: ObjectKind::Activity(ActivityKind::Create), + local: true, + id, + })?; + tx.create(Create { + id: activity_key, + actor: author, + object: post.key, + })?; + })?; + + // Construct an ActivityPub message to send + let activity = Activity { + id: cx.mk_url(activity_key), + actor: signing_key.owner.clone(), + object: Box::new(Object::Other { + id: url.to_string(), + kind: "Note".to_string(), + author: cx.mk_url(author), + summary: warning, + content, + }), + kind: "Create".to_string(), + }; + + fetch::deliver( + &signing_key, + activity, + "https://crimew.gay/users/ezri/inbox", + ) + .await; + Ok(()) +} + /// Add a post's mixins and predicates to an existing `node`. pub fn mixin_post( tx: &Transaction<'_>,