From c6bdbfbc52eacd800a3ad68db18f65d28f41bee6 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Sat, 18 Jun 2022 20:55:31 +0200 Subject: [PATCH] Move a lot of code to different modules uwu --- Cargo.lock | 1 + Cargo.toml | 1 + src/ap/mod.rs | 97 +++++++ src/conf.rs | 83 ++++++ src/db/mod.rs | 126 ++++++++ src/lib.rs | 243 ++++++++++++++++ src/main.rs | 786 ++++---------------------------------------------- src/sign.rs | 51 ++++ 8 files changed, 663 insertions(+), 725 deletions(-) create mode 100644 src/ap/mod.rs create mode 100644 src/conf.rs create mode 100644 src/db/mod.rs create mode 100644 src/lib.rs create mode 100644 src/sign.rs diff --git a/Cargo.lock b/Cargo.lock index fc15a1a..78b1d3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,6 +479,7 @@ version = "0.1.0" dependencies = [ "axum", "futures", + "openssl", "reqwest", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 2e340a0..24cd36b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ serde_json = '*' axum = { version = '*', features = [ "ws", "serde_json" ] } url = { version = '*', features = [ "serde" ] } sqlx = { version = '*', features = [ "postgres", "runtime-tokio-native-tls" ] } +openssl = '*' diff --git a/src/ap/mod.rs b/src/ap/mod.rs new file mode 100644 index 0000000..2027a28 --- /dev/null +++ b/src/ap/mod.rs @@ -0,0 +1,97 @@ +//! ActivityPub implementation code and related abstractions. + +use futures::prelude::*; + +use crate::{ Id, Activity, err, Result, Error, sign, ctx::Context }; + +#[derive(Clone)] +pub enum Create { + Note { object: Note } +} + +impl From for Activity { fn from (a: Create) -> Activity { Activity::Create (a) } } + +#[derive(Clone)] +pub enum Follow { + Actor { object: Actor } +} + +impl From for Activity { fn from (a: Follow) -> Activity { Activity::Follow (a) } } + +#[derive(Clone)] +pub enum Accept { + Follow { object: Follow } +} + +impl From for Activity { fn from (a: Accept) -> Activity { Activity::Accept (a) } } + +/// An entity that publishes activities. +#[derive(Clone)] +pub struct Actor { + id: Id, +} + +#[derive(Clone)] +pub struct Note { + id: Id, +} + +impl Activity { + + /// Deliver the activity to all its targets through the ActivityPub + /// delivery mechanism. + pub async fn deliver (self, signer: &S) -> Result<()> + where + S: sign::Sign + ?Sized, + { + // Create a shared client #efficiency + let client = reqwest::Client::new(); + + // the function that does the delivery to a target. It creates + // a request with the proper headers and signs it using the + // `signer`. + let do_delivery = |url| async { + let req = { + let mut r = client.get(url).build()?; + signer.sign(&mut r)?; + r + }; + client + .execute(req) + .map_err(err) + .await + }; + + // Collect only the errors, since we don't need to do anything + // with a successful delivery. + let errors = self + .delivery_targets() + .await? + .into_iter() + .map(do_delivery) + .collect::>() + .filter_map(|r: Result<_>| async { + r.err().map(err) + }) + .collect::>() + .await; + + for err in errors { + // Failure to deliver is not a fatal error per se, + // so we log and move on. + println!("Failed to deliver activity: {:?}", err); + } + + Ok (()) + } + + // Get all delivery targets as urls. + async fn delivery_targets (&self) -> Result> { + todo!() + } + + pub async fn perform (self, ctx: &mut Context) -> Result<()> where S: sign::Sign { + todo!() + } + +} diff --git a/src/conf.rs b/src/conf.rs new file mode 100644 index 0000000..4fca90c --- /dev/null +++ b/src/conf.rs @@ -0,0 +1,83 @@ +//! Hermit instance configuration. + +use crate::Id; + +use std::collections::HashMap as Map; + +/// The main configuration for any Hermit instance. +#[derive(Clone)] +pub struct Config { + /// The domain of the instance. + pub host: String, + /// The port to host the instance on. Defaults to `6969`. + pub port: u16, + /// Filtering rules applied to each activity. + pub rules: Vec, + /// Notification configuration for each local actor. + pub notify: Map, +} + +impl Config { + /// Create a new default config. + pub fn new (hostname: impl ToString) -> Config { + let (notify, rules) = def(); + Config { + host: hostname.to_string(), + port: 6969, + notify, + rules, + } + } +} + +/// Controls when notifications should be sent. +#[derive(Clone, Copy)] +pub struct Notify { + /// Whether to send a notification when a post is liked. + pub post_liked: bool, + /// Whether to send a notification when a post is shared. + pub post_shared: bool, + /// Whether to send a notification when a follow request is received. + pub follow_requested: bool, + /// Whether to send a notification when a follow request is accepted. + pub new_follower: bool, +} + +impl Default for Notify { + fn default () -> Self { + Notify { + post_liked: true, + post_shared: true, + follow_requested: true, + new_follower: true, + } + } +} + +/// Shortcut for creating a default instance +fn def () -> T where T: Default { T::default() } + +pub mod rule { + + //! Filtering rules for [`Activity`] data. + + use crate::Activity; + + /// A filtering rule. + #[derive(Clone)] + pub struct Rule (Inner); + + impl Rule { + /// Apply the rule to the activity. + pub fn apply (&self, a: Activity) -> Option { + match self.0 {} + } + } + + /// When adding new filtering rules, add a variant to this + /// enum and write your implementation in the `apply` function + /// of [`Rule`]. + #[derive(Clone)] + enum Inner {} + +} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 0000000..cd19635 --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,126 @@ +//! Database abstraction layer used by Hermit. + +use crate::{ Id, Result }; +use futures::prelude::*; +use sqlx::{Executor, pool::PoolConnection}; + +/// `const ()` but in Rust +fn void (_: T) -> () { () } + +type Database = sqlx::Postgres; + +/// Specifies how to connect to the database. +pub struct Config {} + +/// A database client. +/// +/// Cloning this client is cheap. +#[derive(Clone)] +pub struct Client { + /// The internal connection pool. + pool: sqlx::Pool, +} + +impl Client { + + /// Attempt to connect to the database using the provided configuration. + pub async fn new (_: Config) -> Result { + todo!() + } + + /// Fetch the data mapped to the given `key` from the database. + pub async fn get (&self, key: T::Key) -> Result> + where + T: Get, + { + self.with_conn(|c| T::get(key, c)) + .await + } + + /// Perfom an insertion on the database. + pub async fn insert (&mut self, data: T) -> Result<()> + where + T: Insert, + { + self.with_conn(|c| data.set(c)) + .await + .map(void) + } + + /// Delete something from the database. + pub async fn delete (&mut self, key: T::Key) -> Result<()> + where + T: Delete, + { + self.with_conn(|c| T::del(key, c)) + .await + } + + /// Handles the getting-a-connection logic. + async fn with_conn (&self, f: F) -> Result + where + F: FnOnce (&mut PoolConnection) -> O, + O: Future>, + { + use crate::err; + + self.pool + .acquire() + .map_err(err) + .and_then(|mut c| { + f(&mut c) + }) + .await + } + +} + +pub trait Object: Sized { + type Key: Eq; + fn key (&self) -> &Self::Key; +} + +pub trait Insert: Object { + type Future: Future>; + fn set <'e, E> (self, exec: E) -> Self::Future + where + E: Executor<'e>; +} + +pub trait Delete: Object { + type Future: Future>; + fn del <'e, E> (key: Self::Key, exec: E) -> Self::Future where E: Executor<'e>; +} + +pub trait Get: Object { + type Future: Future>>; + fn get <'e, E> (key: Self::Key, exec: E) -> Self::Future where E: Executor<'e>; +} + +pub mod ops { + + //! Database operations (queries and updates). + + use super::*; + + pub struct Following { + pub from: Id, + pub to: Id, + pub id: Id, + } + + impl Object for Following { + type Key = Id; + fn key (&self) -> &Self::Key { &self.id } + } + + impl Insert for Following { + type Future = future::BoxFuture<'static, Result>; + fn set <'e, E> (self, exec: E) -> Self::Future + where + E: Executor<'e> + { + todo!() + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a275892 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,243 @@ +//! # The Hermit ActivityPub server +//! +//! This library contains the types and trait impls that make up the ActivityPub +//! support and database interaction for the Hermit ActivityPub server. + +// Expose the `Id` type in the crate root +pub use id::Id; +pub use ctx::Context; + +// Module imports +pub mod conf; +pub mod sign; +pub mod db; +pub mod ap; + +/// The Activity supertype used in abstractions over any kind of activity. +#[derive(Clone)] +pub enum Activity { + /// Create a post. + Create (ap::Create), + /// Request to follow an actor. + Follow (ap::Follow), + /// Accept a follow request. + Accept (ap::Accept), +} + +/// A result type that defaults to using [`Error`] as the second type +/// parameter. +pub type Result = std::result::Result; + +/// Errors generated within Hermit. +#[derive(Debug)] +pub enum Error { + /// [`reqwest`] errors. + Http (reqwest::Error), + /// [`serde_json`] errors. + Json (serde_json::Error), + /// [`sqlx`] errors. + Sqlx (sqlx::Error), + /// A cryptography error from [`openssl`]. + OpenSSL (openssl::error::ErrorStack), +} + +impl From for Error { + fn from (e: sqlx::Error) -> Self { Error::Sqlx (e) } +} + +impl From for Error { + fn from (e: reqwest::Error) -> Self { Error::Http (e) } +} + +impl From for Error { + fn from (e: serde_json::Error) -> Self { Error::Json (e) } +} + +impl From for Error { + fn from (e: openssl::error::ErrorStack) -> Self { Error::OpenSSL (e) } +} + +/// Trivial conversion function for use in `map_err` functions. +pub (crate) fn err (e: impl Into) -> Error { e.into() } + +mod id { + + use std::str::FromStr; + + use serde::{ Deserialize, Serialize }; + + /// An ActivityPub identifier. + #[derive(PartialEq, Eq, Clone, Serialize, Deserialize)] + pub struct Id (reqwest::Url); + + impl crate::IntoUrl for Id { + fn into_url (self) -> Option { Some (self.0) } + } + + impl FromStr for Id { + type Err = url::ParseError; + + fn from_str (s: &str) -> Result { + s.parse().map(Id) + } + } + +} + +mod ctx { + + use std::sync::Arc; + + use futures::prelude::*; + use serde_json::Value; + + use crate::{ conf::Config, db, Result, sign::Sign, ap, Activity }; + + pub struct Context { + pub config: Config, + pub signer: Arc, + pub client: db::Client, + } + + impl Clone for Context { + fn clone (&self) -> Context { + Context { + config: self.config.clone(), + signer: self.signer.clone(), + client: self.client.clone(), + } + } + } + + impl Context { + + /// Attempt an action within the context of the database. + pub async fn with_db <'a, F, O, T> (&'a mut self, f: F) -> Result + where + F: FnOnce (&'a mut db::Client) -> O, + O: Future> + 'a, + { + f(&mut self.client).await + } + + /// Get all actors on the instance. + pub fn actors (&self) -> impl Iterator + '_ { + None.into_iter() + } + + /// Get a dereferencer. + pub fn dereferencer (&self) -> Dereferencer + where + S: Sign + { + Dereferencer { + web: reqwest::Client::new(), + signer: self.signer.clone(), + db: self.client.clone(), + } + } + + /// Access the inner [`Sign`] provider. + pub fn signer (&self) -> &S { + &self.signer + } + + pub fn config (&self) -> &Config { + &self.config + } + + pub fn config_mut (&mut self) -> &mut Config { + &mut self.config + } + + /// Conjure an activity "from thin air" as though it were posted through a client. + pub (crate) async fn conjure (&self, act: impl Into) -> Result<()> { + let act = act.into(); + todo!() + } + + } + + /// A type that provides dereferencing facilities for [`Activity`] data. + pub struct Dereferencer { + web: reqwest::Client, + db: db::Client, + signer: Arc, + } + + impl Dereferencer + where + S: Sign + { + /// Perform the dereferencing. + pub async fn dereference (&self, json: Value) -> Result { + match json["type"].as_str() { + Some ("Create") => self.deref_create(json).await.map(Activity::Create), + _ => todo!() + } + } + + fn db_client (&self) -> &db::Client { + &self.db + } + + fn web_client (&self) -> &reqwest::Client { + &self.web + } + + /// Fetch a JSON value. + pub async fn fetch (&self, url: impl crate::IntoUrl) -> Result { + + let client = self.web_client(); + + let url = match url.into_url() { + Some (url) => url, + None => todo!(), + }; + + let req = { + let mut r = client.get(url).build()?; + self.signer.sign(&mut r)?; + r + }; + + let value = client + .execute(req) + .await? + .json() + .await?; + + Ok (value) + + } + + /// Attempt to dereference to a [`Create`](ap::Create) activity. + async fn deref_create (&self, json: Value) -> Result { + let json = if let Value::String (url) = json { + self.fetch(url).await? + } else { json }; + + match json["object"]["type"].as_str() { + Some ("Note" | "Article") => todo!(), //Ok (act::Create::Note { id }), + _ => return Err (todo!()), + } + } + } + + +} + +/// Types that can be mapped to a [`Url`](url::Url). +pub trait IntoUrl { + /// Perform the conversion. + fn into_url (self) -> Option; +} + +impl IntoUrl for T where T: ToString { + fn into_url (self) -> Option { + self.to_string() + .parse() + .ok() + } +} + diff --git a/src/main.rs b/src/main.rs index a6fff94..584ecd7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,46 @@ -pub use id::Id; +use std::sync::Arc; -use serde_json::{from_value, Value}; -use futures::prelude::*; -use sign::Sign; -use conf::Config; +use hermit::{ Context, Error, db, sign, Activity, }; + +use hermit::conf::Config; +use tokio::sync::RwLock; #[tokio::main] async fn main () { - let cfg = Config::new("hmt.riley.lgbt"); - let ctx = Context { - config: cfg, - signer: todo!(), + // Set up the context for each task. + let ctx = { + + // The hostname this server will be hosted under (hardcoded for now) + let hostname = "dev.riley.lgbt"; + + // Establish a connection to the database. + let client = db::Client::new(db::Config {}).await.unwrap(); + + // Generate the config from the hostname. + let config = Config::new(&hostname); + + // Use an instance-wide signing key (for now). + let signer = sign::Key::load( + format!("https://{hostname}/key/main").parse().unwrap(), + "private_key.pem" + ).map(Arc::new).unwrap(); + + Context { + signer, + config, + client, + } + }; + + // Initialize the web server. + task::run(&ctx, task::Server {}); } +fn err (e: impl Into) -> Error { e.into() } + mod task { //! Async tasks, communicating with each other across threads through generic @@ -24,14 +49,15 @@ mod task { use std::pin::Pin; use futures::prelude::*; use serde_json::Value; - use crate::{sign::Sign, flow::Flow, Activity, ctrl::Message, Context}; + use crate::sign::Sign; + use crate::{flow::Flow, Activity, ctrl::Message, Context}; /// Perform a [`Task`]. pub fn run (ctx: &Context, task: impl Task) where - S: Sign + Clone + Send + Sync + 'static + S: Sign + Send + Sync + 'static { - let ctx = ctx.clone(); + let ctx: Context = ctx.clone(); tokio::spawn(task.run(ctx)); } @@ -44,8 +70,22 @@ mod task { /// Execute the task. fn run (self, ctx: Context) -> Self::Future where - S: Sign + Clone + Send + Sync + 'static; + S: Sign + Send + Sync + 'static; + } + + /// The main web server. + pub struct Server {} + + impl Task for Server { + type Future = Pin + Send + 'static>>; + + fn run (self, ctx: Context) -> Self::Future + where + S: Sign + Send + Sync + 'static + { + todo!() + } } /// API request event processing. @@ -102,23 +142,21 @@ mod task { { type Future = Pin + Send + 'static>>; - fn run (self, ctx: Context) -> Self::Future + fn run (self, mut ctx: Context) -> Self::Future where - S: Sign + Clone + Send + Sync + 'static + S: Sign + Send + Sync + 'static { let Self { mut data_rx, mut ctrl_rx } = self; Box::pin(async move { - - let mut config = crate::conf::Config::new("localhost"); - + loop { tokio::select! { // Await control commands from `Ctrl`. Some (message) = ctrl_rx.next() => match message { // Live config reloading. - Message::Reconfigure (c) => c(&mut config), + Message::Reconfigure (c) => c(&mut ctx.config), // Graceful termination command from `Ctrl`. Message::Terminate => break, }, @@ -138,7 +176,7 @@ mod task { }; // Run both incoming and outgoing activities through the filtering system. - let action = |act| config.rules.iter().try_fold(act, |a, r| r.apply(a)); + let action = |act| ctx.config.rules.iter().try_fold(act, |a, r| r.apply(a)); let data = match data.map(action).to_option() { // Activity survived the filtering process, bind it to `data`. Some (data) => data, @@ -147,8 +185,7 @@ mod task { }; // Perform each activity in the context of the instance. - let c = ctx.clone(); - if let Err (err) = data.clone().apply(|a| a.perform(c)).await { + if let Err (err) = data.clone().apply(|a| a.perform(&mut ctx)).await { // Something went wrong while performing the activity, // report error and move on. println!("Exec | Failure '{:?}'", err); @@ -158,8 +195,7 @@ mod task { // Push each activity to an appropriate location. // If incoming: push a notification to the frontend. let incoming = { - let n = ctx.notifier(); - move |a: Activity| a.notify(n) + move |_: Activity| async { todo!() } }; // If outgoing: deliver the activity to its targets using // the ActivityPub delivery mechanism. @@ -294,7 +330,7 @@ pub mod ctrl { use std::sync::Arc; - use crate::conf::Config; + use hermit::conf::Config; #[derive(Clone)] pub enum Message { @@ -306,703 +342,3 @@ pub mod ctrl { } -/// Configuration. -pub mod conf { - - use std::sync::Arc; - use crate::rule::Rule; - - #[derive(Clone)] - pub struct Config { - /// The domain of the instance. - pub host: String, - /// The port to host the instance on. Defaults to `6969`. - pub port: u16, - /// Filtering rules applied to each activity. - pub rules: Vec>>, - /// Notification configuration. - pub notify: Notify, - } - - impl Config { - /// Create a new default config. - pub fn new (hostname: impl ToString) -> Config { - let (notify, rules) = def(); - Config { - host: hostname.to_string(), - port: 6969, - notify, - rules, - } - } - } - - #[derive(Clone, Copy)] - pub struct Notify { - pub post_liked: bool, - pub post_shared: bool, - pub follow_requested: bool, - pub new_follower: bool, - } - - impl Default for Notify { - fn default () -> Self { - Notify { - post_liked: true, - post_shared: true, - follow_requested: true, - new_follower: true, - } - } - } - - /// Shortcut for creating a default instance - fn def () -> T where T: Default { T::default() } - -} - -#[derive(Clone)] -pub struct Context { - config: Config, - signer: S, - client: db::Client, -} - -impl Context { - - /// Attempt an action within the context of the database. - pub async fn with_db <'a, F, O, T> (&'a mut self, f: F) -> Result - where - F: FnOnce (&'a mut db::Client) -> O, - O: Future> + 'a, - { - f(&mut self.client).await - } - - /// Get all actors on the instance. - pub fn actors (&self) -> impl Iterator + '_ { - None.into_iter() - } - - /// Get a dereferencer. - pub fn dereferencer (&self) -> Dereferencer - where - S: Sign + Clone - { - Dereferencer { - web: reqwest::Client::new(), - signer: self.signer.clone(), - db: self.client.clone(), - } - } - - /// Access the inner [`Sign`] provider. - pub fn signer (&self) -> &S { - &self.signer - } - - /// Access a notifier that delivers notifications to their intended targets. - pub fn notifier (&self) -> Notifier { - todo!() - } - - /// Conjure an activity "from thin air" as though it were posted through a client. - pub (crate) async fn conjure (&self, act: impl Into) -> Result<()> { - let act = act.into(); - todo!() - } - -} - -pub trait IntoUrl { - fn into_url (self) -> Option; -} - -impl IntoUrl for T where T: ToString { - fn into_url (self) -> Option { - self.to_string() - .parse() - .ok() - } -} - -pub struct Notifier { - config: conf::Notify, - socket: Box + Send + Sync + Unpin>, -} - -/// A type that provides dereferencing facilities for [`Activity`] data. -pub struct Dereferencer { - web: reqwest::Client, - db: db::Client, - signer: S, -} - -impl Dereferencer -where - S: Sign -{ - /// Perform the dereferencing. - pub async fn dereference (&self, json: Value) -> Result { - match json["type"].as_str() { - Some ("Create") => self.deref_create(json).await.map(Activity::from), - _ => todo!() - } - } - - fn db_client (&self) -> &db::Client { - &self.db - } - - fn web_client (&self) -> &reqwest::Client { - &self.web - } - - /// Fetch a JSON value. - pub async fn fetch (&self, url: impl IntoUrl) -> Result { - - let client = self.web_client(); - - let url = match url.into_url() { - Some (url) => url, - None => todo!(), - }; - - let req = { - let mut r = client.get(url).build()?; - self.signer.sign(&mut r)?; - r - }; - - let value = client - .execute(req) - .await? - .json() - .await?; - - Ok (value) - - } - - /// Attempt to dereference to a [`Create`](ap::Create) activity. - async fn deref_create (&self, json: Value) -> Result { - let json = if let Value::String (url) = json { - self.fetch(url).await? - } else { json }; - - match json["object"]["type"].as_str() { - Some ("Note" | "Article") => todo!(), //Ok (act::Create::Note { id }), - _ => return Err (todo!()), - } - } -} - -#[derive(Debug)] -pub enum Error { - Http (reqwest::Error), - Json (serde_json::Error), - Sqlx (sqlx::Error), -} - -impl From for Error { - fn from (e: sqlx::Error) -> Self { Error::Sqlx (e) } -} - -impl From for Error { - fn from (e: reqwest::Error) -> Self { Error::Http (e) } -} - -impl From for Error { - fn from (e: serde_json::Error) -> Self { Error::Json (e) } -} - -fn err (e: impl Into) -> Error { e.into() } - -pub type Result = std::result::Result; - -#[derive(Clone)] -pub struct Actor { - id: Id, - is_locked: bool, -} - -#[derive(Clone)] -pub enum Activity { - Create (ap::Create), - Follow (ap::Follow), - Accept (ap::Accept), -} - -impl Activity { - pub async fn perform (self, mut ctx: Context) -> Result<()> - where - S: sign::Sign - { - use ap::*; - - match self { - Activity::Follow (Follow::Actor { id, actor, object, .. }) => { - - // Find the actor this activity refers to. If it's not a local - // actor, we don't care. - let x = ctx.actors().find(|a| object.id == a.id); - match x { - - // Unlocked account - Some (a) if !a.is_locked => { - - // Prepare the operation. - let op = db::ops::Following { - from: actor.id.clone(), - to: object.id.clone(), - id: id.clone(), - }; - - // Use the database connection to perform an action. - ctx.with_db(|db| db.insert(op)).await?; - - // Reply with an `Accept` activity if the account is not - // locked, so the remote knows it's ok to follow this actor - // immediately. - ctx.conjure(Accept::Follow { - object: Follow::Actor { - id: id.clone(), - object, - actor, - }, - actor: a, - id, - }).await - - }, - - _ => todo!(), - } - }, - _ => todo!(), - } - } - - /// Send a notification to the given [`Sink`]. - pub async fn notify (self, notifier: Notifier) -> Result<()> { - let Notifier { config, mut socket } = notifier; - match &self { - // Only notify if the config value is set to `true`. - Activity::Follow (..) if config.new_follower => - socket.send(self) - .map_err(err) - .await, - // In all other cases, do nothing - _ => Ok (()) - } - } - - /// Deliver the activity to all its targets through the ActivityPub - /// delivery mechanism. - pub async fn deliver (self, signer: &S) -> Result<()> - where - S: sign::Sign + ?Sized, - { - // Create a shared client #efficiency - let client = reqwest::Client::new(); - - // the function that does the delivery to a target. It creates - // a request with the proper headers and signs it using the - // `signer`. - let do_delivery = |url| async { - let req = { - let mut r = client.get(url).build()?; - signer.sign(&mut r)?; - r - }; - client - .execute(req) - .map_err(err) - .await - }; - - // Collect only the errors, since we don't need to do anything - // with a successful delivery. - let errors = self - .delivery_targets() - .await? - .into_iter() - .map(do_delivery) - .collect::>() - .filter_map(|r: Result<_>| async { - r.err().map(err) - }) - .collect::>() - .await; - - for err in errors { - // Failure to deliver is not a fatal error per se, - // so we log and move on. - println!("Failed to deliver activity: {:?}", err); - } - - Ok (()) - } - - // Get all delivery targets as urls. - async fn delivery_targets (&self) -> Result> { - todo!() - } -} - -pub mod db { - use crate::{Id, Result}; - use futures::prelude::*; - use sqlx::{Executor, pool::PoolConnection}; - - /// `const ()` but in Rust - fn void (_: T) -> () { () } - - type Database = sqlx::Postgres; - - pub struct Config {} - - /// A database client. - #[derive(Clone)] - pub struct Client { - /// The internal connection pool. - pool: sqlx::Pool, - } - - impl Client { - - pub async fn new (_: Config) -> Result { - todo!() - } - - /// Fetch the data mapped to the given `key` from the database. - pub async fn get (&self, key: T::Key) -> Result> - where - T: Get, - { - self.with_conn(|c| T::get(key, c)) - .await - } - - /// Perfom an insertion on the database. - pub async fn insert (&mut self, data: T) -> Result<()> - where - T: Insert, - { - self.with_conn(|c| data.set(c)) - .await - .map(void) - } - - /// Delete something from the database. - pub async fn delete (&mut self, key: T::Key) -> Result<()> - where - T: Delete, - { - self.with_conn(|c| T::del(key, c)) - .await - } - - /// Handles the getting-a-connection logic. - async fn with_conn (&self, f: F) -> Result - where - F: FnOnce (&mut PoolConnection) -> O, - O: Future>, - { - use crate::err; - - self.pool - .acquire() - .map_err(err) - .and_then(|mut c| { - f(&mut c) - }) - .await - } - - } - - pub trait Object: Sized { - type Key: Eq; - fn key (&self) -> &Self::Key; - } - - pub trait Insert: Object { - type Future: Future>; - fn set <'e, E> (self, exec: E) -> Self::Future - where - E: Executor<'e>; - } - - pub trait Delete: Object { - type Future: Future>; - fn del <'e, E> (key: Self::Key, exec: E) -> Self::Future where E: Executor<'e>; - } - - pub trait Get: Object { - type Future: Future>>; - fn get <'e, E> (key: Self::Key, exec: E) -> Self::Future where E: Executor<'e>; - } - - pub mod ops { - - //! Database operations (queries and updates). - - use super::*; - - pub struct Following { - pub from: Id, - pub to: Id, - pub id: Id, - } - - impl Object for Following { - type Key = Id; - fn key (&self) -> &Self::Key { &self.id } - } - - impl Insert for Following { - type Future = future::BoxFuture<'static, Result>; - fn set <'e, E> (self, exec: E) -> Self::Future - where - E: Executor<'e> - { - todo!() - } - } - } -} - -mod id { - use serde::{ Deserialize, Serialize }; - - #[derive(PartialEq, Eq, Clone, Serialize, Deserialize)] - pub struct Id (reqwest::Url); - - impl crate::IntoUrl for Id { - fn into_url (self) -> Option { Some (self.0) } - } - -} - -pub mod ap { - - //! ActivityPub types and utilities. - - use crate::{ Id, Activity, Actor }; - - #[derive(Clone)] - pub enum Create { - Note { - id: Id, - }, - } - - impl From for Activity { - fn from (a: Create) -> Self { Self::Create (a) } - } - - #[derive(Clone)] - pub enum Follow { - Actor { - id: Id, - actor: Actor, - object: Actor, - }, - } - - impl From for Activity { - fn from (a: Follow) -> Self { Self::Follow (a) } - } - - #[derive(Clone)] - pub enum Accept { - Follow { - id: Id, - actor: Actor, - object: Follow, - } - } - - impl From for Activity { - fn from (a: Accept) -> Self { Self::Accept (a) } - } - -} - -pub mod sign { - - //! Request signing. - - use reqwest::Request; - use crate::Result; - - pub trait Sign { - fn sign (&self, req: &mut Request) -> Result<()>; - } -} - -pub mod rule { - - //! User-defined activity transformation rules. - //! - //! Every [`Rule`] is a function `fn (Activity) -> Option`. - - use super::Activity; - - /// Transforms an [`Activity`]. - /// - /// ``` - /// use hermit::{ Activity, rule::{ Filter, Rule, keep } }; - /// - /// // Fails to compile if the given parameter is not a `Rule` - /// fn is_rule (x: R) -> R { x } - /// - /// // Closures of `Activity -> Activity` or - /// // `Activity -> Option` can be used. - /// let closure = is_rule(|a: Activity| Some(a)); - /// - /// // `hermit::rule::Filter` implements `Rule`. This one will - /// // filter every activity. - /// let filter = is_rule(Filter (|_| true)) - /// - /// // `hermit::rule::keep` is a function pointer, and they - /// // always implement the `Fn*` traits. - /// let function = is_rule(keep); - /// - /// // Rules can be combined using the `then` operator, in which - /// // case they will be applied in sequence. - /// let combined = is_rule(closure.then(filter).then(keep)); - /// - /// // Check if it works! Due to `filter`, any input this combined - /// // rule is applied to will be dropped. - /// let result = combined.apply(todo!()); - /// assert!(result.is_none()) - /// ``` - pub trait Rule { - - /// Apply the rule to the [`Activity`]. - /// - /// If this function returns `None`, the activity is dropped and will - /// not be processed further. This allows rules to function both as - /// transformations and as filters. - fn apply (&self, act: Activity) -> Option; - - /// Sequence `next` after `self` in a lazy way. - fn then (self, next: R) -> Then - where - Self: Sized, - R: Rule, - { - Then (self, next) - } - - /// Apply `self` only if `pred` holds. - fn only_if

(self, pred: P) -> Cond - where - Self: Sized, - P: Fn (&Activity) -> bool, - { - Cond { rule: self, pred } - } - - } - - impl Rule for F - where - O: Into>, - F: Fn (Activity) -> O + Clone, - { - fn apply (&self, act: Activity) -> Option { - self(act).into() - } - } - - // Primitives - - /// Always keep passed activities. - pub fn keep (a: Activity) -> Option { Some (a) } - - /// Always drop passed activities. - pub fn drop (_: Activity) -> Option { None } - - /// A simple filtering rule that drops the activity if it matches the predicate `P`. - #[derive(Clone)] - pub struct Filter

