Initialize all the tasks in the main function

This commit is contained in:
Riley Apeldoorn 2022-06-18 22:50:29 +02:00
parent c6bdbfbc52
commit 7d392ed3fa
6 changed files with 205 additions and 41 deletions

2
Cargo.lock generated
View File

@ -485,6 +485,7 @@ dependencies = [
"serde_json",
"sqlx",
"tokio",
"tokio-stream",
"url",
]
@ -1429,6 +1430,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]

View File

@ -15,3 +15,4 @@ axum = { version = '*', features = [ "ws", "serde_json" ] }
url = { version = '*', features = [ "serde" ] }
sqlx = { version = '*', features = [ "postgres", "runtime-tokio-native-tls" ] }
openssl = '*'
tokio-stream = { version = '*', features = [ "sync" ] }

View File

@ -85,11 +85,12 @@ impl Activity {
Ok (())
}
// Get all delivery targets as urls.
/// Get all delivery targets as urls.
async fn delivery_targets (&self) -> Result<Vec<reqwest::Url>> {
todo!()
}
/// Perform the activity.
pub async fn perform <S> (self, ctx: &mut Context<S>) -> Result<()> where S: sign::Sign {
todo!()
}

View File

@ -93,9 +93,19 @@ mod ctx {
use crate::{ conf::Config, db, Result, sign::Sign, ap, Activity };
/// The context of a thread/task.
///
/// The intended usage pattern is to create a single [`Context`] per
/// thread/async task and to propagate updates to the [`Config`] using
/// message-passing style between the tasks. The library provides no
/// such functionality. Live-reloading is implemented by the program
/// itself.
pub struct Context <S> {
/// The configuration.
pub config: Config,
/// The signing key used by actions running within this context.
pub signer: Arc<S>,
/// A handle to the database.
pub client: db::Client,
}
@ -111,6 +121,15 @@ mod ctx {
impl<S> Context<S> {
pub async fn dereference (&self, json: Value) -> Result<Activity>
where
S: Sign
{
self.dereferencer()
.dereference(json)
.await
}
/// Attempt an action within the context of the database.
pub async fn with_db <'a, F, O, T> (&'a mut self, f: F) -> Result<T>
where
@ -126,7 +145,7 @@ mod ctx {
}
/// Get a dereferencer.
pub fn dereferencer (&self) -> Dereferencer<S>
fn dereferencer (&self) -> Dereferencer<S>
where
S: Sign
{
@ -146,10 +165,6 @@ mod ctx {
&self.config
}
pub fn config_mut (&mut self) -> &mut Config {
&mut self.config
}
/// Conjure an activity "from thin air" as though it were posted through a client.
pub (crate) async fn conjure (&self, act: impl Into<Activity>) -> Result<()> {
let act = act.into();

View File

@ -1,9 +1,15 @@
use std::sync::Arc;
use futures::stream;
use hermit::{ Context, Error, db, sign, Activity, };
use hermit::conf::Config;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, broadcast};
use task::Executor;
use tokio_stream::wrappers::{ReceiverStream, BroadcastStream};
/// Module that contains all the API endpoints and frontend pages
/// used by Hermit.
mod web;
#[tokio::main]
async fn main () {
@ -34,9 +40,57 @@ async fn main () {
};
let (ap_tx, ap_rx) = mk_channel(256);
let (fe_tx, fe_rx) = mk_channel(256);
// Initialize the web server.
task::run(&ctx, task::Server {});
ctx.run (task::Server {
ap_tx,
fe_tx,
});
let (ctrl_tx, ctrl_rx) = mk_channel(256);
let (json_tx, json_rx) = mk_channel(256);
// Initialize the API preprocessor.
ctx.run (task::Api {
ctrl_tx: ctrl_tx.clone(),
json_tx,
fe_rx,
ap_rx,
});
let (auto_tx, auto_rx) = mk_channel(256);
// Initialize the task that captures IPC events.
ctx.run (task::Ipc {
auto_tx,
ctrl_tx,
});
let (ctrl_tx, _rx) = broadcast::channel(256);
ctx.run (task::Auto {
ctrl_rx: ctrl_tx.subscribe(),
auto_rx
});
ctx.run (task::Process {
ctrl_rx: ctrl_tx.subscribe(),
json_rx,
});
ctx.run (task::Ctrl {
rx: ctrl_rx,
tx: ctrl_tx,
})
}
fn mk_channel <T> (size: usize) -> (mpsc::Sender<T>, ReceiverStream<T>) {
let (tx, rx) = mpsc::channel(size);
let rx = ReceiverStream::new(rx);
(tx, rx)
}
fn err (e: impl Into<Error>) -> Error { e.into() }
@ -47,18 +101,29 @@ mod task {
//! streams and sinks.
use std::pin::Pin;
use tokio::sync::{mpsc, broadcast};
use futures::prelude::*;
use serde_json::Value;
use crate::web;
use crate::sign::Sign;
use crate::{flow::Flow, Activity, ctrl::Message, Context};
/// Perform a [`Task`].
pub fn run <S> (ctx: &Context<S>, task: impl Task)
impl<S> Executor for Context<S>
where
S: Sign + Send + Sync + 'static
{
let ctx: Context<S> = ctx.clone();
tokio::spawn(task.run(ctx));
fn run (&self, task: impl Task) {
let ctx: Context<S> = self.clone();
tokio::spawn(task.run(ctx));
}
}
/// Something that can execute a task.
pub trait Executor {
/// Perform a [`Task`].
fn run (&self, task: impl Task);
}
/// A computation running indefinitely on a separate thread.
@ -75,7 +140,12 @@ mod task {
}
/// The main web server.
pub struct Server {}
pub struct Server {
/// Transmitter for messages from the ActivityPub APIs.
pub ap_tx: mpsc::Sender<Value>,
/// Transmitter for messages from the frontend APIs.
pub fe_tx: mpsc::Sender<Value>,
}
impl Task for Server {
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
@ -84,61 +154,127 @@ mod task {
where
S: Sign + Send + Sync + 'static
{
todo!()
use axum::Server;
let Self { ap_tx, fe_tx } = self;
let config = ctx.config;
Box::pin(async move {
let port = config.port;
let addr = &format!("0.0.0.0:{port}").parse().unwrap();
// Both the endpoints and the frontend (if enabled) are defined in
// the `web` module.
let service = web::service(config, ap_tx, fe_tx).into_make_service();
Server::bind(addr)
.serve(service)
.await
.unwrap()
})
}
}
/// API request event processing.
pub struct Api <F, A, C, P> {
pub struct Api <F, A> {
/// Input stream of API request events from the frontend endpoints.
pub fe_rx: F,
/// Input stream of API request events from the ActivityPub
/// endpoints.
pub ap_rx: A,
/// Output stream to the [`Ctrl`] task.
pub ctrl_tx: C,
pub ctrl_tx: mpsc::Sender<Message>,
/// Output stream to the [Activity processor pipeline][Process].
pub pipe_tx: P,
pub json_tx: mpsc::Sender<Flow<Value>>,
}
impl<F, A> Task for Api<F, A>
where
F: Stream<Item = Value> + Unpin + Send + 'static,
A: Stream<Item = Value> + Unpin + Send + 'static,
{
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
fn run <S> (self, _: Context<S>) -> Self::Future
where
S: Sign + Send + Sync + 'static
{
todo!()
}
}
/// Processes CLI commands and sends them to either the [`Auto`] task (which
/// takes care of scheduling automated maintenance tasks) or the [`Ctrl`] task,
/// which propagates control messages through the system, like live config
/// updates or shutdown messages for example.
pub struct Ipc <A, C> {
pub struct Ipc {
/// Output stream to the [`Auto`] task.
pub auto_tx: A,
pub auto_tx: mpsc::Sender<()>,
/// Output stream to the [`Ctrl`] task.
pub ctrl_tx: C,
pub ctrl_tx: mpsc::Sender<Message>,
}
impl Task for Ipc {
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
fn run <S> (self, _: Context<S>) -> Self::Future
where
S: Sign + Send + Sync + 'static
{
todo!()
}
}
/// Delivers control messages to other running tasks.
pub struct Ctrl <A, I, S> {
pub struct Ctrl <I> {
/// Message stream from the [`Api`] task.
pub api_rx: A,
/// Message stream from the [`Ipc`] task.
pub ipc_rx: I,
pub rx: I,
/// Fan-out to all running tasks that are subscribed to [control messages][Ctrl].
pub tx: S,
pub tx: broadcast::Sender<Message>,
}
impl<I> Task for Ctrl<I> {
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
fn run <S> (self, _: Context<S>) -> Self::Future
where
S: Sign + Send + Sync + 'static
{
todo!()
}
}
/// Performs automated maintenance tasks.
pub struct Auto <E, C> {
pub struct Auto <E> {
/// Receiver for manual job triggers received from the [`Ipc`] task.
pub ipc_rx: E,
pub auto_rx: E,
/// Receiver for [control messages][Ctrl].
pub ctrl_rx: C,
pub ctrl_rx: broadcast::Receiver<Message>,
}
pub struct Process <D, C> {
pub data_rx: D,
pub ctrl_rx: C,
impl<E> Task for Auto<E> {
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
fn run <S> (self, _: Context<S>) -> Self::Future
where
S: Sign + Send + Sync + 'static
{
todo!()
}
}
/// Processes incoming [`Activity`] objects.
pub struct Process <J> {
pub json_rx: J,
/// Receiver for [control messages][Ctrl].
pub ctrl_rx: broadcast::Receiver<Message>,
}
impl<D, C> Task for Process<D, C>
impl<J> Task for Process<J>
where
D: Stream<Item = Flow<Value>> + Unpin + Send + 'static,
C: Stream<Item = Message> + Unpin + Send + 'static,
J: Stream<Item = Flow<Value>> + Unpin + Send + 'static,
{
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
@ -147,25 +283,24 @@ mod task {
S: Sign + Send + Sync + 'static
{
let Self { mut data_rx, mut ctrl_rx } = self;
let Self { mut json_rx, mut ctrl_rx } = self;
Box::pin(async move {
loop {
tokio::select! {
// Await control commands from `Ctrl`.
Some (message) = ctrl_rx.next() => match message {
Ok (message) = ctrl_rx.recv() => match message {
// Live config reloading.
Message::Reconfigure (c) => c(&mut ctx.config),
// Graceful termination command from `Ctrl`.
Message::Terminate => break,
},
// Listen for incoming activities.
Some (data) = data_rx.next() => {
Some (data) = json_rx.next() => {
// Dereferencing and other unfucking.
let d = ctx.dereferencer();
let data = match data.apply(|j| d.dereference(j)).await {
let data = match data.apply(|j| ctx.dereference(j)).await {
Ok (data) => data,
Err (err) => {
// If dereferencing fails, that sucks but it's not

10
src/web.rs Normal file
View File

@ -0,0 +1,10 @@
use axum::Router;
use serde_json::Value;
use tokio::sync::mpsc::Sender;
use crate::Config;
/// Create a new web service.
pub fn service (config: Config, ap_tx: Sender<Value>, fe_tx: Sender<Value>) -> Router {
Router::new()
}