From 7d392ed3fa14a5e656af94d21b0ff9c65fab35dd Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Sat, 18 Jun 2022 22:50:29 +0200 Subject: [PATCH] Initialize all the tasks in the main function --- Cargo.lock | 2 + Cargo.toml | 1 + src/ap/mod.rs | 3 +- src/lib.rs | 25 ++++-- src/main.rs | 205 +++++++++++++++++++++++++++++++++++++++++--------- src/web.rs | 10 +++ 6 files changed, 205 insertions(+), 41 deletions(-) create mode 100644 src/web.rs diff --git a/Cargo.lock b/Cargo.lock index 78b1d3c..1c3b19f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,6 +485,7 @@ dependencies = [ "serde_json", "sqlx", "tokio", + "tokio-stream", "url", ] @@ -1429,6 +1430,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 24cd36b..fc22342 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,4 @@ axum = { version = '*', features = [ "ws", "serde_json" ] } url = { version = '*', features = [ "serde" ] } sqlx = { version = '*', features = [ "postgres", "runtime-tokio-native-tls" ] } openssl = '*' +tokio-stream = { version = '*', features = [ "sync" ] } diff --git a/src/ap/mod.rs b/src/ap/mod.rs index 2027a28..662200a 100644 --- a/src/ap/mod.rs +++ b/src/ap/mod.rs @@ -85,11 +85,12 @@ impl Activity { Ok (()) } - // Get all delivery targets as urls. + /// Get all delivery targets as urls. async fn delivery_targets (&self) -> Result> { todo!() } + /// Perform the activity. pub async fn perform (self, ctx: &mut Context) -> Result<()> where S: sign::Sign { todo!() } diff --git a/src/lib.rs b/src/lib.rs index a275892..e9c3cfa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,9 +93,19 @@ mod ctx { use crate::{ conf::Config, db, Result, sign::Sign, ap, Activity }; + /// The context of a thread/task. + /// + /// The intended usage pattern is to create a single [`Context`] per + /// thread/async task and to propagate updates to the [`Config`] using + /// message-passing style between the tasks. The library provides no + /// such functionality. Live-reloading is implemented by the program + /// itself. pub struct Context { + /// The configuration. pub config: Config, + /// The signing key used by actions running within this context. pub signer: Arc, + /// A handle to the database. pub client: db::Client, } @@ -111,6 +121,15 @@ mod ctx { impl Context { + pub async fn dereference (&self, json: Value) -> Result + where + S: Sign + { + self.dereferencer() + .dereference(json) + .await + } + /// Attempt an action within the context of the database. pub async fn with_db <'a, F, O, T> (&'a mut self, f: F) -> Result where @@ -126,7 +145,7 @@ mod ctx { } /// Get a dereferencer. - pub fn dereferencer (&self) -> Dereferencer + fn dereferencer (&self) -> Dereferencer where S: Sign { @@ -146,10 +165,6 @@ mod ctx { &self.config } - pub fn config_mut (&mut self) -> &mut Config { - &mut self.config - } - /// Conjure an activity "from thin air" as though it were posted through a client. pub (crate) async fn conjure (&self, act: impl Into) -> Result<()> { let act = act.into(); diff --git a/src/main.rs b/src/main.rs index 584ecd7..24f0de7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,15 @@ use std::sync::Arc; +use futures::stream; use hermit::{ Context, Error, db, sign, Activity, }; - use hermit::conf::Config; -use tokio::sync::RwLock; +use tokio::sync::{mpsc, broadcast}; +use task::Executor; +use tokio_stream::wrappers::{ReceiverStream, BroadcastStream}; + +/// Module that contains all the API endpoints and frontend pages +/// used by Hermit. +mod web; #[tokio::main] async fn main () { @@ -34,9 +40,57 @@ async fn main () { }; + let (ap_tx, ap_rx) = mk_channel(256); + let (fe_tx, fe_rx) = mk_channel(256); + // Initialize the web server. - task::run(&ctx, task::Server {}); - + ctx.run (task::Server { + ap_tx, + fe_tx, + }); + + let (ctrl_tx, ctrl_rx) = mk_channel(256); + let (json_tx, json_rx) = mk_channel(256); + + // Initialize the API preprocessor. + ctx.run (task::Api { + ctrl_tx: ctrl_tx.clone(), + json_tx, + fe_rx, + ap_rx, + }); + + let (auto_tx, auto_rx) = mk_channel(256); + + // Initialize the task that captures IPC events. + ctx.run (task::Ipc { + auto_tx, + ctrl_tx, + }); + + let (ctrl_tx, _rx) = broadcast::channel(256); + + ctx.run (task::Auto { + ctrl_rx: ctrl_tx.subscribe(), + auto_rx + }); + + ctx.run (task::Process { + ctrl_rx: ctrl_tx.subscribe(), + json_rx, + }); + + ctx.run (task::Ctrl { + rx: ctrl_rx, + tx: ctrl_tx, + }) + +} + +fn mk_channel (size: usize) -> (mpsc::Sender, ReceiverStream) { + let (tx, rx) = mpsc::channel(size); + let rx = ReceiverStream::new(rx); + (tx, rx) } fn err (e: impl Into) -> Error { e.into() } @@ -47,18 +101,29 @@ mod task { //! streams and sinks. use std::pin::Pin; + use tokio::sync::{mpsc, broadcast}; use futures::prelude::*; use serde_json::Value; + use crate::web; use crate::sign::Sign; use crate::{flow::Flow, Activity, ctrl::Message, Context}; - /// Perform a [`Task`]. - pub fn run (ctx: &Context, task: impl Task) + impl Executor for Context where S: Sign + Send + Sync + 'static { - let ctx: Context = ctx.clone(); - tokio::spawn(task.run(ctx)); + fn run (&self, task: impl Task) { + let ctx: Context = self.clone(); + tokio::spawn(task.run(ctx)); + } + } + + /// Something that can execute a task. + pub trait Executor { + + /// Perform a [`Task`]. + fn run (&self, task: impl Task); + } /// A computation running indefinitely on a separate thread. @@ -75,7 +140,12 @@ mod task { } /// The main web server. - pub struct Server {} + pub struct Server { + /// Transmitter for messages from the ActivityPub APIs. + pub ap_tx: mpsc::Sender, + /// Transmitter for messages from the frontend APIs. + pub fe_tx: mpsc::Sender, + } impl Task for Server { type Future = Pin + Send + 'static>>; @@ -84,61 +154,127 @@ mod task { where S: Sign + Send + Sync + 'static { - todo!() + use axum::Server; + + let Self { ap_tx, fe_tx } = self; + + let config = ctx.config; + + Box::pin(async move { + let port = config.port; + let addr = &format!("0.0.0.0:{port}").parse().unwrap(); + + // Both the endpoints and the frontend (if enabled) are defined in + // the `web` module. + let service = web::service(config, ap_tx, fe_tx).into_make_service(); + + Server::bind(addr) + .serve(service) + .await + .unwrap() + }) } } /// API request event processing. - pub struct Api { + pub struct Api { /// Input stream of API request events from the frontend endpoints. pub fe_rx: F, /// Input stream of API request events from the ActivityPub /// endpoints. pub ap_rx: A, /// Output stream to the [`Ctrl`] task. - pub ctrl_tx: C, + pub ctrl_tx: mpsc::Sender, /// Output stream to the [Activity processor pipeline][Process]. - pub pipe_tx: P, + pub json_tx: mpsc::Sender>, } + impl Task for Api + where + F: Stream + Unpin + Send + 'static, + A: Stream + Unpin + Send + 'static, + { + type Future = Pin + Send + 'static>>; + + fn run (self, _: Context) -> Self::Future + where + S: Sign + Send + Sync + 'static + { + todo!() + } + } + /// Processes CLI commands and sends them to either the [`Auto`] task (which /// takes care of scheduling automated maintenance tasks) or the [`Ctrl`] task, /// which propagates control messages through the system, like live config /// updates or shutdown messages for example. - pub struct Ipc { + pub struct Ipc { /// Output stream to the [`Auto`] task. - pub auto_tx: A, + pub auto_tx: mpsc::Sender<()>, /// Output stream to the [`Ctrl`] task. - pub ctrl_tx: C, + pub ctrl_tx: mpsc::Sender, } + impl Task for Ipc { + type Future = Pin + Send + 'static>>; + + fn run (self, _: Context) -> Self::Future + where + S: Sign + Send + Sync + 'static + { + todo!() + } + } + /// Delivers control messages to other running tasks. - pub struct Ctrl { + pub struct Ctrl { /// Message stream from the [`Api`] task. - pub api_rx: A, - /// Message stream from the [`Ipc`] task. - pub ipc_rx: I, + pub rx: I, /// Fan-out to all running tasks that are subscribed to [control messages][Ctrl]. - pub tx: S, + pub tx: broadcast::Sender, } + impl Task for Ctrl { + type Future = Pin + Send + 'static>>; + + fn run (self, _: Context) -> Self::Future + where + S: Sign + Send + Sync + 'static + { + todo!() + } + } + + /// Performs automated maintenance tasks. - pub struct Auto { + pub struct Auto { /// Receiver for manual job triggers received from the [`Ipc`] task. - pub ipc_rx: E, + pub auto_rx: E, /// Receiver for [control messages][Ctrl]. - pub ctrl_rx: C, + pub ctrl_rx: broadcast::Receiver, } - pub struct Process { - pub data_rx: D, - pub ctrl_rx: C, + impl Task for Auto { + type Future = Pin + Send + 'static>>; + + fn run (self, _: Context) -> Self::Future + where + S: Sign + Send + Sync + 'static + { + todo!() + } + } + + /// Processes incoming [`Activity`] objects. + pub struct Process { + pub json_rx: J, + /// Receiver for [control messages][Ctrl]. + pub ctrl_rx: broadcast::Receiver, } - impl Task for Process + impl Task for Process where - D: Stream> + Unpin + Send + 'static, - C: Stream + Unpin + Send + 'static, + J: Stream> + Unpin + Send + 'static, { type Future = Pin + Send + 'static>>; @@ -147,25 +283,24 @@ mod task { S: Sign + Send + Sync + 'static { - let Self { mut data_rx, mut ctrl_rx } = self; + let Self { mut json_rx, mut ctrl_rx } = self; Box::pin(async move { loop { tokio::select! { // Await control commands from `Ctrl`. - Some (message) = ctrl_rx.next() => match message { + Ok (message) = ctrl_rx.recv() => match message { // Live config reloading. Message::Reconfigure (c) => c(&mut ctx.config), // Graceful termination command from `Ctrl`. Message::Terminate => break, }, // Listen for incoming activities. - Some (data) = data_rx.next() => { + Some (data) = json_rx.next() => { // Dereferencing and other unfucking. - let d = ctx.dereferencer(); - let data = match data.apply(|j| d.dereference(j)).await { + let data = match data.apply(|j| ctx.dereference(j)).await { Ok (data) => data, Err (err) => { // If dereferencing fails, that sucks but it's not diff --git a/src/web.rs b/src/web.rs new file mode 100644 index 0000000..79a7434 --- /dev/null +++ b/src/web.rs @@ -0,0 +1,10 @@ +use axum::Router; +use serde_json::Value; +use tokio::sync::mpsc::Sender; + +use crate::Config; + +/// Create a new web service. +pub fn service (config: Config, ap_tx: Sender, fe_tx: Sender) -> Router { + Router::new() +}