(pub P) - where - P: Fn (&Activity) -> bool; - - impl

Rule for Filter

- where - P: Fn (&Activity) -> bool + Clone, - { - fn apply (&self, act: Activity) -> Option { - let Self (f) = self; - if f(&act) { - None - } else { - Some (act) - } - } - } - - // Combinators - - /// Sequence two rules. - /// - /// `B` will only be applied if `A` returns [`Some`], otherwise it - /// short-circuits. - #[derive(Clone)] - pub struct Then (A, B); - - impl Rule for Then - where - A: Rule, - B: Rule, - { - fn apply (&self, act: Activity) -> Option { - let Self (a, b) = self; - a.apply(act).and_then(|act| { - b.apply(act) - }) - } - } - - /// Apply a rule conditionally. - /// - /// If the predicate `P` returns `true`, apply `R`. Otherwise, return the - /// activity unmodified. - #[derive(Clone)] - pub struct Cond { - pred: P, - rule: R, - } - - impl Rule for Cond - where - P: Fn (&Activity) -> bool + Clone, - R: Rule, - { - fn apply (&self, act: Activity) -> Option { - let Self { pred, rule } = self; - if pred(&act) { - rule.apply(act) - } else { - Some (act) - } - } - } - - /// Execute a command and drop if nonzero exit code or empty stdout. - /// If the exit code is zero, stdout will be deserialized to an - /// [`Activity`]. - #[derive(Clone)] - pub struct Exec (std::path::PathBuf); - - impl Exec { - pub fn new (path: impl AsRef) -> Option { - todo!() - } - } - -} diff --git a/src/sign.rs b/src/sign.rs new file mode 100644 index 0000000..baa2152 --- /dev/null +++ b/src/sign.rs @@ -0,0 +1,51 @@ +//! Request signing through http signatures. + +use std::path::Path; + +use openssl::pkey::{PKey, Private}; + +use reqwest::Request; +use crate::Result; + +/// Something that has the capability to sign a [`Request`]. +pub trait Sign { + + /// Sign the request in accordance with the http-signatures standard. + fn sign (&self, req: &mut Request) -> Result<()>; + +} + +/// A private key with a key ID. +#[derive(Clone)] +pub struct Key { + /// The key ID. + url: crate::Id, + /// The actual private key. + key: PKey, +} + +impl Key { + + /// Load a PEM-encoded private key from a file on disk + pub fn load (url: crate::Id, path: impl AsRef) -> Result { + let s = std::fs::read_to_string(path).unwrap(); + + let bytes = s.as_bytes(); + let key = PKey::private_key_from_pem(&bytes) + .or_else(|_| PKey::private_key_from_pkcs8(&bytes)) + .or_else(|_| PKey::private_key_from_der(&bytes)) + .map_err(crate::err)?; + + Ok (Key { + url, + key, + }) + } + +} + +impl Sign for Key { + fn sign (&self, req: &mut Request) -> Result<()> { + todo!() + } +}