Compare commits
No commits in common. "restructure" and "mistress" have entirely different histories.
restructur
...
mistress
12 changed files with 40 additions and 785 deletions
51
Cargo.lock
generated
51
Cargo.lock
generated
|
@ -518,21 +518,6 @@ dependencies = [
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures"
|
|
||||||
version = "0.3.30"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
|
|
||||||
dependencies = [
|
|
||||||
"futures-channel",
|
|
||||||
"futures-core",
|
|
||||||
"futures-executor",
|
|
||||||
"futures-io",
|
|
||||||
"futures-sink",
|
|
||||||
"futures-task",
|
|
||||||
"futures-util",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-channel"
|
name = "futures-channel"
|
||||||
version = "0.3.30"
|
version = "0.3.30"
|
||||||
|
@ -540,7 +525,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
|
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-sink",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -549,34 +533,6 @@ version = "0.3.30"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
|
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures-executor"
|
|
||||||
version = "0.3.30"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
|
|
||||||
dependencies = [
|
|
||||||
"futures-core",
|
|
||||||
"futures-task",
|
|
||||||
"futures-util",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures-io"
|
|
||||||
version = "0.3.30"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures-macro"
|
|
||||||
version = "0.3.30"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn 2.0.60",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-sink"
|
name = "futures-sink"
|
||||||
version = "0.3.30"
|
version = "0.3.30"
|
||||||
|
@ -595,16 +551,10 @@ version = "0.3.30"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
|
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-channel",
|
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-io",
|
|
||||||
"futures-macro",
|
|
||||||
"futures-sink",
|
|
||||||
"futures-task",
|
"futures-task",
|
||||||
"memchr",
|
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"pin-utils",
|
"pin-utils",
|
||||||
"slab",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1308,7 +1258,6 @@ dependencies = [
|
||||||
"derive_more",
|
"derive_more",
|
||||||
"either",
|
"either",
|
||||||
"fetch",
|
"fetch",
|
||||||
"futures",
|
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"store",
|
"store",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
|
|
@ -214,7 +214,7 @@ const GET: &Method = &Method::GET;
|
||||||
/// This function is where all requests to a protected endpoint have to go through. If the request
|
/// This function is where all requests to a protected endpoint have to go through. If the request
|
||||||
/// was signed but does not target a protected endpoint, this function will fall back to the
|
/// was signed but does not target a protected endpoint, this function will fall back to the
|
||||||
/// [`dispatch_public`] handler.
|
/// [`dispatch_public`] handler.
|
||||||
// #[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||||
async fn dispatch_signed(
|
async fn dispatch_signed(
|
||||||
cx: Context,
|
cx: Context,
|
||||||
verifier: &Verifier,
|
verifier: &Verifier,
|
||||||
|
@ -236,7 +236,7 @@ async fn dispatch_signed(
|
||||||
/// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the
|
/// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the
|
||||||
/// function will return a 404 response. If the path *does* exist, but the signature is not
|
/// function will return a 404 response. If the path *does* exist, but the signature is not
|
||||||
/// valid, they will also get a 404.
|
/// valid, they will also get a 404.
|
||||||
// #[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
#[tracing::instrument(level = "DEBUG", target = "router", skip_all)]
|
||||||
async fn dispatch_public(
|
async fn dispatch_public(
|
||||||
cx: Context,
|
cx: Context,
|
||||||
verifier: &Verifier,
|
verifier: &Verifier,
|
||||||
|
|
|
@ -1,15 +1,11 @@
|
||||||
//! ActivityPub handlers.
|
//! ActivityPub handlers.
|
||||||
|
|
||||||
use std::sync::{Mutex, MutexGuard};
|
|
||||||
|
|
||||||
use http_body_util::Full;
|
use http_body_util::Full;
|
||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
use puppy::{
|
use puppy::{
|
||||||
actor::{get_signing_key, Actor},
|
actor::{get_signing_key, Actor},
|
||||||
fetch::{object::Activity, Client},
|
fetch::object::Activity,
|
||||||
get_local_ap_object,
|
get_local_ap_object, Context, Error, Key,
|
||||||
store::Transaction,
|
|
||||||
Context, Error, Key,
|
|
||||||
};
|
};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use tracing::{info, instrument};
|
use tracing::{info, instrument};
|
||||||
|
@ -85,38 +81,10 @@ pub async fn inbox(
|
||||||
"processing object '{id}'",
|
"processing object '{id}'",
|
||||||
};
|
};
|
||||||
|
|
||||||
let tx = cx.store().start();
|
|
||||||
let key = get_signing_key(&tx, Actor { key: receiver }).unwrap();
|
|
||||||
|
|
||||||
struct InboxContext<'c> {
|
|
||||||
cx: &'c Context,
|
|
||||||
tx: Mutex<Transaction<'c>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> puppy::systems::Context<'a> for InboxContext<'a> {
|
|
||||||
/// Access the transaction.
|
|
||||||
fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>>
|
|
||||||
where
|
|
||||||
'a: 'y,
|
|
||||||
'y: 'a,
|
|
||||||
{
|
|
||||||
self.tx.lock().unwrap()
|
|
||||||
}
|
|
||||||
/// Access the federation client.
|
|
||||||
fn client(&self) -> &Client {
|
|
||||||
self.cx.resolver()
|
|
||||||
}
|
|
||||||
/// Get the ActivityPub domain of this server.
|
|
||||||
fn domain(&self) -> &str {
|
|
||||||
&self.cx.config().ap_domain
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match Activity::from_json(json) {
|
match Activity::from_json(json) {
|
||||||
Ok(activity) => {
|
Ok(activity) => {
|
||||||
let context = InboxContext { tx: Mutex::new(tx), cx };
|
puppy::ingest(&cx, receiver, &activity).await.unwrap();
|
||||||
let r = puppy::systems::processor::process_incoming(&context, &key, activity).await;
|
match puppy::interpret(&cx, activity) {
|
||||||
match r {
|
|
||||||
Ok(_) => Ok(respond!(code: 202)),
|
Ok(_) => Ok(respond!(code: 202)),
|
||||||
Err(err) => fuck!(400: "error interpreting activity: {err}"),
|
Err(err) => fuck!(400: "error interpreting activity: {err}"),
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ mod api;
|
||||||
async fn main() {
|
async fn main() {
|
||||||
Registry::default()
|
Registry::default()
|
||||||
.with(filter_fn(|meta| !meta.target().starts_with("reqwest")))
|
.with(filter_fn(|meta| !meta.target().starts_with("reqwest")))
|
||||||
//.with(filter_fn(|meta| *meta.level() < Level::DEBUG))
|
.with(filter_fn(|meta| *meta.level() < Level::DEBUG))
|
||||||
.with(ForestLayer::default())
|
.with(ForestLayer::default())
|
||||||
.init();
|
.init();
|
||||||
// TODO: load the config from a file or something.
|
// TODO: load the config from a file or something.
|
||||||
|
|
|
@ -38,13 +38,13 @@ impl Client {
|
||||||
///
|
///
|
||||||
/// Note that in order for the request to be considered valid by most implementations, `key.owner`
|
/// Note that in order for the request to be considered valid by most implementations, `key.owner`
|
||||||
/// must equal `payload.actor`.
|
/// must equal `payload.actor`.
|
||||||
#[instrument(target = "fetch.delivery", skip_all, fields(activity = payload.id, url = inbox.as_ref(), key = key.id))]
|
#[instrument(target = "fetch.delivery", skip_all, fields(activity = payload.id, url = inbox, key = key.id))]
|
||||||
pub async fn deliver(&self, key: &SigningKey, payload: &Activity, inbox: impl AsRef<str>) {
|
pub async fn deliver(&self, key: &SigningKey, payload: &Activity, inbox: &str) {
|
||||||
let system = Subsystem::Delivery;
|
let system = Subsystem::Delivery;
|
||||||
|
|
||||||
let body = serde_json::to_string(&payload.to_json_ld()).unwrap();
|
let body = serde_json::to_string(&payload.to_json_ld()).unwrap();
|
||||||
let mut req = system
|
let mut req = system
|
||||||
.new_request(inbox.as_ref())
|
.new_request(inbox)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.method(Method::POST)
|
.method(Method::POST)
|
||||||
.header("content-type", ACTIVITYPUB_TYPE)
|
.header("content-type", ACTIVITYPUB_TYPE)
|
||||||
|
|
|
@ -14,4 +14,3 @@ either = "*"
|
||||||
derive_more = "*"
|
derive_more = "*"
|
||||||
serde_json = "*"
|
serde_json = "*"
|
||||||
tracing = "*"
|
tracing = "*"
|
||||||
futures = "*"
|
|
||||||
|
|
|
@ -90,14 +90,6 @@ pub struct FollowRequest {
|
||||||
pub target: Key,
|
pub target: Key,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
|
|
||||||
pub struct Accept {
|
|
||||||
#[identity]
|
|
||||||
pub id: Key,
|
|
||||||
pub origin: Key,
|
|
||||||
pub target: Key,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The status of a [`FollowRequest`].
|
/// The status of a [`FollowRequest`].
|
||||||
///
|
///
|
||||||
/// Valid state transitions:
|
/// Valid state transitions:
|
||||||
|
@ -220,5 +212,4 @@ pub fn schema() -> Schema {
|
||||||
.has::<AuthorOf>()
|
.has::<AuthorOf>()
|
||||||
.has::<Follows>()
|
.has::<Follows>()
|
||||||
.has::<Create>()
|
.has::<Create>()
|
||||||
.has::<Accept>()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,13 +2,7 @@
|
||||||
//! you should take a look at [`fetch`].
|
//! you should take a look at [`fetch`].
|
||||||
|
|
||||||
// Working with result types is such a bitch without these.
|
// Working with result types is such a bitch without these.
|
||||||
#![feature(
|
#![feature(iterator_try_collect, try_blocks, once_cell_try, box_into_inner)]
|
||||||
iterator_try_collect,
|
|
||||||
try_blocks,
|
|
||||||
once_cell_try,
|
|
||||||
box_into_inner,
|
|
||||||
type_changing_struct_update
|
|
||||||
)]
|
|
||||||
|
|
||||||
use std::hint::unreachable_unchecked;
|
use std::hint::unreachable_unchecked;
|
||||||
|
|
||||||
|
@ -342,7 +336,7 @@ pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()>
|
||||||
match object {
|
match object {
|
||||||
Object::Activity(a) => interpret(&cx, a)?,
|
Object::Activity(a) => interpret(&cx, a)?,
|
||||||
Object::Actor(a) => cx.run(|tx| actor::create_remote(tx, a).map(void))?,
|
Object::Actor(a) => cx.run(|tx| actor::create_remote(tx, a).map(void))?,
|
||||||
Object::Note(a) => cx.run(|tx| post::create_post_from_note(tx, a).map(void))?,
|
Object::Note(a) => post::create_post_from_note(cx, a).map(void)?,
|
||||||
_ => todo!(),
|
_ => todo!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -351,630 +345,3 @@ pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()>
|
||||||
|
|
||||||
/// Discard the argument.
|
/// Discard the argument.
|
||||||
fn void<T>(_: T) -> () {}
|
fn void<T>(_: T) -> () {}
|
||||||
|
|
||||||
pub mod systems {
|
|
||||||
//! Logic containment zone.
|
|
||||||
|
|
||||||
use std::sync::MutexGuard;
|
|
||||||
|
|
||||||
use fetch::Client;
|
|
||||||
use store::{Key, Transaction};
|
|
||||||
use tracing::warn;
|
|
||||||
|
|
||||||
/// Allows subsystems to interact with the [fetch] and [store] components.
|
|
||||||
pub trait Context<'a> {
|
|
||||||
/// Access the transaction.
|
|
||||||
fn db<'y: 'x, 'x>(&'x self) -> MutexGuard<'x, Transaction<'y>>
|
|
||||||
where
|
|
||||||
'a: 'y,
|
|
||||||
'y: 'a;
|
|
||||||
/// Access the federation client.
|
|
||||||
fn client(&self) -> &Client;
|
|
||||||
/// Get the ActivityPub domain of this server.
|
|
||||||
fn domain(&self) -> &str;
|
|
||||||
/// Format `key` as an object ID.
|
|
||||||
fn make_object_id(&self, key: Key) -> String {
|
|
||||||
format!("https://{}/o/{key}", self.domain())
|
|
||||||
}
|
|
||||||
/// Check whether we already know about an object.
|
|
||||||
fn is_known(&self, url: &str) -> bool {
|
|
||||||
use crate::data::Id;
|
|
||||||
|
|
||||||
match self.db().lookup(Id(url.to_string())) {
|
|
||||||
Ok(Some(_)) => true,
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Check whether `url` refers to an object that is considered "local" to this server.
|
|
||||||
fn is_local(&self, url: &str) -> bool {
|
|
||||||
use crate::data::{Id, Object};
|
|
||||||
|
|
||||||
let Ok(Some(key)) = self.db().lookup(Id(url.to_string())) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
|
|
||||||
match self.db().get_mixin(key) {
|
|
||||||
Ok(Some(Object { local, .. })) => local,
|
|
||||||
Ok(None) => false,
|
|
||||||
Err(err) => {
|
|
||||||
warn!("error while trying to determine origin of {key}: {err}");
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod notification {
|
|
||||||
//! Manages the delivery of notifications to local users.
|
|
||||||
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
use crate::Result;
|
|
||||||
use super::Context;
|
|
||||||
|
|
||||||
/// Something that can be sent to a local user as a (push) notification.
|
|
||||||
pub trait Notification {}
|
|
||||||
|
|
||||||
/// Get the notification where it needs to go.
|
|
||||||
pub fn dispatch<'x, C>(cx: &C, event: &impl Notification) -> Result<()>
|
|
||||||
where
|
|
||||||
C: Context<'x>,
|
|
||||||
{
|
|
||||||
info!("ding!");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod delivery {
|
|
||||||
//! A mid-level system for delivering messages to remote servers.
|
|
||||||
|
|
||||||
use fetch::{
|
|
||||||
object::{Activity, Object},
|
|
||||||
signatures::SigningKey,
|
|
||||||
};
|
|
||||||
use futures::{stream::FuturesUnordered, Future, StreamExt as _};
|
|
||||||
use derive_more::Display;
|
|
||||||
use store::{arrow::Multi, Key};
|
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
use crate::{entities::Accept, Error, Result};
|
|
||||||
|
|
||||||
use super::Context;
|
|
||||||
|
|
||||||
/// Type tag for an activity.
|
|
||||||
#[derive(Clone, Copy, Display)]
|
|
||||||
pub enum Tag {
|
|
||||||
Follow,
|
|
||||||
Accept,
|
|
||||||
Reject,
|
|
||||||
Create,
|
|
||||||
Delete,
|
|
||||||
Bite,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Implemented for types that can represent an activity that can be delivered to a remote server.
|
|
||||||
pub trait Payload {
|
|
||||||
/// Construct an [`Activity`] for delivery.
|
|
||||||
fn prepare<'c, C>(self, cx: &C) -> Result<Activity>
|
|
||||||
where
|
|
||||||
C: Context<'c>;
|
|
||||||
/// Call the delivery subsystem to this payload everywhere it needs to go.
|
|
||||||
///
|
|
||||||
/// A convenience method to call [`deliver`] as a method instead of a free-standing function.
|
|
||||||
fn deliver<'c, C>(self, cx: &C) -> impl Future<Output = Result<()>> + Send + Sync
|
|
||||||
where
|
|
||||||
Self: Sized + Send + Sync,
|
|
||||||
C: Context<'c> + Send + Sync,
|
|
||||||
{
|
|
||||||
deliver(cx, self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Payload for Accept {
|
|
||||||
fn prepare<'c, C>(self, cx: &C) -> Result<Activity>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
let (actor, object, tag) = get_activity_data(cx, self.into())?;
|
|
||||||
Ok(Activity {
|
|
||||||
id: cx.make_object_id(self.into()),
|
|
||||||
actor,
|
|
||||||
object: Box::new(Object::Id { id: object }),
|
|
||||||
kind: tag.to_string(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Deliver the activity-like payload to all its intended recipients.
|
|
||||||
pub async fn deliver<'c, C>(cx: &C, payload: impl Payload) -> Result<()>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
let activity = payload.prepare(cx)?;
|
|
||||||
|
|
||||||
if !cx.is_local(&activity.id) {
|
|
||||||
error!("delivery of non-local activity!!");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let signing_key = get_keypair(cx, &activity.actor)?;
|
|
||||||
get_targets(cx, &activity)?
|
|
||||||
.into_iter()
|
|
||||||
.map(|inbox| cx.client().deliver(&signing_key, &activity, inbox))
|
|
||||||
.collect::<FuturesUnordered<_>>()
|
|
||||||
.collect::<()>()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Calculate the list of inboxes to send the activity to.
|
|
||||||
fn get_targets<'c, C>(
|
|
||||||
cx: &C,
|
|
||||||
activity: &Activity,
|
|
||||||
) -> Result<impl IntoIterator<Item = String>>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
use crate::data::{Channel, Id};
|
|
||||||
|
|
||||||
let get_inbox = |id: &str| -> Result<Option<String>> {
|
|
||||||
let Some(key) = cx.db().lookup(Id(id.to_owned()))? else {
|
|
||||||
return Ok(None);
|
|
||||||
};
|
|
||||||
cx.db()
|
|
||||||
.get_mixin(key)
|
|
||||||
.map(|o| o.map(|Channel { inbox }| inbox))
|
|
||||||
.map_err(Error::Store)
|
|
||||||
};
|
|
||||||
|
|
||||||
match *activity.object {
|
|
||||||
Object::Id { ref id } => get_inbox(&id),
|
|
||||||
Object::Activity(ref a) => get_inbox(&a.actor),
|
|
||||||
Object::Actor(ref a) => Ok(Some(a.inbox.clone())),
|
|
||||||
Object::Note(ref a) => get_inbox(&a.author),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get a signing key for the given actor id.
|
|
||||||
///
|
|
||||||
/// Will fail if the actor isn't local.
|
|
||||||
fn get_keypair<'c, C>(cx: &C, actor_id: &str) -> Result<SigningKey>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
use crate::{
|
|
||||||
actor::{get_signing_key, Actor},
|
|
||||||
data::Id,
|
|
||||||
};
|
|
||||||
let Some(actor_key) = cx.db().lookup(Id(actor_id.to_string()))? else {
|
|
||||||
panic!("could not get db key for {actor_id}");
|
|
||||||
};
|
|
||||||
get_signing_key(&*cx.db(), Actor { key: actor_key }).map_err(Error::Store)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get actor id, the object id, and the type of the activity.
|
|
||||||
fn get_activity_data<'c, C>(cx: &C, key: Key) -> Result<(String, String, Tag)>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
// TODO: instead of panicking, return a normal error.
|
|
||||||
use crate::data::{Id, Object, ActivityKind, ObjectKind};
|
|
||||||
|
|
||||||
let Some(Object { kind, .. }) = cx.db().get_mixin(key)? else {
|
|
||||||
panic!("activity must be an object")
|
|
||||||
};
|
|
||||||
let Some(Multi { origin, target, .. }) = cx.db().get_arrow_raw(key)? else {
|
|
||||||
panic!("expected activity to have multi-arrow repr")
|
|
||||||
};
|
|
||||||
let Some(Id(object)) = cx.db().get_alias(target)? else {
|
|
||||||
panic!("expected object of activity to have an id")
|
|
||||||
};
|
|
||||||
let Some(Id(actor)) = cx.db().get_alias(origin)? else {
|
|
||||||
panic!("expected author of activity to have an id")
|
|
||||||
};
|
|
||||||
|
|
||||||
let tag = match kind {
|
|
||||||
ObjectKind::Activity(tag) => match tag {
|
|
||||||
ActivityKind::Create => Tag::Create,
|
|
||||||
ActivityKind::Follow => Tag::Follow,
|
|
||||||
ActivityKind::Accept => Tag::Accept,
|
|
||||||
ActivityKind::Reject => Tag::Reject,
|
|
||||||
ActivityKind::Bite => Tag::Bite,
|
|
||||||
},
|
|
||||||
_ => panic!("invalid kind for activity"),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((actor, object, tag))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod ingestion {
|
|
||||||
//! Processing remote objects to make them part of our local graph.
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod processor {
|
|
||||||
//! The structure and interpretation of activity graphs.
|
|
||||||
//!
|
|
||||||
//! This is where the magic happens.
|
|
||||||
//!
|
|
||||||
//! This module contains the code that assembles and executes a graph of activities. This is a higher
|
|
||||||
//! level system, that depends on the following sibling systems:
|
|
||||||
//!
|
|
||||||
//! - [`following`][super::following]
|
|
||||||
//! - [`delivery`][super::delivery]
|
|
||||||
//!
|
|
||||||
//! Its main purpose is to integrate all the above system in the context of receiving an activity in
|
|
||||||
//! an inbox, and the subsequent processing that needs to happen. In order to *correctly* interpret an
|
|
||||||
//! activity, we need to know about the other nodes that it references.
|
|
||||||
|
|
||||||
use fetch::{
|
|
||||||
object::{Activity, Actor, Note, Object},
|
|
||||||
signatures::SigningKey,
|
|
||||||
FetchError,
|
|
||||||
};
|
|
||||||
use store::Key;
|
|
||||||
use tracing::{debug, info, trace, warn};
|
|
||||||
|
|
||||||
use crate::{data, Result};
|
|
||||||
|
|
||||||
use super::{
|
|
||||||
delivery::{self, Tag},
|
|
||||||
following, Context,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Process an incoming activity.
|
|
||||||
#[tracing::instrument(target = "puppy.processor", skip_all, fields(activity = root.id))]
|
|
||||||
pub async fn process_incoming<'c, C>(
|
|
||||||
cx: &C,
|
|
||||||
on_behalf_of: &SigningKey,
|
|
||||||
root: Activity,
|
|
||||||
) -> Result<(Key, Tag)>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
if cx.is_local(&root.id) || cx.is_known(&root.id) {
|
|
||||||
panic!("could not process activity, it already exists");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch all transitive dependencies of `root`, returned in the order that they need to be processed.
|
|
||||||
let (actors, notes, activities) =
|
|
||||||
fetch_dependencies(cx, &root, &on_behalf_of, 3).await?;
|
|
||||||
|
|
||||||
trace!("storing actors");
|
|
||||||
for actor in actors {
|
|
||||||
store_actor(cx, actor)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("storing notes");
|
|
||||||
for note in notes {
|
|
||||||
store_note(cx, note)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process the activities that have to execute before `activity`. These dependencies are returned
|
|
||||||
// in the exact order that they need to be applied. `fetch_dependencies` only returns stuff that is
|
|
||||||
// missing from the graph.
|
|
||||||
trace!("applying dependency activities");
|
|
||||||
for dep in activities {
|
|
||||||
let (key, tag) = apply_activity(cx, dep).await?;
|
|
||||||
debug!(
|
|
||||||
dependency_of = root.id,
|
|
||||||
kind = tag.to_string(),
|
|
||||||
"processed activity {key}"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now that all activities that this depends on have been executed, we can execute the original
|
|
||||||
// one.
|
|
||||||
trace!("executing target activity");
|
|
||||||
apply_activity(cx, root).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute the activity.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
///
|
|
||||||
/// This function assumes that everything the activity references by ActivityPub ID is already present in the context.
|
|
||||||
/// If this is not the case, it will panic.
|
|
||||||
///
|
|
||||||
/// Specifically,
|
|
||||||
#[tracing::instrument(level = "TRACE", target = "puppy.processor", skip_all, fields(activity = activity.id, tag = activity.kind))]
|
|
||||||
async fn apply_activity<'c, C>(cx: &C, activity: Activity) -> Result<(Key, Tag)>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
use crate::data::{Id, Create, Object, Accept};
|
|
||||||
// Get a key and error out if it does not exist
|
|
||||||
let get_key = |url: &str| -> Key {
|
|
||||||
cx.db()
|
|
||||||
.lookup(data::Id(url.to_string()))
|
|
||||||
.expect("database should be operable")
|
|
||||||
.expect("url should already have been inserted")
|
|
||||||
};
|
|
||||||
|
|
||||||
let tag = tagof(&activity);
|
|
||||||
let key = match tag {
|
|
||||||
Tag::Follow => {
|
|
||||||
let requester = get_key(&activity.actor);
|
|
||||||
let target = get_key(activity.object.id());
|
|
||||||
|
|
||||||
let req = following::create_follow_request(cx, requester, target)?;
|
|
||||||
|
|
||||||
// For now, automatically accept follow requests.
|
|
||||||
if cx.is_local(activity.object.id()) {
|
|
||||||
debug!("auto-accepting follow request for local actor {target}",);
|
|
||||||
let accept = following::accept_follow_request(cx, req)?;
|
|
||||||
// TODO: Add object components to accept activity before sending
|
|
||||||
if !cx.is_local(&activity.actor) {
|
|
||||||
debug!("delivering to remote actor");
|
|
||||||
delivery::deliver(cx, accept).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
req.into()
|
|
||||||
}
|
|
||||||
Tag::Accept => {
|
|
||||||
let req_id = get_key(activity.object.id());
|
|
||||||
following::accept_follow_request(cx, req_id.into())?.into()
|
|
||||||
}
|
|
||||||
Tag::Reject => {
|
|
||||||
let req_id = get_key(activity.object.id());
|
|
||||||
following::reject_follow_request(cx, req_id.into())?.into()
|
|
||||||
}
|
|
||||||
Tag::Create => {
|
|
||||||
let id = Key::gen();
|
|
||||||
let actor = get_key(&activity.actor);
|
|
||||||
let object = get_key(activity.object.id());
|
|
||||||
cx.db().add_alias(id, Id(cx.make_object_id(id)))?;
|
|
||||||
cx.db().create(Create { id, actor, object })?;
|
|
||||||
id.into()
|
|
||||||
}
|
|
||||||
Tag::Delete => todo!(),
|
|
||||||
Tag::Bite => todo!(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((key, tag))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetch all transitive dependencies of `root` using the given `auth` as the signing authority.
|
|
||||||
///
|
|
||||||
/// The `budget` parameter specifies the maximum number of recursive calls.
|
|
||||||
#[tracing::instrument(target = "puppy.processor", skip_all, fields(budget = budget, target = root.id))]
|
|
||||||
async fn fetch_dependencies<'c, C>(
|
|
||||||
cx: &C,
|
|
||||||
root: &Activity,
|
|
||||||
auth: &SigningKey,
|
|
||||||
budget: usize,
|
|
||||||
) -> Result<(Vec<Actor>, Vec<Note>, Vec<Activity>)>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
let mut actors = Vec::new();
|
|
||||||
let mut notes = Vec::new();
|
|
||||||
let mut activities = Vec::new();
|
|
||||||
|
|
||||||
for url in [root.actor.as_str(), root.object.id()] {
|
|
||||||
if cx.is_known(url) {
|
|
||||||
debug!(parent = root.id, url, "already known, skipping");
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
debug!(parent = root.id, url, budget, "fetching dependency");
|
|
||||||
}
|
|
||||||
|
|
||||||
let json = cx.client().resolve(auth, url).await?;
|
|
||||||
let object = Object::from_json(json).map_err(FetchError::BadJson)?;
|
|
||||||
|
|
||||||
match object {
|
|
||||||
Object::Id { id } => {
|
|
||||||
warn!(parent = root.id, url, "could not fetch {id}");
|
|
||||||
}
|
|
||||||
Object::Activity(a) if budget == 0 => {
|
|
||||||
info!(parent = root.id, url, "exceeded budget, skipping");
|
|
||||||
activities.push(a);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Object::Activity(a) => {
|
|
||||||
debug!(parent = root.id, url, "fetching dependencies");
|
|
||||||
// BUG: this won't hit cache because none of the stuff is in there yet.
|
|
||||||
let (x, y, z) =
|
|
||||||
Box::pin(fetch_dependencies(cx, &a, auth, budget - 1)).await?;
|
|
||||||
trace!(
|
|
||||||
parent = root.id,
|
|
||||||
url,
|
|
||||||
"fetch completed, total: {}",
|
|
||||||
x.len() + y.len() + z.len()
|
|
||||||
);
|
|
||||||
actors.extend(x);
|
|
||||||
notes.extend(y);
|
|
||||||
activities.extend(z);
|
|
||||||
activities.push(a);
|
|
||||||
}
|
|
||||||
Object::Actor(a) => actors.push(a),
|
|
||||||
Object::Note(a) => notes.push(a),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((actors, notes, activities))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn store_actor<'c, C>(cx: &C, actor: Actor) -> Result<()>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
if !cx.is_known(&actor.id) {
|
|
||||||
crate::actor::create_remote(&*cx.db(), actor)?;
|
|
||||||
} else {
|
|
||||||
debug!("actor {} is already known", actor.id);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn store_note<'c, C>(cx: &C, note: Note) -> Result<()>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
if !cx.is_known(¬e.id) {
|
|
||||||
crate::post::create_post_from_note(&*cx.db(), note)?;
|
|
||||||
} else {
|
|
||||||
debug!("note {} is already known", note.id);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the tag of the activity.
|
|
||||||
fn tagof(a: &Activity) -> Tag {
|
|
||||||
match a.kind.as_str() {
|
|
||||||
"Create" => Tag::Create,
|
|
||||||
"Delete" => Tag::Delete,
|
|
||||||
"Follow" => Tag::Follow,
|
|
||||||
"Accept" => Tag::Accept,
|
|
||||||
"Reject" => Tag::Reject,
|
|
||||||
"Bite" => Tag::Bite,
|
|
||||||
_ => todo!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod timelines {
|
|
||||||
//! Manages the rendering of timelines.
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod following {
|
|
||||||
//! Follow requests and management thereof.
|
|
||||||
|
|
||||||
use store::Key;
|
|
||||||
|
|
||||||
use super::Context;
|
|
||||||
use crate::{
|
|
||||||
entities::{Accept, FollowRequest, Reject, Undo},
|
|
||||||
Result,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Create a follow request.
|
|
||||||
pub fn create_follow_request<'c, C>(
|
|
||||||
cx: &C,
|
|
||||||
follower: Key,
|
|
||||||
target: Key,
|
|
||||||
) -> Result<FollowRequest>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
Ok(Key::gen().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Cancel a follow request.
|
|
||||||
///
|
|
||||||
/// If the follow request was already accepted, the follow request's target is unfollowed by the actor. Otherwise,
|
|
||||||
/// the follow request is withdrawn.
|
|
||||||
///
|
|
||||||
/// This creates a new [`Undo`] entry in the database to which data may be attached.
|
|
||||||
pub fn cancel_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Undo>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
Ok(Key::gen().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Apply the changes related to accepting a follow request to the social graph and create a new node representing
|
|
||||||
/// the event.
|
|
||||||
pub fn accept_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Accept>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
Ok(Key::gen().into())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Apply the changes related to rejecting a follow request to the social graph and create a new node representing
|
|
||||||
/// the event.
|
|
||||||
pub fn reject_follow_request<'c, C>(cx: &C, req: FollowRequest) -> Result<Reject>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
Ok(Key::gen().into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod reverse {
|
|
||||||
//! Undoing operations.
|
|
||||||
//!
|
|
||||||
//! This module defines the behavior of the [`Undo`] activity.
|
|
||||||
|
|
||||||
use crate::entities::{FollowRequest, Undo};
|
|
||||||
|
|
||||||
use super::{following, Context};
|
|
||||||
|
|
||||||
/// Describes objects which have a "revert" operation defined (that is, they can be the target of an [`Undo`] activity).
|
|
||||||
pub trait Reversible {
|
|
||||||
/// Undo `self` and generate a corresponding [`Undo`] object recording this fact.
|
|
||||||
fn revert<'c, C>(&self, cx: &C) -> crate::Result<Undo>
|
|
||||||
where
|
|
||||||
C: Context<'c>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Reversible for FollowRequest {
|
|
||||||
/// Withdraw a follow request if it wasn't yet accepted, or unfollow someone.
|
|
||||||
fn revert<'c, C>(&self, cx: &C) -> crate::Result<Undo>
|
|
||||||
where
|
|
||||||
C: Context<'c>,
|
|
||||||
{
|
|
||||||
following::cancel_follow_request(cx, *self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod entities {
|
|
||||||
//! Virtual data types composed from [components] and operated on by [systems].
|
|
||||||
//!
|
|
||||||
//! [components]: crate::components
|
|
||||||
//! [systems]: crate::systems
|
|
||||||
|
|
||||||
use store::Key;
|
|
||||||
use derive_more::{From, Into, Display};
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Poster(Key);
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Server(Key);
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Post(Key);
|
|
||||||
|
|
||||||
/// Represents a `Bite` activity.
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Bite(Key);
|
|
||||||
|
|
||||||
/// Represents an `Undo` activity.
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Undo(Key);
|
|
||||||
|
|
||||||
/// Represents an `Accept` activity.
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Accept(Key);
|
|
||||||
|
|
||||||
/// Represents a `Reject` activity.
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Reject(Key);
|
|
||||||
|
|
||||||
/// Represents a `Create` activity.
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct Create(Key);
|
|
||||||
|
|
||||||
/// Represents a `Follow` activity.
|
|
||||||
///
|
|
||||||
/// Also see the [`following`][crate::systems::following] module, which defines the logic for following, follow requests
|
|
||||||
/// and other related stuff.
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct FollowRequest(Key);
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, From, Into, Display)]
|
|
||||||
pub struct PublicKey(Key);
|
|
||||||
|
|
||||||
/// A key newtype that represents an object.
|
|
||||||
pub trait Entity: Into<Key> + From<Key> + Copy {}
|
|
||||||
|
|
||||||
impl<T> Entity for T where T: Into<Key> + From<Key> + Copy {}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod components {}
|
|
||||||
|
|
|
@ -152,27 +152,29 @@ pub fn create_local_post(
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Assumes all objects referenced already exist.
|
/// Assumes all objects referenced already exist.
|
||||||
#[tracing::instrument(skip(tx))]
|
#[tracing::instrument(skip(cx))]
|
||||||
pub fn create_post_from_note(tx: &Transaction<'_>, note: Note) -> crate::Result<Post> {
|
pub fn create_post_from_note(cx: &Context, note: Note) -> crate::Result<Post> {
|
||||||
let Some(author) = tx.lookup(Id(note.author))? else {
|
cx.run(|tx| {
|
||||||
panic!("needed author to already exist")
|
let Some(author) = tx.lookup(Id(note.author))? else {
|
||||||
};
|
panic!("needed author to already exist")
|
||||||
|
};
|
||||||
|
|
||||||
let key = Key::gen();
|
let key = Key::gen();
|
||||||
|
|
||||||
tx.add_alias(key, Id(note.id.clone()))?;
|
tx.add_alias(key, Id(note.id.clone()))?;
|
||||||
tx.create(AuthorOf { object: key, author })?;
|
tx.create(AuthorOf { object: key, author })?;
|
||||||
tx.add_mixin(key, Content {
|
tx.add_mixin(key, Content {
|
||||||
content: note.content,
|
content: note.content,
|
||||||
warning: note.summary,
|
warning: note.summary,
|
||||||
})?;
|
})?;
|
||||||
tx.add_mixin(key, data::Object {
|
tx.add_mixin(key, data::Object {
|
||||||
kind: ObjectKind::Notelike(note.kind),
|
kind: ObjectKind::Notelike(note.kind),
|
||||||
id: Id(note.id),
|
id: Id(note.id),
|
||||||
local: false,
|
local: false,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok(Post { key })
|
Ok(Post { key })
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(cx))]
|
#[tracing::instrument(skip(cx))]
|
||||||
|
|
|
@ -222,15 +222,11 @@ impl Transaction<'_> {
|
||||||
where
|
where
|
||||||
A: Arrow<Kind = Multi>,
|
A: Arrow<Kind = Multi>,
|
||||||
{
|
{
|
||||||
Ok(self.get_arrow_raw(key)?.map(A::from))
|
|
||||||
}
|
|
||||||
/// Construct the arrow from its identifier.
|
|
||||||
pub fn get_arrow_raw(&self, key: Key) -> Result<Option<Multi>> {
|
|
||||||
let arrow = self
|
let arrow = self
|
||||||
.open(crate::types::MULTIEDGE_HEADERS)
|
.open(crate::types::MULTIEDGE_HEADERS)
|
||||||
.get(key)?
|
.get(key)?
|
||||||
.map(|v| Key::split(v.as_ref()))
|
.map(|v| Key::split(v.as_ref()))
|
||||||
.map(|(origin, target)| Multi { origin, target, identity: key });
|
.map(|(origin, target)| A::from(Multi { origin, target, identity: key }));
|
||||||
Ok(arrow)
|
Ok(arrow)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,14 +3,13 @@ use std::{
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
};
|
};
|
||||||
|
|
||||||
use bincode::{Decode, Encode};
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use ulid::Ulid;
|
use ulid::Ulid;
|
||||||
|
|
||||||
use crate::StoreError;
|
use crate::StoreError;
|
||||||
|
|
||||||
/// A unique identifier for vertices in the database.
|
/// A unique identifier for vertices in the database.
|
||||||
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
|
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||||
pub struct Key(pub(crate) [u8; 16]);
|
pub struct Key(pub(crate) [u8; 16]);
|
||||||
|
|
||||||
impl Key {
|
impl Key {
|
||||||
|
|
|
@ -68,7 +68,7 @@ pub struct Batch {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Store {
|
impl Store {
|
||||||
/// Run a [transaction][Transaction] and ensure that it is either committed or rolled back.
|
/// Run a [transaction][Transaction].
|
||||||
///
|
///
|
||||||
/// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not
|
/// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not
|
||||||
/// recorded. Changes made inside a transaction can be read from within that transaction before they are
|
/// recorded. Changes made inside a transaction can be read from within that transaction before they are
|
||||||
|
@ -85,20 +85,15 @@ impl Store {
|
||||||
store: &self,
|
store: &self,
|
||||||
};
|
};
|
||||||
let r = f(&tx);
|
let r = f(&tx);
|
||||||
if r.is_err() {
|
if let Err(e) = if r.is_err() {
|
||||||
tx.cancel()?;
|
tx.inner.rollback()
|
||||||
} else {
|
} else {
|
||||||
tx.commit()?;
|
tx.inner.commit()
|
||||||
|
} {
|
||||||
|
return Err(E::from(StoreError::Internal(e)));
|
||||||
}
|
}
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
/// Begin a transaction.
|
|
||||||
pub fn start(&self) -> Transaction<'_> {
|
|
||||||
Transaction {
|
|
||||||
inner: self.inner.transaction(),
|
|
||||||
store: &self,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Apply a batch of changes atomically.
|
/// Apply a batch of changes atomically.
|
||||||
pub fn apply(&self, batch: Batch) -> Result<()> {
|
pub fn apply(&self, batch: Batch) -> Result<()> {
|
||||||
self.inner.write(batch.inner.into_inner())?;
|
self.inner.write(batch.inner.into_inner())?;
|
||||||
|
@ -137,17 +132,6 @@ impl Store {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'db> Transaction<'db> {
|
|
||||||
/// Complete the transaction successfully.
|
|
||||||
pub fn commit(self) -> Result<(), StoreError> {
|
|
||||||
self.inner.commit().map_err(StoreError::from)
|
|
||||||
}
|
|
||||||
/// Cancel the transaction.
|
|
||||||
pub fn cancel(self) -> Result<(), StoreError> {
|
|
||||||
self.inner.rollback().map_err(StoreError::from)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
|
/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
|
||||||
pub const OK: Result<()> = Ok(());
|
pub const OK: Result<()> = Ok(());
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue