From 9c6e7b1471e7bc2afc488ef31de16d88e52d2330 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Mon, 6 May 2024 00:07:28 +0200 Subject: [PATCH] Hook up the restructured processor to the inbox --- bin/server/src/api.rs | 4 +- bin/server/src/api/ap.rs | 40 +++++++++++++-- bin/server/src/main.rs | 2 +- lib/puppy/src/data.rs | 9 ++++ lib/puppy/src/lib.rs | 105 +++++++++++++++++++++++---------------- 5 files changed, 109 insertions(+), 51 deletions(-) diff --git a/bin/server/src/api.rs b/bin/server/src/api.rs index cde23bd..2f0f60f 100644 --- a/bin/server/src/api.rs +++ b/bin/server/src/api.rs @@ -214,7 +214,7 @@ const GET: &Method = &Method::GET; /// This function is where all requests to a protected endpoint have to go through. If the request /// was signed but does not target a protected endpoint, this function will fall back to the /// [`dispatch_public`] handler. -#[tracing::instrument(level = "DEBUG", target = "router", skip_all)] +// #[tracing::instrument(level = "DEBUG", target = "router", skip_all)] async fn dispatch_signed( cx: Context, verifier: &Verifier, @@ -236,7 +236,7 @@ async fn dispatch_signed( /// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the /// function will return a 404 response. If the path *does* exist, but the signature is not /// valid, they will also get a 404. -#[tracing::instrument(level = "DEBUG", target = "router", skip_all)] +// #[tracing::instrument(level = "DEBUG", target = "router", skip_all)] async fn dispatch_public( cx: Context, verifier: &Verifier, diff --git a/bin/server/src/api/ap.rs b/bin/server/src/api/ap.rs index d53c5e3..9e9ab5e 100644 --- a/bin/server/src/api/ap.rs +++ b/bin/server/src/api/ap.rs @@ -1,11 +1,15 @@ //! ActivityPub handlers. +use std::sync::{Mutex, MutexGuard}; + use http_body_util::Full; use hyper::body::Bytes; use puppy::{ actor::{get_signing_key, Actor}, - fetch::object::Activity, - get_local_ap_object, Context, Error, Key, + fetch::{object::Activity, Client}, + get_local_ap_object, + store::Transaction, + Context, Error, Key, }; use serde_json::Value; use tracing::{info, instrument}; @@ -81,10 +85,38 @@ pub async fn inbox( "processing object '{id}'", }; + let tx = cx.store().start(); + let key = get_signing_key(&tx, Actor { key: receiver }).unwrap(); + + struct InboxContext<'c> { + cx: &'c Context, + tx: Mutex>, + } + + impl<'a> puppy::systems::Context<'a> for InboxContext<'a> { + /// Access the transaction. + fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>> + where + 'a: 'y, + 'y: 'a, + { + self.tx.lock().unwrap() + } + /// Access the federation client. + fn client(&self) -> &Client { + self.cx.resolver() + } + /// Get the ActivityPub domain of this server. + fn domain(&self) -> &str { + &self.cx.config().ap_domain + } + } + match Activity::from_json(json) { Ok(activity) => { - puppy::ingest(&cx, receiver, &activity).await.unwrap(); - match puppy::interpret(&cx, activity) { + let context = InboxContext { tx: Mutex::new(tx), cx }; + let r = puppy::systems::processor::process_incoming(&context, &key, activity).await; + match r { Ok(_) => Ok(respond!(code: 202)), Err(err) => fuck!(400: "error interpreting activity: {err}"), } diff --git a/bin/server/src/main.rs b/bin/server/src/main.rs index df1f813..9edf463 100644 --- a/bin/server/src/main.rs +++ b/bin/server/src/main.rs @@ -23,7 +23,7 @@ mod api; async fn main() { Registry::default() .with(filter_fn(|meta| !meta.target().starts_with("reqwest"))) - .with(filter_fn(|meta| *meta.level() < Level::DEBUG)) + //.with(filter_fn(|meta| *meta.level() < Level::DEBUG)) .with(ForestLayer::default()) .init(); // TODO: load the config from a file or something. diff --git a/lib/puppy/src/data.rs b/lib/puppy/src/data.rs index e872282..7ccaf28 100644 --- a/lib/puppy/src/data.rs +++ b/lib/puppy/src/data.rs @@ -90,6 +90,14 @@ pub struct FollowRequest { pub target: Key, } +#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)] +pub struct Accept { + #[identity] + pub id: Key, + pub origin: Key, + pub target: Key, +} + /// The status of a [`FollowRequest`]. /// /// Valid state transitions: @@ -212,4 +220,5 @@ pub fn schema() -> Schema { .has::() .has::() .has::() + .has::() } diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index e2c7852..1eb0806 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -355,14 +355,19 @@ fn void(_: T) -> () {} pub mod systems { //! Logic containment zone. + use std::sync::MutexGuard; + use fetch::Client; use store::{Key, Transaction}; use tracing::warn; /// Allows subsystems to interact with the [fetch] and [store] components. - pub trait Context { + pub trait Context<'a> { /// Access the transaction. - fn db(&self) -> &Transaction<'_>; + fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>> + where + 'a: 'y, + 'y: 'a; /// Access the federation client. fn client(&self) -> &Client; /// Get the ActivityPub domain of this server. @@ -411,9 +416,9 @@ pub mod systems { pub trait Notification {} /// Get the notification where it needs to go. - pub fn dispatch(cx: &C, event: &impl Notification) -> Result<()> + pub fn dispatch<'x, C>(cx: &C, event: &impl Notification) -> Result<()> where - C: Context, + C: Context<'x>, { info!("ding!"); Ok(()) @@ -450,25 +455,25 @@ pub mod systems { /// Implemented for types that can represent an activity that can be delivered to a remote server. pub trait Payload { /// Construct an [`Activity`] for delivery. - fn prepare(self, cx: &C) -> Result + fn prepare<'c, C>(self, cx: &C) -> Result where - C: Context + ?Sized; + C: Context<'c>; /// Call the delivery subsystem to this payload everywhere it needs to go. /// /// A convenience method to call [`deliver`] as a method instead of a free-standing function. - fn deliver(self, cx: &C) -> impl Future> + Send + Sync + fn deliver<'c, C>(self, cx: &C) -> impl Future> + Send + Sync where Self: Sized + Send + Sync, - C: Context + Send + Sync + ?Sized, + C: Context<'c> + Send + Sync, { deliver(cx, self) } } impl Payload for Accept { - fn prepare(self, cx: &C) -> Result + fn prepare<'c, C>(self, cx: &C) -> Result where - C: Context + ?Sized, + C: Context<'c>, { let (actor, object, tag) = get_activity_data(cx, self.into())?; Ok(Activity { @@ -481,9 +486,9 @@ pub mod systems { } /// Deliver the activity-like payload to all its intended recipients. - pub async fn deliver(cx: &C, payload: impl Payload) -> Result<()> + pub async fn deliver<'c, C>(cx: &C, payload: impl Payload) -> Result<()> where - C: Context + ?Sized, + C: Context<'c>, { let activity = payload.prepare(cx)?; @@ -504,9 +509,12 @@ pub mod systems { } /// Calculate the list of inboxes to send the activity to. - fn get_targets(cx: &C, activity: &Activity) -> Result> + fn get_targets<'c, C>( + cx: &C, + activity: &Activity, + ) -> Result> where - C: Context + ?Sized, + C: Context<'c>, { use crate::data::{Channel, Id}; @@ -531,9 +539,9 @@ pub mod systems { /// Get a signing key for the given actor id. /// /// Will fail if the actor isn't local. - fn get_keypair(cx: &C, actor_id: &str) -> Result + fn get_keypair<'c, C>(cx: &C, actor_id: &str) -> Result where - C: Context + ?Sized, + C: Context<'c>, { use crate::{ actor::{get_signing_key, Actor}, @@ -542,13 +550,13 @@ pub mod systems { let Some(actor_key) = cx.db().lookup(Id(actor_id.to_string()))? else { panic!("could not get db key for {actor_id}"); }; - get_signing_key(cx.db(), Actor { key: actor_key }).map_err(Error::Store) + get_signing_key(&*cx.db(), Actor { key: actor_key }).map_err(Error::Store) } /// Get actor id, the object id, and the type of the activity. - fn get_activity_data(cx: &C, key: Key) -> Result<(String, String, Tag)> + fn get_activity_data<'c, C>(cx: &C, key: Key) -> Result<(String, String, Tag)> where - C: Context + ?Sized, + C: Context<'c>, { // TODO: instead of panicking, return a normal error. use crate::data::{Id, Object, ActivityKind, ObjectKind}; @@ -617,13 +625,13 @@ pub mod systems { /// Process an incoming activity. #[tracing::instrument(target = "puppy.processor", skip_all, fields(activity = root.id))] - pub async fn process_incoming( + pub async fn process_incoming<'c, C>( cx: &C, on_behalf_of: &SigningKey, root: Activity, ) -> Result<(Key, Tag)> where - C: Context, + C: Context<'c>, { if cx.is_local(&root.id) || cx.is_known(&root.id) { panic!("could not process activity, it already exists"); @@ -670,12 +678,12 @@ pub mod systems { /// If this is not the case, it will panic. /// /// Specifically, - #[tracing::instrument(level = "TRACE", target = "puppy.processor", skip_all, fields(activity = activity.id))] - async fn apply_activity(cx: &C, activity: Activity) -> Result<(Key, Tag)> + #[tracing::instrument(level = "TRACE", target = "puppy.processor", skip_all, fields(activity = activity.id, tag = activity.kind))] + async fn apply_activity<'c, C>(cx: &C, activity: Activity) -> Result<(Key, Tag)> where - C: Context, + C: Context<'c>, { - use crate::data::{Id, Create}; + use crate::data::{Id, Create, Object, Accept}; // Get a key and error out if it does not exist let get_key = |url: &str| -> Key { cx.db() @@ -691,12 +699,12 @@ pub mod systems { let target = get_key(activity.object.id()); let req = following::create_follow_request(cx, requester, target)?; - debug!("created follow request {req}"); // For now, automatically accept follow requests. if cx.is_local(activity.object.id()) { debug!("auto-accepting follow request for local actor {target}",); let accept = following::accept_follow_request(cx, req)?; + // TODO: Add object components to accept activity before sending if !cx.is_local(&activity.actor) { debug!("delivering to remote actor"); delivery::deliver(cx, accept).await?; @@ -732,14 +740,14 @@ pub mod systems { /// /// The `budget` parameter specifies the maximum number of recursive calls. #[tracing::instrument(target = "puppy.processor", skip_all, fields(budget = budget, target = root.id))] - async fn fetch_dependencies( + async fn fetch_dependencies<'c, C>( cx: &C, root: &Activity, auth: &SigningKey, budget: usize, ) -> Result<(Vec, Vec, Vec)> where - C: Context, + C: Context<'c>, { let mut actors = Vec::new(); let mut notes = Vec::new(); @@ -789,24 +797,24 @@ pub mod systems { Ok((actors, notes, activities)) } - fn store_actor(cx: &C, actor: Actor) -> Result<()> + fn store_actor<'c, C>(cx: &C, actor: Actor) -> Result<()> where - C: Context, + C: Context<'c>, { if !cx.is_known(&actor.id) { - crate::actor::create_remote(cx.db(), actor)?; + crate::actor::create_remote(&*cx.db(), actor)?; } else { debug!("actor {} is already known", actor.id); } Ok(()) } - fn store_note(cx: &C, note: Note) -> Result<()> + fn store_note<'c, C>(cx: &C, note: Note) -> Result<()> where - C: Context, + C: Context<'c>, { if !cx.is_known(¬e.id) { - crate::post::create_post_from_note(cx.db(), note)?; + crate::post::create_post_from_note(&*cx.db(), note)?; } else { debug!("note {} is already known", note.id); } @@ -843,9 +851,13 @@ pub mod systems { }; /// Create a follow request. - pub fn create_follow_request(cx: &C, follower: Key, target: Key) -> Result + pub fn create_follow_request<'c, C>( + cx: &C, + follower: Key, + target: Key, + ) -> Result where - C: Context + ?Sized, + C: Context<'c>, { Ok(Key::gen().into()) } @@ -856,27 +868,27 @@ pub mod systems { /// the follow request is withdrawn. /// /// This creates a new [`Undo`] entry in the database to which data may be attached. - pub fn cancel_follow_request(cx: &C, req: FollowRequest) -> Result + pub fn cancel_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result where - C: Context + ?Sized, + C: Context<'c>, { Ok(Key::gen().into()) } /// Apply the changes related to accepting a follow request to the social graph and create a new node representing /// the event. - pub fn accept_follow_request(cx: &C, req: FollowRequest) -> Result + pub fn accept_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result where - C: Context + ?Sized, + C: Context<'c>, { Ok(Key::gen().into()) } /// Apply the changes related to rejecting a follow request to the social graph and create a new node representing /// the event. - pub fn reject_follow_request(cx: &C, req: FollowRequest) -> Result + pub fn reject_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result where - C: Context + ?Sized, + C: Context<'c>, { Ok(Key::gen().into()) } @@ -894,12 +906,17 @@ pub mod systems { /// Describes objects which have a "revert" operation defined (that is, they can be the target of an [`Undo`] activity). pub trait Reversible { /// Undo `self` and generate a corresponding [`Undo`] object recording this fact. - fn revert(&self, cx: &dyn Context) -> crate::Result; + fn revert<'c, C>(&self, cx: &C) -> crate::Result + where + C: Context<'c>; } impl Reversible for FollowRequest { /// Withdraw a follow request if it wasn't yet accepted, or unfollow someone. - fn revert(&self, cx: &dyn Context) -> crate::Result { + fn revert<'c, C>(&self, cx: &C) -> crate::Result + where + C: Context<'c>, + { following::cancel_follow_request(cx, *self) } }