Hook up the restructured processor to the inbox

This commit is contained in:
Riley Apeldoorn 2024-05-06 00:07:28 +02:00
parent 7641add472
commit 9c6e7b1471
5 changed files with 109 additions and 51 deletions

View file

@ -214,7 +214,7 @@ const GET: &Method = &Method::GET;
/// This function is where all requests to a protected endpoint have to go through. If the request /// This function is where all requests to a protected endpoint have to go through. If the request
/// was signed but does not target a protected endpoint, this function will fall back to the /// was signed but does not target a protected endpoint, this function will fall back to the
/// [`dispatch_public`] handler. /// [`dispatch_public`] handler.
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)] // #[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
async fn dispatch_signed( async fn dispatch_signed(
cx: Context, cx: Context,
verifier: &Verifier, verifier: &Verifier,
@ -236,7 +236,7 @@ async fn dispatch_signed(
/// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the /// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the
/// function will return a 404 response. If the path *does* exist, but the signature is not /// function will return a 404 response. If the path *does* exist, but the signature is not
/// valid, they will also get a 404. /// valid, they will also get a 404.
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)] // #[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
async fn dispatch_public( async fn dispatch_public(
cx: Context, cx: Context,
verifier: &Verifier, verifier: &Verifier,

View file

@ -1,11 +1,15 @@
//! ActivityPub handlers. //! ActivityPub handlers.
use std::sync::{Mutex, MutexGuard};
use http_body_util::Full; use http_body_util::Full;
use hyper::body::Bytes; use hyper::body::Bytes;
use puppy::{ use puppy::{
actor::{get_signing_key, Actor}, actor::{get_signing_key, Actor},
fetch::object::Activity, fetch::{object::Activity, Client},
get_local_ap_object, Context, Error, Key, get_local_ap_object,
store::Transaction,
Context, Error, Key,
}; };
use serde_json::Value; use serde_json::Value;
use tracing::{info, instrument}; use tracing::{info, instrument};
@ -81,10 +85,38 @@ pub async fn inbox(
"processing object '{id}'", "processing object '{id}'",
}; };
let tx = cx.store().start();
let key = get_signing_key(&tx, Actor { key: receiver }).unwrap();
struct InboxContext<'c> {
cx: &'c Context,
tx: Mutex<Transaction<'c>>,
}
impl<'a> puppy::systems::Context<'a> for InboxContext<'a> {
/// Access the transaction.
fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>>
where
'a: 'y,
'y: 'a,
{
self.tx.lock().unwrap()
}
/// Access the federation client.
fn client(&self) -> &Client {
self.cx.resolver()
}
/// Get the ActivityPub domain of this server.
fn domain(&self) -> &str {
&self.cx.config().ap_domain
}
}
match Activity::from_json(json) { match Activity::from_json(json) {
Ok(activity) => { Ok(activity) => {
puppy::ingest(&cx, receiver, &activity).await.unwrap(); let context = InboxContext { tx: Mutex::new(tx), cx };
match puppy::interpret(&cx, activity) { let r = puppy::systems::processor::process_incoming(&context, &key, activity).await;
match r {
Ok(_) => Ok(respond!(code: 202)), Ok(_) => Ok(respond!(code: 202)),
Err(err) => fuck!(400: "error interpreting activity: {err}"), Err(err) => fuck!(400: "error interpreting activity: {err}"),
} }

View file

@ -23,7 +23,7 @@ mod api;
async fn main() { async fn main() {
Registry::default() Registry::default()
.with(filter_fn(|meta| !meta.target().starts_with("reqwest"))) .with(filter_fn(|meta| !meta.target().starts_with("reqwest")))
.with(filter_fn(|meta| *meta.level() < Level::DEBUG)) //.with(filter_fn(|meta| *meta.level() < Level::DEBUG))
.with(ForestLayer::default()) .with(ForestLayer::default())
.init(); .init();
// TODO: load the config from a file or something. // TODO: load the config from a file or something.

View file

@ -90,6 +90,14 @@ pub struct FollowRequest {
pub target: Key, pub target: Key,
} }
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct Accept {
#[identity]
pub id: Key,
pub origin: Key,
pub target: Key,
}
/// The status of a [`FollowRequest`]. /// The status of a [`FollowRequest`].
/// ///
/// Valid state transitions: /// Valid state transitions:
@ -212,4 +220,5 @@ pub fn schema() -> Schema {
.has::<AuthorOf>() .has::<AuthorOf>()
.has::<Follows>() .has::<Follows>()
.has::<Create>() .has::<Create>()
.has::<Accept>()
} }

View file

@ -355,14 +355,19 @@ fn void<T>(_: T) -> () {}
pub mod systems { pub mod systems {
//! Logic containment zone. //! Logic containment zone.
use std::sync::MutexGuard;
use fetch::Client; use fetch::Client;
use store::{Key, Transaction}; use store::{Key, Transaction};
use tracing::warn; use tracing::warn;
/// Allows subsystems to interact with the [fetch] and [store] components. /// Allows subsystems to interact with the [fetch] and [store] components.
pub trait Context { pub trait Context<'a> {
/// Access the transaction. /// Access the transaction.
fn db(&self) -> &Transaction<'_>; fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>>
where
'a: 'y,
'y: 'a;
/// Access the federation client. /// Access the federation client.
fn client(&self) -> &Client; fn client(&self) -> &Client;
/// Get the ActivityPub domain of this server. /// Get the ActivityPub domain of this server.
@ -411,9 +416,9 @@ pub mod systems {
pub trait Notification {} pub trait Notification {}
/// Get the notification where it needs to go. /// Get the notification where it needs to go.
pub fn dispatch<C>(cx: &C, event: &impl Notification) -> Result<()> pub fn dispatch<'x, C>(cx: &C, event: &impl Notification) -> Result<()>
where where
C: Context, C: Context<'x>,
{ {
info!("ding!"); info!("ding!");
Ok(()) Ok(())
@ -450,25 +455,25 @@ pub mod systems {
/// Implemented for types that can represent an activity that can be delivered to a remote server. /// Implemented for types that can represent an activity that can be delivered to a remote server.
pub trait Payload { pub trait Payload {
/// Construct an [`Activity`] for delivery. /// Construct an [`Activity`] for delivery.
fn prepare<C>(self, cx: &C) -> Result<Activity> fn prepare<'c, C>(self, cx: &C) -> Result<Activity>
where where
C: Context + ?Sized; C: Context<'c>;
/// Call the delivery subsystem to this payload everywhere it needs to go. /// Call the delivery subsystem to this payload everywhere it needs to go.
/// ///
/// A convenience method to call [`deliver`] as a method instead of a free-standing function. /// A convenience method to call [`deliver`] as a method instead of a free-standing function.
fn deliver<C>(self, cx: &C) -> impl Future<Output = Result<()>> + Send + Sync fn deliver<'c, C>(self, cx: &C) -> impl Future<Output = Result<()>> + Send + Sync
where where
Self: Sized + Send + Sync, Self: Sized + Send + Sync,
C: Context + Send + Sync + ?Sized, C: Context<'c> + Send + Sync,
{ {
deliver(cx, self) deliver(cx, self)
} }
} }
impl Payload for Accept { impl Payload for Accept {
fn prepare<C>(self, cx: &C) -> Result<Activity> fn prepare<'c, C>(self, cx: &C) -> Result<Activity>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
let (actor, object, tag) = get_activity_data(cx, self.into())?; let (actor, object, tag) = get_activity_data(cx, self.into())?;
Ok(Activity { Ok(Activity {
@ -481,9 +486,9 @@ pub mod systems {
} }
/// Deliver the activity-like payload to all its intended recipients. /// Deliver the activity-like payload to all its intended recipients.
pub async fn deliver<C>(cx: &C, payload: impl Payload) -> Result<()> pub async fn deliver<'c, C>(cx: &C, payload: impl Payload) -> Result<()>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
let activity = payload.prepare(cx)?; let activity = payload.prepare(cx)?;
@ -504,9 +509,12 @@ pub mod systems {
} }
/// Calculate the list of inboxes to send the activity to. /// Calculate the list of inboxes to send the activity to.
fn get_targets<C>(cx: &C, activity: &Activity) -> Result<impl IntoIterator<Item = String>> fn get_targets<'c, C>(
cx: &C,
activity: &Activity,
) -> Result<impl IntoIterator<Item = String>>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
use crate::data::{Channel, Id}; use crate::data::{Channel, Id};
@ -531,9 +539,9 @@ pub mod systems {
/// Get a signing key for the given actor id. /// Get a signing key for the given actor id.
/// ///
/// Will fail if the actor isn't local. /// Will fail if the actor isn't local.
fn get_keypair<C>(cx: &C, actor_id: &str) -> Result<SigningKey> fn get_keypair<'c, C>(cx: &C, actor_id: &str) -> Result<SigningKey>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
use crate::{ use crate::{
actor::{get_signing_key, Actor}, actor::{get_signing_key, Actor},
@ -542,13 +550,13 @@ pub mod systems {
let Some(actor_key) = cx.db().lookup(Id(actor_id.to_string()))? else { let Some(actor_key) = cx.db().lookup(Id(actor_id.to_string()))? else {
panic!("could not get db key for {actor_id}"); panic!("could not get db key for {actor_id}");
}; };
get_signing_key(cx.db(), Actor { key: actor_key }).map_err(Error::Store) get_signing_key(&*cx.db(), Actor { key: actor_key }).map_err(Error::Store)
} }
/// Get actor id, the object id, and the type of the activity. /// Get actor id, the object id, and the type of the activity.
fn get_activity_data<C>(cx: &C, key: Key) -> Result<(String, String, Tag)> fn get_activity_data<'c, C>(cx: &C, key: Key) -> Result<(String, String, Tag)>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
// TODO: instead of panicking, return a normal error. // TODO: instead of panicking, return a normal error.
use crate::data::{Id, Object, ActivityKind, ObjectKind}; use crate::data::{Id, Object, ActivityKind, ObjectKind};
@ -617,13 +625,13 @@ pub mod systems {
/// Process an incoming activity. /// Process an incoming activity.
#[tracing::instrument(target = "puppy.processor", skip_all, fields(activity = root.id))] #[tracing::instrument(target = "puppy.processor", skip_all, fields(activity = root.id))]
pub async fn process_incoming<C>( pub async fn process_incoming<'c, C>(
cx: &C, cx: &C,
on_behalf_of: &SigningKey, on_behalf_of: &SigningKey,
root: Activity, root: Activity,
) -> Result<(Key, Tag)> ) -> Result<(Key, Tag)>
where where
C: Context, C: Context<'c>,
{ {
if cx.is_local(&root.id) || cx.is_known(&root.id) { if cx.is_local(&root.id) || cx.is_known(&root.id) {
panic!("could not process activity, it already exists"); panic!("could not process activity, it already exists");
@ -670,12 +678,12 @@ pub mod systems {
/// If this is not the case, it will panic. /// If this is not the case, it will panic.
/// ///
/// Specifically, /// Specifically,
#[tracing::instrument(level = "TRACE", target = "puppy.processor", skip_all, fields(activity = activity.id))] #[tracing::instrument(level = "TRACE", target = "puppy.processor", skip_all, fields(activity = activity.id, tag = activity.kind))]
async fn apply_activity<C>(cx: &C, activity: Activity) -> Result<(Key, Tag)> async fn apply_activity<'c, C>(cx: &C, activity: Activity) -> Result<(Key, Tag)>
where where
C: Context, C: Context<'c>,
{ {
use crate::data::{Id, Create}; use crate::data::{Id, Create, Object, Accept};
// Get a key and error out if it does not exist // Get a key and error out if it does not exist
let get_key = |url: &str| -> Key { let get_key = |url: &str| -> Key {
cx.db() cx.db()
@ -691,12 +699,12 @@ pub mod systems {
let target = get_key(activity.object.id()); let target = get_key(activity.object.id());
let req = following::create_follow_request(cx, requester, target)?; let req = following::create_follow_request(cx, requester, target)?;
debug!("created follow request {req}");
// For now, automatically accept follow requests. // For now, automatically accept follow requests.
if cx.is_local(activity.object.id()) { if cx.is_local(activity.object.id()) {
debug!("auto-accepting follow request for local actor {target}",); debug!("auto-accepting follow request for local actor {target}",);
let accept = following::accept_follow_request(cx, req)?; let accept = following::accept_follow_request(cx, req)?;
// TODO: Add object components to accept activity before sending
if !cx.is_local(&activity.actor) { if !cx.is_local(&activity.actor) {
debug!("delivering to remote actor"); debug!("delivering to remote actor");
delivery::deliver(cx, accept).await?; delivery::deliver(cx, accept).await?;
@ -732,14 +740,14 @@ pub mod systems {
/// ///
/// The `budget` parameter specifies the maximum number of recursive calls. /// The `budget` parameter specifies the maximum number of recursive calls.
#[tracing::instrument(target = "puppy.processor", skip_all, fields(budget = budget, target = root.id))] #[tracing::instrument(target = "puppy.processor", skip_all, fields(budget = budget, target = root.id))]
async fn fetch_dependencies<C>( async fn fetch_dependencies<'c, C>(
cx: &C, cx: &C,
root: &Activity, root: &Activity,
auth: &SigningKey, auth: &SigningKey,
budget: usize, budget: usize,
) -> Result<(Vec<Actor>, Vec<Note>, Vec<Activity>)> ) -> Result<(Vec<Actor>, Vec<Note>, Vec<Activity>)>
where where
C: Context, C: Context<'c>,
{ {
let mut actors = Vec::new(); let mut actors = Vec::new();
let mut notes = Vec::new(); let mut notes = Vec::new();
@ -789,24 +797,24 @@ pub mod systems {
Ok((actors, notes, activities)) Ok((actors, notes, activities))
} }
fn store_actor<C>(cx: &C, actor: Actor) -> Result<()> fn store_actor<'c, C>(cx: &C, actor: Actor) -> Result<()>
where where
C: Context, C: Context<'c>,
{ {
if !cx.is_known(&actor.id) { if !cx.is_known(&actor.id) {
crate::actor::create_remote(cx.db(), actor)?; crate::actor::create_remote(&*cx.db(), actor)?;
} else { } else {
debug!("actor {} is already known", actor.id); debug!("actor {} is already known", actor.id);
} }
Ok(()) Ok(())
} }
fn store_note<C>(cx: &C, note: Note) -> Result<()> fn store_note<'c, C>(cx: &C, note: Note) -> Result<()>
where where
C: Context, C: Context<'c>,
{ {
if !cx.is_known(&note.id) { if !cx.is_known(&note.id) {
crate::post::create_post_from_note(cx.db(), note)?; crate::post::create_post_from_note(&*cx.db(), note)?;
} else { } else {
debug!("note {} is already known", note.id); debug!("note {} is already known", note.id);
} }
@ -843,9 +851,13 @@ pub mod systems {
}; };
/// Create a follow request. /// Create a follow request.
pub fn create_follow_request<C>(cx: &C, follower: Key, target: Key) -> Result<FollowRequest> pub fn create_follow_request<'c, C>(
cx: &C,
follower: Key,
target: Key,
) -> Result<FollowRequest>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
Ok(Key::gen().into()) Ok(Key::gen().into())
} }
@ -856,27 +868,27 @@ pub mod systems {
/// the follow request is withdrawn. /// the follow request is withdrawn.
/// ///
/// This creates a new [`Undo`] entry in the database to which data may be attached. /// This creates a new [`Undo`] entry in the database to which data may be attached.
pub fn cancel_follow_request<C>(cx: &C, req: FollowRequest) -> Result<Undo> pub fn cancel_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Undo>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
Ok(Key::gen().into()) Ok(Key::gen().into())
} }
/// Apply the changes related to accepting a follow request to the social graph and create a new node representing /// Apply the changes related to accepting a follow request to the social graph and create a new node representing
/// the event. /// the event.
pub fn accept_follow_request<C>(cx: &C, req: FollowRequest) -> Result<Accept> pub fn accept_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Accept>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
Ok(Key::gen().into()) Ok(Key::gen().into())
} }
/// Apply the changes related to rejecting a follow request to the social graph and create a new node representing /// Apply the changes related to rejecting a follow request to the social graph and create a new node representing
/// the event. /// the event.
pub fn reject_follow_request<C>(cx: &C, req: FollowRequest) -> Result<Reject> pub fn reject_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Reject>
where where
C: Context + ?Sized, C: Context<'c>,
{ {
Ok(Key::gen().into()) Ok(Key::gen().into())
} }
@ -894,12 +906,17 @@ pub mod systems {
/// Describes objects which have a "revert" operation defined (that is, they can be the target of an [`Undo`] activity). /// Describes objects which have a "revert" operation defined (that is, they can be the target of an [`Undo`] activity).
pub trait Reversible { pub trait Reversible {
/// Undo `self` and generate a corresponding [`Undo`] object recording this fact. /// Undo `self` and generate a corresponding [`Undo`] object recording this fact.
fn revert(&self, cx: &dyn Context) -> crate::Result<Undo>; fn revert<'c, C>(&self, cx: &C) -> crate::Result<Undo>
where
C: Context<'c>;
} }
impl Reversible for FollowRequest { impl Reversible for FollowRequest {
/// Withdraw a follow request if it wasn't yet accepted, or unfollow someone. /// Withdraw a follow request if it wasn't yet accepted, or unfollow someone.
fn revert(&self, cx: &dyn Context) -> crate::Result<Undo> { fn revert<'c, C>(&self, cx: &C) -> crate::Result<Undo>
where
C: Context<'c>,
{
following::cancel_follow_request(cx, *self) following::cancel_follow_request(cx, *self)
} }
} }