Compare commits
2 commits
mistress
...
restructur
Author | SHA1 | Date | |
---|---|---|---|
9c6e7b1471 | |||
7641add472 |
12 changed files with 785 additions and 40 deletions
51
Cargo.lock
generated
51
Cargo.lock
generated
|
@ -518,6 +518,21 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-channel"
|
||||
version = "0.3.30"
|
||||
|
@ -525,6 +540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -533,6 +549,34 @@ version = "0.3.30"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
|
||||
|
||||
[[package]]
|
||||
name = "futures-executor"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-task",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-io"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.60",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.30"
|
||||
|
@ -551,10 +595,16 @@ version = "0.3.30"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1258,6 +1308,7 @@ dependencies = [
|
|||
"derive_more",
|
||||
"either",
|
||||
"fetch",
|
||||
"futures",
|
||||
"serde_json",
|
||||
"store",
|
||||
"tracing",
|
||||
|
|
|
@ -214,7 +214,7 @@ const GET: &Method = &Method::GET;
|
|||
/// This function is where all requests to a protected endpoint have to go through. If the request
|
||||
/// was signed but does not target a protected endpoint, this function will fall back to the
|
||||
/// [`dispatch_public`] handler.
|
||||
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||
// #[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||
async fn dispatch_signed(
|
||||
cx: Context,
|
||||
verifier: &Verifier,
|
||||
|
@ -236,7 +236,7 @@ async fn dispatch_signed(
|
|||
/// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the
|
||||
/// function will return a 404 response. If the path *does* exist, but the signature is not
|
||||
/// valid, they will also get a 404.
|
||||
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||
// #[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||
async fn dispatch_public(
|
||||
cx: Context,
|
||||
verifier: &Verifier,
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
//! ActivityPub handlers.
|
||||
|
||||
use std::sync::{Mutex, MutexGuard};
|
||||
|
||||
use http_body_util::Full;
|
||||
use hyper::body::Bytes;
|
||||
use puppy::{
|
||||
actor::{get_signing_key, Actor},
|
||||
fetch::object::Activity,
|
||||
get_local_ap_object, Context, Error, Key,
|
||||
fetch::{object::Activity, Client},
|
||||
get_local_ap_object,
|
||||
store::Transaction,
|
||||
Context, Error, Key,
|
||||
};
|
||||
use serde_json::Value;
|
||||
use tracing::{info, instrument};
|
||||
|
@ -81,10 +85,38 @@ pub async fn inbox(
|
|||
"processing object '{id}'",
|
||||
};
|
||||
|
||||
let tx = cx.store().start();
|
||||
let key = get_signing_key(&tx, Actor { key: receiver }).unwrap();
|
||||
|
||||
struct InboxContext<'c> {
|
||||
cx: &'c Context,
|
||||
tx: Mutex<Transaction<'c>>,
|
||||
}
|
||||
|
||||
impl<'a> puppy::systems::Context<'a> for InboxContext<'a> {
|
||||
/// Access the transaction.
|
||||
fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>>
|
||||
where
|
||||
'a: 'y,
|
||||
'y: 'a,
|
||||
{
|
||||
self.tx.lock().unwrap()
|
||||
}
|
||||
/// Access the federation client.
|
||||
fn client(&self) -> &Client {
|
||||
self.cx.resolver()
|
||||
}
|
||||
/// Get the ActivityPub domain of this server.
|
||||
fn domain(&self) -> &str {
|
||||
&self.cx.config().ap_domain
|
||||
}
|
||||
}
|
||||
|
||||
match Activity::from_json(json) {
|
||||
Ok(activity) => {
|
||||
puppy::ingest(&cx, receiver, &activity).await.unwrap();
|
||||
match puppy::interpret(&cx, activity) {
|
||||
let context = InboxContext { tx: Mutex::new(tx), cx };
|
||||
let r = puppy::systems::processor::process_incoming(&context, &key, activity).await;
|
||||
match r {
|
||||
Ok(_) => Ok(respond!(code: 202)),
|
||||
Err(err) => fuck!(400: "error interpreting activity: {err}"),
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ mod api;
|
|||
async fn main() {
|
||||
Registry::default()
|
||||
.with(filter_fn(|meta| !meta.target().starts_with("reqwest")))
|
||||
.with(filter_fn(|meta| *meta.level() < Level::DEBUG))
|
||||
//.with(filter_fn(|meta| *meta.level() < Level::DEBUG))
|
||||
.with(ForestLayer::default())
|
||||
.init();
|
||||
// TODO: load the config from a file or something.
|
||||
|
|
|
@ -38,13 +38,13 @@ impl Client {
|
|||
///
|
||||
/// Note that in order for the request to be considered valid by most implementations, `key.owner`
|
||||
/// must equal `payload.actor`.
|
||||
#[instrument(target = "fetch.delivery", skip_all, fields(activity = payload.id, url = inbox, key = key.id))]
|
||||
pub async fn deliver(&self, key: &SigningKey, payload: &Activity, inbox: &str) {
|
||||
#[instrument(target = "fetch.delivery", skip_all, fields(activity = payload.id, url = inbox.as_ref(), key = key.id))]
|
||||
pub async fn deliver(&self, key: &SigningKey, payload: &Activity, inbox: impl AsRef<str>) {
|
||||
let system = Subsystem::Delivery;
|
||||
|
||||
let body = serde_json::to_string(&payload.to_json_ld()).unwrap();
|
||||
let mut req = system
|
||||
.new_request(inbox)
|
||||
.new_request(inbox.as_ref())
|
||||
.unwrap()
|
||||
.method(Method::POST)
|
||||
.header("content-type", ACTIVITYPUB_TYPE)
|
||||
|
|
|
@ -14,3 +14,4 @@ either = "*"
|
|||
derive_more = "*"
|
||||
serde_json = "*"
|
||||
tracing = "*"
|
||||
futures = "*"
|
||||
|
|
|
@ -90,6 +90,14 @@ pub struct FollowRequest {
|
|||
pub target: Key,
|
||||
}
|
||||
|
||||
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct Accept {
|
||||
#[identity]
|
||||
pub id: Key,
|
||||
pub origin: Key,
|
||||
pub target: Key,
|
||||
}
|
||||
|
||||
/// The status of a [`FollowRequest`].
|
||||
///
|
||||
/// Valid state transitions:
|
||||
|
@ -212,4 +220,5 @@ pub fn schema() -> Schema {
|
|||
.has::<AuthorOf>()
|
||||
.has::<Follows>()
|
||||
.has::<Create>()
|
||||
.has::<Accept>()
|
||||
}
|
||||
|
|
|
@ -2,7 +2,13 @@
|
|||
//! you should take a look at [`fetch`].
|
||||
|
||||
// Working with result types is such a bitch without these.
|
||||
#![feature(iterator_try_collect, try_blocks, once_cell_try, box_into_inner)]
|
||||
#![feature(
|
||||
iterator_try_collect,
|
||||
try_blocks,
|
||||
once_cell_try,
|
||||
box_into_inner,
|
||||
type_changing_struct_update
|
||||
)]
|
||||
|
||||
use std::hint::unreachable_unchecked;
|
||||
|
||||
|
@ -336,7 +342,7 @@ pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()>
|
|||
match object {
|
||||
Object::Activity(a) => interpret(&cx, a)?,
|
||||
Object::Actor(a) => cx.run(|tx| actor::create_remote(tx, a).map(void))?,
|
||||
Object::Note(a) => post::create_post_from_note(cx, a).map(void)?,
|
||||
Object::Note(a) => cx.run(|tx| post::create_post_from_note(tx, a).map(void))?,
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
||||
|
@ -345,3 +351,630 @@ pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()>
|
|||
|
||||
/// Discard the argument.
|
||||
fn void<T>(_: T) -> () {}
|
||||
|
||||
pub mod systems {
|
||||
//! Logic containment zone.
|
||||
|
||||
use std::sync::MutexGuard;
|
||||
|
||||
use fetch::Client;
|
||||
use store::{Key, Transaction};
|
||||
use tracing::warn;
|
||||
|
||||
/// Allows subsystems to interact with the [fetch] and [store] components.
|
||||
pub trait Context<'a> {
|
||||
/// Access the transaction.
|
||||
fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>>
|
||||
where
|
||||
'a: 'y,
|
||||
'y: 'a;
|
||||
/// Access the federation client.
|
||||
fn client(&self) -> &Client;
|
||||
/// Get the ActivityPub domain of this server.
|
||||
fn domain(&self) -> &str;
|
||||
/// Format `key` as an object ID.
|
||||
fn make_object_id(&self, key: Key) -> String {
|
||||
format!("https://{}/o/{key}", self.domain())
|
||||
}
|
||||
/// Check whether we already know about an object.
|
||||
fn is_known(&self, url: &str) -> bool {
|
||||
use crate::data::Id;
|
||||
|
||||
match self.db().lookup(Id(url.to_string())) {
|
||||
Ok(Some(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
/// Check whether `url` refers to an object that is considered "local" to this server.
|
||||
fn is_local(&self, url: &str) -> bool {
|
||||
use crate::data::{Id, Object};
|
||||
|
||||
let Ok(Some(key)) = self.db().lookup(Id(url.to_string())) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
match self.db().get_mixin(key) {
|
||||
Ok(Some(Object { local, .. })) => local,
|
||||
Ok(None) => false,
|
||||
Err(err) => {
|
||||
warn!("error while trying to determine origin of {key}: {err}");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod notification {
|
||||
//! Manages the delivery of notifications to local users.
|
||||
|
||||
use tracing::info;
|
||||
|
||||
use crate::Result;
|
||||
use super::Context;
|
||||
|
||||
/// Something that can be sent to a local user as a (push) notification.
|
||||
pub trait Notification {}
|
||||
|
||||
/// Get the notification where it needs to go.
|
||||
pub fn dispatch<'x, C>(cx: &C, event: &impl Notification) -> Result<()>
|
||||
where
|
||||
C: Context<'x>,
|
||||
{
|
||||
info!("ding!");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub mod delivery {
|
||||
//! A mid-level system for delivering messages to remote servers.
|
||||
|
||||
use fetch::{
|
||||
object::{Activity, Object},
|
||||
signatures::SigningKey,
|
||||
};
|
||||
use futures::{stream::FuturesUnordered, Future, StreamExt as _};
|
||||
use derive_more::Display;
|
||||
use store::{arrow::Multi, Key};
|
||||
use tracing::error;
|
||||
|
||||
use crate::{entities::Accept, Error, Result};
|
||||
|
||||
use super::Context;
|
||||
|
||||
/// Type tag for an activity.
|
||||
#[derive(Clone, Copy, Display)]
|
||||
pub enum Tag {
|
||||
Follow,
|
||||
Accept,
|
||||
Reject,
|
||||
Create,
|
||||
Delete,
|
||||
Bite,
|
||||
}
|
||||
|
||||
/// Implemented for types that can represent an activity that can be delivered to a remote server.
|
||||
pub trait Payload {
|
||||
/// Construct an [`Activity`] for delivery.
|
||||
fn prepare<'c, C>(self, cx: &C) -> Result<Activity>
|
||||
where
|
||||
C: Context<'c>;
|
||||
/// Call the delivery subsystem to this payload everywhere it needs to go.
|
||||
///
|
||||
/// A convenience method to call [`deliver`] as a method instead of a free-standing function.
|
||||
fn deliver<'c, C>(self, cx: &C) -> impl Future<Output = Result<()>> + Send + Sync
|
||||
where
|
||||
Self: Sized + Send + Sync,
|
||||
C: Context<'c> + Send + Sync,
|
||||
{
|
||||
deliver(cx, self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Payload for Accept {
|
||||
fn prepare<'c, C>(self, cx: &C) -> Result<Activity>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
let (actor, object, tag) = get_activity_data(cx, self.into())?;
|
||||
Ok(Activity {
|
||||
id: cx.make_object_id(self.into()),
|
||||
actor,
|
||||
object: Box::new(Object::Id { id: object }),
|
||||
kind: tag.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Deliver the activity-like payload to all its intended recipients.
|
||||
pub async fn deliver<'c, C>(cx: &C, payload: impl Payload) -> Result<()>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
let activity = payload.prepare(cx)?;
|
||||
|
||||
if !cx.is_local(&activity.id) {
|
||||
error!("delivery of non-local activity!!");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let signing_key = get_keypair(cx, &activity.actor)?;
|
||||
get_targets(cx, &activity)?
|
||||
.into_iter()
|
||||
.map(|inbox| cx.client().deliver(&signing_key, &activity, inbox))
|
||||
.collect::<FuturesUnordered<_>>()
|
||||
.collect::<()>()
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Calculate the list of inboxes to send the activity to.
|
||||
fn get_targets<'c, C>(
|
||||
cx: &C,
|
||||
activity: &Activity,
|
||||
) -> Result<impl IntoIterator<Item = String>>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
use crate::data::{Channel, Id};
|
||||
|
||||
let get_inbox = |id: &str| -> Result<Option<String>> {
|
||||
let Some(key) = cx.db().lookup(Id(id.to_owned()))? else {
|
||||
return Ok(None);
|
||||
};
|
||||
cx.db()
|
||||
.get_mixin(key)
|
||||
.map(|o| o.map(|Channel { inbox }| inbox))
|
||||
.map_err(Error::Store)
|
||||
};
|
||||
|
||||
match *activity.object {
|
||||
Object::Id { ref id } => get_inbox(&id),
|
||||
Object::Activity(ref a) => get_inbox(&a.actor),
|
||||
Object::Actor(ref a) => Ok(Some(a.inbox.clone())),
|
||||
Object::Note(ref a) => get_inbox(&a.author),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a signing key for the given actor id.
|
||||
///
|
||||
/// Will fail if the actor isn't local.
|
||||
fn get_keypair<'c, C>(cx: &C, actor_id: &str) -> Result<SigningKey>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
use crate::{
|
||||
actor::{get_signing_key, Actor},
|
||||
data::Id,
|
||||
};
|
||||
let Some(actor_key) = cx.db().lookup(Id(actor_id.to_string()))? else {
|
||||
panic!("could not get db key for {actor_id}");
|
||||
};
|
||||
get_signing_key(&*cx.db(), Actor { key: actor_key }).map_err(Error::Store)
|
||||
}
|
||||
|
||||
/// Get actor id, the object id, and the type of the activity.
|
||||
fn get_activity_data<'c, C>(cx: &C, key: Key) -> Result<(String, String, Tag)>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
// TODO: instead of panicking, return a normal error.
|
||||
use crate::data::{Id, Object, ActivityKind, ObjectKind};
|
||||
|
||||
let Some(Object { kind, .. }) = cx.db().get_mixin(key)? else {
|
||||
panic!("activity must be an object")
|
||||
};
|
||||
let Some(Multi { origin, target, .. }) = cx.db().get_arrow_raw(key)? else {
|
||||
panic!("expected activity to have multi-arrow repr")
|
||||
};
|
||||
let Some(Id(object)) = cx.db().get_alias(target)? else {
|
||||
panic!("expected object of activity to have an id")
|
||||
};
|
||||
let Some(Id(actor)) = cx.db().get_alias(origin)? else {
|
||||
panic!("expected author of activity to have an id")
|
||||
};
|
||||
|
||||
let tag = match kind {
|
||||
ObjectKind::Activity(tag) => match tag {
|
||||
ActivityKind::Create => Tag::Create,
|
||||
ActivityKind::Follow => Tag::Follow,
|
||||
ActivityKind::Accept => Tag::Accept,
|
||||
ActivityKind::Reject => Tag::Reject,
|
||||
ActivityKind::Bite => Tag::Bite,
|
||||
},
|
||||
_ => panic!("invalid kind for activity"),
|
||||
};
|
||||
|
||||
Ok((actor, object, tag))
|
||||
}
|
||||
}
|
||||
|
||||
pub mod ingestion {
|
||||
//! Processing remote objects to make them part of our local graph.
|
||||
}
|
||||
|
||||
pub mod processor {
|
||||
//! The structure and interpretation of activity graphs.
|
||||
//!
|
||||
//! This is where the magic happens.
|
||||
//!
|
||||
//! This module contains the code that assembles and executes a graph of activities. This is a higher
|
||||
//! level system, that depends on the following sibling systems:
|
||||
//!
|
||||
//! - [`following`][super::following]
|
||||
//! - [`delivery`][super::delivery]
|
||||
//!
|
||||
//! Its main purpose is to integrate all the above system in the context of receiving an activity in
|
||||
//! an inbox, and the subsequent processing that needs to happen. In order to *correctly* interpret an
|
||||
//! activity, we need to know about the other nodes that it references.
|
||||
|
||||
use fetch::{
|
||||
object::{Activity, Actor, Note, Object},
|
||||
signatures::SigningKey,
|
||||
FetchError,
|
||||
};
|
||||
use store::Key;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
use crate::{data, Result};
|
||||
|
||||
use super::{
|
||||
delivery::{self, Tag},
|
||||
following, Context,
|
||||
};
|
||||
|
||||
/// Process an incoming activity.
|
||||
#[tracing::instrument(target = "puppy.processor", skip_all, fields(activity = root.id))]
|
||||
pub async fn process_incoming<'c, C>(
|
||||
cx: &C,
|
||||
on_behalf_of: &SigningKey,
|
||||
root: Activity,
|
||||
) -> Result<(Key, Tag)>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
if cx.is_local(&root.id) || cx.is_known(&root.id) {
|
||||
panic!("could not process activity, it already exists");
|
||||
}
|
||||
|
||||
// Fetch all transitive dependencies of `root`, returned in the order that they need to be processed.
|
||||
let (actors, notes, activities) =
|
||||
fetch_dependencies(cx, &root, &on_behalf_of, 3).await?;
|
||||
|
||||
trace!("storing actors");
|
||||
for actor in actors {
|
||||
store_actor(cx, actor)?;
|
||||
}
|
||||
|
||||
trace!("storing notes");
|
||||
for note in notes {
|
||||
store_note(cx, note)?;
|
||||
}
|
||||
|
||||
// Process the activities that have to execute before `activity`. These dependencies are returned
|
||||
// in the exact order that they need to be applied. `fetch_dependencies` only returns stuff that is
|
||||
// missing from the graph.
|
||||
trace!("applying dependency activities");
|
||||
for dep in activities {
|
||||
let (key, tag) = apply_activity(cx, dep).await?;
|
||||
debug!(
|
||||
dependency_of = root.id,
|
||||
kind = tag.to_string(),
|
||||
"processed activity {key}"
|
||||
);
|
||||
}
|
||||
|
||||
// Now that all activities that this depends on have been executed, we can execute the original
|
||||
// one.
|
||||
trace!("executing target activity");
|
||||
apply_activity(cx, root).await
|
||||
}
|
||||
|
||||
/// Execute the activity.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function assumes that everything the activity references by ActivityPub ID is already present in the context.
|
||||
/// If this is not the case, it will panic.
|
||||
///
|
||||
/// Specifically,
|
||||
#[tracing::instrument(level = "TRACE", target = "puppy.processor", skip_all, fields(activity = activity.id, tag = activity.kind))]
|
||||
async fn apply_activity<'c, C>(cx: &C, activity: Activity) -> Result<(Key, Tag)>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
use crate::data::{Id, Create, Object, Accept};
|
||||
// Get a key and error out if it does not exist
|
||||
let get_key = |url: &str| -> Key {
|
||||
cx.db()
|
||||
.lookup(data::Id(url.to_string()))
|
||||
.expect("database should be operable")
|
||||
.expect("url should already have been inserted")
|
||||
};
|
||||
|
||||
let tag = tagof(&activity);
|
||||
let key = match tag {
|
||||
Tag::Follow => {
|
||||
let requester = get_key(&activity.actor);
|
||||
let target = get_key(activity.object.id());
|
||||
|
||||
let req = following::create_follow_request(cx, requester, target)?;
|
||||
|
||||
// For now, automatically accept follow requests.
|
||||
if cx.is_local(activity.object.id()) {
|
||||
debug!("auto-accepting follow request for local actor {target}",);
|
||||
let accept = following::accept_follow_request(cx, req)?;
|
||||
// TODO: Add object components to accept activity before sending
|
||||
if !cx.is_local(&activity.actor) {
|
||||
debug!("delivering to remote actor");
|
||||
delivery::deliver(cx, accept).await?;
|
||||
}
|
||||
}
|
||||
|
||||
req.into()
|
||||
}
|
||||
Tag::Accept => {
|
||||
let req_id = get_key(activity.object.id());
|
||||
following::accept_follow_request(cx, req_id.into())?.into()
|
||||
}
|
||||
Tag::Reject => {
|
||||
let req_id = get_key(activity.object.id());
|
||||
following::reject_follow_request(cx, req_id.into())?.into()
|
||||
}
|
||||
Tag::Create => {
|
||||
let id = Key::gen();
|
||||
let actor = get_key(&activity.actor);
|
||||
let object = get_key(activity.object.id());
|
||||
cx.db().add_alias(id, Id(cx.make_object_id(id)))?;
|
||||
cx.db().create(Create { id, actor, object })?;
|
||||
id.into()
|
||||
}
|
||||
Tag::Delete => todo!(),
|
||||
Tag::Bite => todo!(),
|
||||
};
|
||||
|
||||
Ok((key, tag))
|
||||
}
|
||||
|
||||
/// Fetch all transitive dependencies of `root` using the given `auth` as the signing authority.
|
||||
///
|
||||
/// The `budget` parameter specifies the maximum number of recursive calls.
|
||||
#[tracing::instrument(target = "puppy.processor", skip_all, fields(budget = budget, target = root.id))]
|
||||
async fn fetch_dependencies<'c, C>(
|
||||
cx: &C,
|
||||
root: &Activity,
|
||||
auth: &SigningKey,
|
||||
budget: usize,
|
||||
) -> Result<(Vec<Actor>, Vec<Note>, Vec<Activity>)>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
let mut actors = Vec::new();
|
||||
let mut notes = Vec::new();
|
||||
let mut activities = Vec::new();
|
||||
|
||||
for url in [root.actor.as_str(), root.object.id()] {
|
||||
if cx.is_known(url) {
|
||||
debug!(parent = root.id, url, "already known, skipping");
|
||||
continue;
|
||||
} else {
|
||||
debug!(parent = root.id, url, budget, "fetching dependency");
|
||||
}
|
||||
|
||||
let json = cx.client().resolve(auth, url).await?;
|
||||
let object = Object::from_json(json).map_err(FetchError::BadJson)?;
|
||||
|
||||
match object {
|
||||
Object::Id { id } => {
|
||||
warn!(parent = root.id, url, "could not fetch {id}");
|
||||
}
|
||||
Object::Activity(a) if budget == 0 => {
|
||||
info!(parent = root.id, url, "exceeded budget, skipping");
|
||||
activities.push(a);
|
||||
continue;
|
||||
}
|
||||
Object::Activity(a) => {
|
||||
debug!(parent = root.id, url, "fetching dependencies");
|
||||
// BUG: this won't hit cache because none of the stuff is in there yet.
|
||||
let (x, y, z) =
|
||||
Box::pin(fetch_dependencies(cx, &a, auth, budget - 1)).await?;
|
||||
trace!(
|
||||
parent = root.id,
|
||||
url,
|
||||
"fetch completed, total: {}",
|
||||
x.len() + y.len() + z.len()
|
||||
);
|
||||
actors.extend(x);
|
||||
notes.extend(y);
|
||||
activities.extend(z);
|
||||
activities.push(a);
|
||||
}
|
||||
Object::Actor(a) => actors.push(a),
|
||||
Object::Note(a) => notes.push(a),
|
||||
}
|
||||
}
|
||||
|
||||
Ok((actors, notes, activities))
|
||||
}
|
||||
|
||||
fn store_actor<'c, C>(cx: &C, actor: Actor) -> Result<()>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
if !cx.is_known(&actor.id) {
|
||||
crate::actor::create_remote(&*cx.db(), actor)?;
|
||||
} else {
|
||||
debug!("actor {} is already known", actor.id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn store_note<'c, C>(cx: &C, note: Note) -> Result<()>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
if !cx.is_known(¬e.id) {
|
||||
crate::post::create_post_from_note(&*cx.db(), note)?;
|
||||
} else {
|
||||
debug!("note {} is already known", note.id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the tag of the activity.
|
||||
fn tagof(a: &Activity) -> Tag {
|
||||
match a.kind.as_str() {
|
||||
"Create" => Tag::Create,
|
||||
"Delete" => Tag::Delete,
|
||||
"Follow" => Tag::Follow,
|
||||
"Accept" => Tag::Accept,
|
||||
"Reject" => Tag::Reject,
|
||||
"Bite" => Tag::Bite,
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod timelines {
|
||||
//! Manages the rendering of timelines.
|
||||
}
|
||||
|
||||
pub mod following {
|
||||
//! Follow requests and management thereof.
|
||||
|
||||
use store::Key;
|
||||
|
||||
use super::Context;
|
||||
use crate::{
|
||||
entities::{Accept, FollowRequest, Reject, Undo},
|
||||
Result,
|
||||
};
|
||||
|
||||
/// Create a follow request.
|
||||
pub fn create_follow_request<'c, C>(
|
||||
cx: &C,
|
||||
follower: Key,
|
||||
target: Key,
|
||||
) -> Result<FollowRequest>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
Ok(Key::gen().into())
|
||||
}
|
||||
|
||||
/// Cancel a follow request.
|
||||
///
|
||||
/// If the follow request was already accepted, the follow request's target is unfollowed by the actor. Otherwise,
|
||||
/// the follow request is withdrawn.
|
||||
///
|
||||
/// This creates a new [`Undo`] entry in the database to which data may be attached.
|
||||
pub fn cancel_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Undo>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
Ok(Key::gen().into())
|
||||
}
|
||||
|
||||
/// Apply the changes related to accepting a follow request to the social graph and create a new node representing
|
||||
/// the event.
|
||||
pub fn accept_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Accept>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
Ok(Key::gen().into())
|
||||
}
|
||||
|
||||
/// Apply the changes related to rejecting a follow request to the social graph and create a new node representing
|
||||
/// the event.
|
||||
pub fn reject_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Reject>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
Ok(Key::gen().into())
|
||||
}
|
||||
}
|
||||
|
||||
pub mod reverse {
|
||||
//! Undoing operations.
|
||||
//!
|
||||
//! This module defines the behavior of the [`Undo`] activity.
|
||||
|
||||
use crate::entities::{FollowRequest, Undo};
|
||||
|
||||
use super::{following, Context};
|
||||
|
||||
/// Describes objects which have a "revert" operation defined (that is, they can be the target of an [`Undo`] activity).
|
||||
pub trait Reversible {
|
||||
/// Undo `self` and generate a corresponding [`Undo`] object recording this fact.
|
||||
fn revert<'c, C>(&self, cx: &C) -> crate::Result<Undo>
|
||||
where
|
||||
C: Context<'c>;
|
||||
}
|
||||
|
||||
impl Reversible for FollowRequest {
|
||||
/// Withdraw a follow request if it wasn't yet accepted, or unfollow someone.
|
||||
fn revert<'c, C>(&self, cx: &C) -> crate::Result<Undo>
|
||||
where
|
||||
C: Context<'c>,
|
||||
{
|
||||
following::cancel_follow_request(cx, *self)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod entities {
|
||||
//! Virtual data types composed from [components] and operated on by [systems].
|
||||
//!
|
||||
//! [components]: crate::components
|
||||
//! [systems]: crate::systems
|
||||
|
||||
use store::Key;
|
||||
use derive_more::{From, Into, Display};
|
||||
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Poster(Key);
|
||||
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Server(Key);
|
||||
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Post(Key);
|
||||
|
||||
/// Represents a `Bite` activity.
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Bite(Key);
|
||||
|
||||
/// Represents an `Undo` activity.
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Undo(Key);
|
||||
|
||||
/// Represents an `Accept` activity.
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Accept(Key);
|
||||
|
||||
/// Represents a `Reject` activity.
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Reject(Key);
|
||||
|
||||
/// Represents a `Create` activity.
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct Create(Key);
|
||||
|
||||
/// Represents a `Follow` activity.
|
||||
///
|
||||
/// Also see the [`following`][crate::systems::following] module, which defines the logic for following, follow requests
|
||||
/// and other related stuff.
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct FollowRequest(Key);
|
||||
|
||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
||||
pub struct PublicKey(Key);
|
||||
|
||||
/// A key newtype that represents an object.
|
||||
pub trait Entity: Into<Key> + From<Key> + Copy {}
|
||||
|
||||
impl<T> Entity for T where T: Into<Key> + From<Key> + Copy {}
|
||||
}
|
||||
|
||||
pub mod components {}
|
||||
|
|
|
@ -152,29 +152,27 @@ pub fn create_local_post(
|
|||
}
|
||||
|
||||
/// Assumes all objects referenced already exist.
|
||||
#[tracing::instrument(skip(cx))]
|
||||
pub fn create_post_from_note(cx: &Context, note: Note) -> crate::Result<Post> {
|
||||
cx.run(|tx| {
|
||||
let Some(author) = tx.lookup(Id(note.author))? else {
|
||||
panic!("needed author to already exist")
|
||||
};
|
||||
#[tracing::instrument(skip(tx))]
|
||||
pub fn create_post_from_note(tx: &Transaction<'_>, note: Note) -> crate::Result<Post> {
|
||||
let Some(author) = tx.lookup(Id(note.author))? else {
|
||||
panic!("needed author to already exist")
|
||||
};
|
||||
|
||||
let key = Key::gen();
|
||||
let key = Key::gen();
|
||||
|
||||
tx.add_alias(key, Id(note.id.clone()))?;
|
||||
tx.create(AuthorOf { object: key, author })?;
|
||||
tx.add_mixin(key, Content {
|
||||
content: note.content,
|
||||
warning: note.summary,
|
||||
})?;
|
||||
tx.add_mixin(key, data::Object {
|
||||
kind: ObjectKind::Notelike(note.kind),
|
||||
id: Id(note.id),
|
||||
local: false,
|
||||
})?;
|
||||
tx.add_alias(key, Id(note.id.clone()))?;
|
||||
tx.create(AuthorOf { object: key, author })?;
|
||||
tx.add_mixin(key, Content {
|
||||
content: note.content,
|
||||
warning: note.summary,
|
||||
})?;
|
||||
tx.add_mixin(key, data::Object {
|
||||
kind: ObjectKind::Notelike(note.kind),
|
||||
id: Id(note.id),
|
||||
local: false,
|
||||
})?;
|
||||
|
||||
Ok(Post { key })
|
||||
})
|
||||
Ok(Post { key })
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(cx))]
|
||||
|
|
|
@ -222,11 +222,15 @@ impl Transaction<'_> {
|
|||
where
|
||||
A: Arrow<Kind = Multi>,
|
||||
{
|
||||
Ok(self.get_arrow_raw(key)?.map(A::from))
|
||||
}
|
||||
/// Construct the arrow from its identifier.
|
||||
pub fn get_arrow_raw(&self, key: Key) -> Result<Option<Multi>> {
|
||||
let arrow = self
|
||||
.open(crate::types::MULTIEDGE_HEADERS)
|
||||
.get(key)?
|
||||
.map(|v| Key::split(v.as_ref()))
|
||||
.map(|(origin, target)| A::from(Multi { origin, target, identity: key }));
|
||||
.map(|(origin, target)| Multi { origin, target, identity: key });
|
||||
Ok(arrow)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,13 +3,14 @@ use std::{
|
|||
str::FromStr,
|
||||
};
|
||||
|
||||
use bincode::{Decode, Encode};
|
||||
use chrono::{DateTime, Utc};
|
||||
use ulid::Ulid;
|
||||
|
||||
use crate::StoreError;
|
||||
|
||||
/// A unique identifier for vertices in the database.
|
||||
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
|
||||
pub struct Key(pub(crate) [u8; 16]);
|
||||
|
||||
impl Key {
|
||||
|
|
|
@ -68,7 +68,7 @@ pub struct Batch {
|
|||
}
|
||||
|
||||
impl Store {
|
||||
/// Run a [transaction][Transaction].
|
||||
/// Run a [transaction][Transaction] and ensure that it is either committed or rolled back.
|
||||
///
|
||||
/// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not
|
||||
/// recorded. Changes made inside a transaction can be read from within that transaction before they are
|
||||
|
@ -85,15 +85,20 @@ impl Store {
|
|||
store: &self,
|
||||
};
|
||||
let r = f(&tx);
|
||||
if let Err(e) = if r.is_err() {
|
||||
tx.inner.rollback()
|
||||
if r.is_err() {
|
||||
tx.cancel()?;
|
||||
} else {
|
||||
tx.inner.commit()
|
||||
} {
|
||||
return Err(E::from(StoreError::Internal(e)));
|
||||
tx.commit()?;
|
||||
}
|
||||
r
|
||||
}
|
||||
/// Begin a transaction.
|
||||
pub fn start(&self) -> Transaction<'_> {
|
||||
Transaction {
|
||||
inner: self.inner.transaction(),
|
||||
store: &self,
|
||||
}
|
||||
}
|
||||
/// Apply a batch of changes atomically.
|
||||
pub fn apply(&self, batch: Batch) -> Result<()> {
|
||||
self.inner.write(batch.inner.into_inner())?;
|
||||
|
@ -132,6 +137,17 @@ impl Store {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'db> Transaction<'db> {
|
||||
/// Complete the transaction successfully.
|
||||
pub fn commit(self) -> Result<(), StoreError> {
|
||||
self.inner.commit().map_err(StoreError::from)
|
||||
}
|
||||
/// Cancel the transaction.
|
||||
pub fn cancel(self) -> Result<(), StoreError> {
|
||||
self.inner.rollback().map_err(StoreError::from)
|
||||
}
|
||||
}
|
||||
|
||||
/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
|
||||
pub const OK: Result<()> = Ok(());
|
||||
|
||||
|
|
Loading…
Reference in a new issue