Add tracing

This commit is contained in:
Riley Apeldoorn 2024-05-04 13:50:04 +02:00
parent 8f2ea89301
commit 3d0a05f3a9
14 changed files with 263 additions and 99 deletions

124
Cargo.lock generated
View file

@ -110,12 +110,6 @@ dependencies = [
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.0"
@ -480,7 +474,7 @@ checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984"
name = "fetch"
version = "0.0.0"
dependencies = [
"base64 0.22.0",
"base64",
"chrono",
"derive_more",
"http",
@ -490,8 +484,8 @@ dependencies = [
"reqwest",
"rsa",
"serde_json",
"sigh",
"spki",
"tracing",
]
[[package]]
@ -991,6 +985,16 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.4"
@ -1107,6 +1111,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1136,7 +1146,7 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e459365e590736a54c3fa561947c84837534b8e9af6fc5bf781307e82658fae"
dependencies = [
"base64 0.22.0",
"base64",
"serde",
]
@ -1250,6 +1260,7 @@ dependencies = [
"fetch",
"serde_json",
"store",
"tracing",
]
[[package]]
@ -1335,7 +1346,7 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e6cc1e89e689536eb5aeede61520e874df5a4707df811cd5da4aa5fbb2aae19"
dependencies = [
"base64 0.22.0",
"base64",
"bytes",
"encoding_rs",
"futures-core",
@ -1441,7 +1452,7 @@ version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d"
dependencies = [
"base64 0.22.0",
"base64",
"rustls-pki-types",
]
@ -1556,6 +1567,9 @@ dependencies = [
"puppy",
"serde_json",
"tokio",
"tracing",
"tracing-forest",
"tracing-subscriber",
]
[[package]]
@ -1569,25 +1583,21 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "sigh"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46bdb4cc44c46a3f0f0a6d1de27c63fccd7fa3384d8d370016c21c8f4a8b89a2"
dependencies = [
"base64 0.21.7",
"http",
"nom",
"openssl",
"thiserror",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@ -1764,6 +1774,16 @@ dependencies = [
"syn 2.0.60",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -1869,9 +1889,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]]
name = "tracing-core"
version = "0.1.32"
@ -1879,6 +1911,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-forest"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee40835db14ddd1e3ba414292272eddde9dad04d3d4b65509656414d1c42592f"
dependencies = [
"smallvec",
"thiserror",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
]
[[package]]
@ -1948,6 +2018,12 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"

View file

@ -5,6 +5,7 @@ use puppy::{
actor::Actor,
config::Config,
data::{FollowRequest, Object, Profile},
post::Author,
Context,
};
@ -29,7 +30,7 @@ async fn main() -> puppy::Result<()> {
println!("- @{account_name} ({origin}) (request url = {id})");
}
Ok(())
})
})?;
// let post = puppy::post::create_post(&cx, riley.key, "i like boys")?;
// puppy::post::federate_post(&cx, post).await
@ -62,12 +63,13 @@ async fn main() -> puppy::Result<()> {
// })?;
// }
// println!("\nPosts on the instance:");
// for post in puppy::post::fetch_timeline(&db, .., None)?.posts() {
// let Author { ref handle, .. } = post.author;
// let content = post.content.content.as_ref().unwrap();
// println!("- {:?} by {handle}:\n{content}", post.id)
// }
println!("\nPosts on the instance:");
for post in puppy::post::fetch_timeline(cx.store(), .., None)?.posts() {
let Author { ref handle, .. } = post.author;
let content = post.content.content.as_ref().unwrap();
println!("- {:?} by {handle}:\n{content}", post.id)
}
Ok(())
// cx.run(|tx| {
// println!("\nLinen's followers:");

View file

@ -11,3 +11,6 @@ hyper-util = { version = "*", features = ["full"] }
serde_json = "*"
http = "*"
derive_more = "*"
tracing = "*"
tracing-subscriber = "*"
tracing-forest = "*"

View file

@ -1,6 +1,6 @@
//! API endpoints and request handlers.
use std::{convert::Infallible, future::Future};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
@ -11,8 +11,9 @@ use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use hyper::Method;
use puppy::Context;
use serde_json::{from_slice, json, Value};
use serde_json::{json, Value};
use tokio::net::TcpListener;
use tracing::{info, info_span, trace_span, Instrument as _};
use crate::sig::{Signer, Verdict, Verifier, VERIFIER_MOUNT};
@ -90,10 +91,11 @@ pub async fn start(context: Context) -> Result<(), Box<dyn std::error::Error + S
let verifier = verifier.clone();
tokio::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(|req| handle(req, &verifier, cx.clone())))
.await
{
let service = service_fn(|req| {
let span = info_span!("request");
handle(req, &verifier, cx.clone()).instrument(span)
});
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
eprintln!("Error serving connection: {:?}", err);
}
});
@ -164,7 +166,16 @@ async fn handle(req: Request, verifier: &Verifier, cx: Context) -> Result<Respon
};
// Simplified representation of a request, so we can pattern match on it more easily in the dispatchers.
let req = Req::simplify(&request);
println!("{} {}", request.method(), request.uri());
let user_agent = request
.headers()
.get("user-agent")
.and_then(|h| h.to_str().ok());
info! {
target = format!("{} {}", request.method(), request.uri()),
user_agent,
"incoming"
};
// We'll use the path to pick where specifically to send the request.
// Check request signature at the door. Even if it isn't needed for a particular endpoint, failing fast
// with a clear error message will save anyone trying to get *their* signatures implementation a major
@ -186,16 +197,8 @@ async fn handle(req: Request, verifier: &Verifier, cx: Context) -> Result<Respon
};
// If one of the endpoints gave us an error message, we convert that into a response and then
// serve it to the client. In either case, we just serve a response.
let response = res.unwrap_or_else(|msg| {
println!("{} {}: [error] {msg}", request.method(), request.uri());
req.error(msg)
});
println! {
"{} {}: {}",
request.method(),
request.uri(),
response.status()
};
let response = res.unwrap_or_else(|msg| req.error(msg));
info!(status = response.status().as_str(), "finished");
Ok(response)
}
@ -207,6 +210,7 @@ const GET: &Method = &Method::GET;
/// This function is where all requests to a protected endpoint have to go through. If the request
/// was signed but does not target a protected endpoint, this function will fall back to the
/// [`dispatch_public`] handler.
#[tracing::instrument(skip_all)]
async fn dispatch_signed(
cx: Context,
verifier: &Verifier,
@ -219,9 +223,7 @@ async fn dispatch_signed(
// verification actor lives.
(GET, ["o", ulid]) => ap::serve_object(&cx, ulid),
// POSTs to an actor's inbox need to be signed to prevent impersonation.
(POST, ["o", ulid, "inbox"]) => {
with_json(&req.body, |json| ap::inbox(&cx, ulid, sig, json)).await
}
(POST, ["o", ulid, "inbox"]) => ap::inbox(&cx, ulid, sig, &req.body).await,
// Try the resources for which no signature is required as well.
_ => dispatch_public(cx, verifier, req).await,
}
@ -230,6 +232,7 @@ async fn dispatch_signed(
/// Dispatch `req` to an unprotected endpoint. If the requested path does not exist, the
/// function will return a 404 response. If the path *does* exist, but the signature is not
/// valid, they will also get a 404.
#[tracing::instrument(skip_all)]
async fn dispatch_public(
cx: Context,
verifier: &Verifier,
@ -245,16 +248,6 @@ async fn dispatch_public(
}
}
async fn with_json<F>(body: &[u8], f: impl FnOnce(Value) -> F) -> Result<Response, Message>
where
F: Future<Output = Result<Response, Message>>,
{
match from_slice(body) {
Ok(json) => f(json).await,
Err(e) => fuck!(400: "could not decode json: {e}"),
}
}
mod error {
//! Pre-baked error responses.
@ -262,6 +255,7 @@ mod error {
use super::Response;
/// An error message shown to an end user of the API.
#[derive(Debug)]
pub struct Message {
/// The main error message.
pub error: String,

View file

@ -55,7 +55,7 @@ pub async fn outbox(cx: &Context, params: &[(&str, &str)]) -> Result<Response, M
fuck!(500: "failed actor by name {user}");
};
let post = puppy::post::create_post(&cx, actor.key, content.to_string()).unwrap();
let post = puppy::post::create_local_post(&cx, actor.key, content.to_string()).unwrap();
puppy::post::federate_post(&cx, post).await.unwrap();
Ok(respond! {
code: 200
@ -67,10 +67,11 @@ pub async fn inbox(
cx: &Context,
actor_id: &str,
sig: Signer,
body: Value,
body: &[u8],
) -> Result<Response, Message> {
let receiver = actor_id.parse::<Key>().unwrap();
match Activity::from_json(body) {
let json = serde_json::from_slice(body).unwrap();
match Activity::from_json(json) {
Ok(activity) => {
puppy::ingest(&cx, receiver, &activity).await.unwrap();
match puppy::interpret(&cx, activity) {

View file

@ -9,6 +9,11 @@
#![feature(try_blocks, yeet_expr)]
use puppy::{config::Config, Context};
use tracing::Level;
use tracing_forest::ForestLayer;
use tracing_subscriber::{
filter::filter_fn, layer::SubscriberExt as _, util::SubscriberInitExt as _, Registry,
};
mod sig;
mod api;
@ -16,6 +21,11 @@ mod api;
/// Starts up the whole shebang.
#[tokio::main]
async fn main() {
Registry::default()
.with(filter_fn(|meta| !meta.target().starts_with("reqwest")))
.with(filter_fn(|meta| *meta.level() < Level::DEBUG))
.with(ForestLayer::default())
.init();
// TODO: load the config from a file or something.
let config = Config {
ap_domain: "test.piss-on.me".to_string(),

View file

@ -8,6 +8,7 @@ use puppy::fetch::{
use serde_json::{json, Value};
use puppy::config::Config;
use tracing::{debug, error, info, trace};
/// Checks request signatures.
#[derive(Clone)]
@ -23,6 +24,7 @@ const VERIFIER_PATH: &str = "/s/request-verifier";
pub const VERIFIER_MOUNT: &[&str] = &["s", "request-verifier"];
/// A "verdict" about a signed request, passed by a [`Verifier`].
#[derive(Debug)]
pub enum Verdict {
/// The signature checks out.
Verified(Signer),
@ -80,15 +82,18 @@ impl Verifier {
}
/// Does the HTTP signature verification process, and returns a "proof" of the signature in the form
/// of the [`Signer`], which contains information about who signed a particular request.
#[tracing::instrument(skip_all)]
pub async fn verify<B>(&self, req: &Request<B>) -> Verdict {
// TODO: implement the whole verification thing as a middleware so we can intercept requests
// like these, instead of coupling this tightly with the router.
if req.uri().path() == VERIFIER_PATH {
// HACK: Allow access to the request verifier actor without checking the signature.
debug!("allowing request to verifier to pass without checking signature");
return Verdict::Unsigned;
}
let Some(header) = req.headers().get("signature") else {
info!("request not signed");
return Verdict::Unsigned;
};
@ -98,8 +103,18 @@ impl Verifier {
.to_string();
let sig = match Signature::derive(&req) {
Err(error) => return Verdict::Rejected { signature_str, reason: error },
Ok(signature) => signature,
Err(error) => {
info! {
reason = error.to_string(),
signature = signature_str,
"invalid signature",
};
return Verdict::Rejected { signature_str, reason: error };
}
Ok(signature) => {
trace!("signature parsed");
signature
}
};
// Fetch the signer's public key using our private key.
@ -107,22 +122,29 @@ impl Verifier {
let public_key = match fetch_result {
Ok(public_key) => public_key,
Err(err) => {
info! {
reason = err.to_string(),
"failed to fetch pubkey",
};
return Verdict::Rejected {
reason: format!("could not fetch public key: {err}"),
signature_str,
}
};
}
};
// TODO: verify digest also
if let Err(error) = public_key.verify(&sig) {
info!(reason = error, "rejected signature");
Verdict::Rejected { signature_str, reason: error }
} else {
info!(key_owner = public_key.owner, "signature OK");
Verdict::Verified(Signer { ap_id: public_key.owner })
}
}
/// Send a request to get the public key from an ID. This request will be signed with the
/// verifier actor's public key.
#[tracing::instrument(skip_all)]
async fn fetch_public_key(&self, uri: &str) -> Result<VerificationKey, FetchError> {
let json = puppy::fetch::resolve(&self.signing_key(), uri).await?;
let Some(key) = Key::from_json(json) else {
@ -143,6 +165,7 @@ impl Verifier {
}
/// An ActivityPub actor that signed a request.
#[derive(Debug)]
pub struct Signer {
/// The ActivityPub ID (a URL) of the signer of the request.
pub ap_id: String,

View file

@ -7,7 +7,6 @@ path = "src/lib.rs"
[dependencies]
reqwest = { version = "*", features = ["json"] }
sigh = "*"
serde_json = "*"
derive_more = "*"
http = "*"
@ -18,3 +17,4 @@ spki = "*"
http-body-util = "*"
rand = "*"
pem = "*"
tracing = "*"

View file

@ -4,6 +4,7 @@ use http_body_util::BodyExt as _;
use reqwest::Body;
use serde_json::Value;
use derive_more::Display;
use tracing::{debug, info, instrument};
use crate::{
object::Activity,
@ -37,6 +38,7 @@ impl Client {
///
/// Note that in order for the request to be considered valid by most implementations, `key.owner`
/// must equal `payload.actor`.
#[instrument(skip_all, fields(activity = payload.id, url = inbox, key = key.id))]
pub async fn deliver(&self, key: &SigningKey, payload: &Activity, inbox: &str) {
let system = Subsystem::Delivery;
@ -58,6 +60,7 @@ impl Client {
self.inner.execute(request).await.unwrap();
}
/// A high-level function to resolve a single ActivityPub ID using a signed request.
#[instrument(skip_all, fields(url = url, key = key.id))]
pub async fn resolve(&self, key: &SigningKey, url: &str) -> Result<Value, FetchError> {
let system = Subsystem::Resolver;
@ -77,16 +80,20 @@ impl Client {
if response.status().is_success() {
response.json().await.map_err(From::from)
} else {
let status = response.status().as_u16();
let body = response.text().await?;
info!(status, "resolution failed: {body}");
Err(FetchError::NotSuccess {
status: response.status().as_u16(),
body: response.text().await?,
url: url.to_string(),
status,
body,
})
}
}
/// Forwards a request and returns the raw response, so that it can be analyzed for debugging.
///
/// It exists solely as a debugging tool!
#[instrument(skip_all, fields(url, key = key.id))]
pub async fn proxy(
&self,
key: &SigningKey,

View file

@ -5,6 +5,7 @@ use derive_more::From;
pub use crate::signatures::Key as PublicKey;
#[derive(Debug)]
pub struct Activity<T = String> {
pub id: String,
pub actor: String,
@ -58,6 +59,7 @@ impl Activity {
}
/// An actor is an entity capable of producing Takes.
#[derive(Debug)]
pub struct Actor {
/// The URL pointing to this object.
pub id: String,
@ -125,7 +127,7 @@ impl Actor {
}
}
#[derive(From)]
#[derive(From, Debug)]
pub enum Object {
#[from(ignore)]
Id {
@ -133,14 +135,7 @@ pub enum Object {
},
Activity(Activity),
Actor(Actor),
#[from(ignore)]
Other {
id: String,
kind: String,
author: String,
content: Option<String>,
summary: Option<String>,
},
Note(Note),
}
impl Object {
@ -148,7 +143,7 @@ impl Object {
match self {
Object::Activity(a) => &a.id,
Object::Actor(a) => &a.id,
Object::Other { id, .. } => id,
Object::Note(n) => &n.id,
Object::Id { id } => id,
}
}
@ -163,7 +158,7 @@ impl Object {
Some("Create" | "Follow" | "Accept" | "Reject" | "Bite") => {
Activity::from_json(json).map(Object::Activity)
}
Some(kind) => Ok(Object::Other {
Some(kind) => Ok(Object::Note(Note {
id: map
.get("id")
.ok_or("id is required")?
@ -185,7 +180,7 @@ impl Object {
.get("summary")
.and_then(|v| v.as_str())
.map(str::to_owned),
}),
})),
None => do yeet "could not determine type of object",
}
} else {
@ -197,13 +192,13 @@ impl Object {
Object::Id { id } => json!(id),
Object::Activity(a) => a.to_json_ld(),
Object::Actor(a) => a.to_json_ld(),
Object::Other {
Object::Note(Note {
id,
kind,
content,
summary,
author,
} => json!({
}) => json!({
"to": [
"https://www.w3.org/ns/activitystreams#Public",
],
@ -216,3 +211,12 @@ impl Object {
}
}
}
#[derive(Debug)]
pub struct Note {
pub id: String,
pub author: String,
pub content: Option<String>,
pub summary: Option<String>,
pub kind: String,
}

View file

@ -56,6 +56,7 @@ pub type SigningKey = Key<Private>;
/// - `Key` (`K` = [`String`]): PEM-encoded, can be turned into a JSON object.
/// - [`VerificationKey`] (`K` = [`Public`]): used as an input in the request signature validation process.
/// - [`SigningKey`] (`K` = [`Private`]): used as an input in the generation of a signed request.
#[derive(Debug)]
pub struct Key<K = String> {
/// The `"id"` property of the public key, which should equal the `keyId` part of a signature.
pub id: String,
@ -166,6 +167,12 @@ impl Public {
}
}
impl std::fmt::Debug for Public {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.encode_pem().fmt(f)
}
}
impl SigningKey {
/// Create a signature for `req` using the given options.
pub fn sign<T>(&self, opt: Options, req: &Request<T>) -> Result<Signature, String> {

View file

@ -13,3 +13,4 @@ chrono = "*"
either = "*"
derive_more = "*"
serde_json = "*"
tracing = "*"

View file

@ -21,7 +21,7 @@ pub use context::test_context;
use data::{ActivityKind, AuthorOf, Channel, Content, Create, Id, ObjectKind, Profile, PublicKey};
use fetch::object::{Activity, Object};
use fetch::object::{Activity, Note, Object};
use store::Transaction;
pub use store::{self, Key, StoreError};
pub use fetch::{self, FetchError};
@ -88,13 +88,13 @@ pub fn get_local_ap_object(tx: &Transaction<'_>, key: Key) -> Result<fetch::obje
let Some(Id(author)) = tx.get_alias(author)? else {
todo!()
};
Ok(fetch::object::Object::Other {
Ok(fetch::object::Object::Note(Note {
id: obj.id.0.clone().into(),
summary: warning,
content,
author,
kind,
})
}))
}
_ => todo!(),
}
@ -252,6 +252,7 @@ pub mod config {
/// Interpret an *incoming* activity. Outgoing activities are *never* interpreted through this function,
/// because their changes are already in the database.
// TODO: figure out if that is the behavior we actually want
#[tracing::instrument(skip(cx))]
pub fn interpret(cx: &Context, activity: Activity) -> Result<()> {
// Fetch our actor from the database
let Some(actor) = cx.store().lookup(Id(activity.actor.clone()))? else {
@ -278,7 +279,9 @@ pub fn interpret(cx: &Context, activity: Activity) -> Result<()> {
(actor.do_bite(&cx, object)?.id, ActivityKind::Bite)
}
"Create" => {
todo!()
// NOTE: due to the ingesting, we already have this information.
// TODO: change this. for god's sake
return Ok(());
}
"Follow" => {
let object = actor::Actor { key: object };
@ -324,6 +327,7 @@ pub fn interpret(cx: &Context, activity: Activity) -> Result<()> {
}
/// Make sure all the interesting bits of an activity are here.
#[tracing::instrument(skip(cx))]
pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()> {
let key = cx.run(|tx| get_signing_key(tx, actor::Actor { key: auth }).map_err(Error::Store))?;
for id in [activity.actor.as_str(), activity.object.id()] {
@ -336,6 +340,7 @@ pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()>
match object {
Object::Activity(a) => interpret(&cx, a)?,
Object::Actor(a) => actor::create_remote(cx, a).map(void)?,
Object::Note(a) => post::create_post_from_note(cx, a).map(void)?,
_ => todo!(),
}
}
@ -344,3 +349,7 @@ pub async fn ingest(cx: &Context, auth: Key, activity: &Activity) -> Result<()>
/// Discard the argument.
fn void<T>(_: T) -> () {}
pub mod remote {
//! Bridging the gap between other servers and us.
}

View file

@ -4,19 +4,15 @@ use std::ops::RangeBounds;
use chrono::{DateTime, Utc};
use either::Either::{Left, Right};
use fetch::{
object::{Activity, Object},
signatures::Private,
};
use fetch::object::{Activity, Note, Object};
use store::{util::IterExt as _, Key, Store, StoreError, Transaction};
use crate::{
actor::{get_signing_key, Actor},
data::{
self, ActivityKind, AuthorOf, Channel, Content, Create, Follows, Id, ObjectKind,
PrivateKey, Profile, PublicKey,
self, ActivityKind, AuthorOf, Channel, Content, Create, Follows, Id, ObjectKind, Profile,
},
Context, Error,
Context,
};
#[derive(Clone, Copy, Debug)]
@ -133,7 +129,11 @@ pub fn fetch_timeline(
}
/// Create a new post entity.
pub fn create_post(cx: &Context, author: Key, content: impl Into<Content>) -> crate::Result<Post> {
pub fn create_local_post(
cx: &Context,
author: Key,
content: impl Into<Content>,
) -> crate::Result<Post> {
let content = content.into();
cx.run(|tx| {
let key = Key::gen();
@ -151,6 +151,33 @@ pub fn create_post(cx: &Context, author: Key, content: impl Into<Content>) -> cr
})
}
/// Assumes all objects referenced already exist.
#[tracing::instrument(skip(cx))]
pub fn create_post_from_note(cx: &Context, note: Note) -> crate::Result<Post> {
cx.run(|tx| {
let Some(author) = tx.lookup(Id(note.author))? else {
panic!("needed author to already exist")
};
let key = Key::gen();
tx.add_alias(key, Id(note.id.clone()))?;
tx.create(AuthorOf { object: key, author })?;
tx.add_mixin(key, Content {
content: note.content,
warning: note.summary,
})?;
tx.add_mixin(key, data::Object {
kind: ObjectKind::Notelike(note.kind),
id: Id(note.id),
local: false,
})?;
Ok(Post { key })
})
}
#[tracing::instrument(skip(cx))]
pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> {
// Obtain all the data we need to construct our activity
let (Content { content, warning }, url, author, signing_key, followers) = cx.run(|tx| try {
@ -185,13 +212,13 @@ pub async fn federate_post(cx: &Context, post: Post) -> crate::Result<()> {
let activity = Activity {
id: cx.mk_url(activity_key),
actor: signing_key.owner.clone(),
object: Box::new(Object::Other {
object: Box::new(Object::Note(Note {
id: url.to_string(),
kind: "Note".to_string(),
author: cx.mk_url(author),
summary: warning,
content,
}),
})),
kind: "Create".to_string(),
};