hermit/src/main.rs

345 lines
9.0 KiB
Rust

use std::sync::Arc;
use hermit::{ Context, Error, db, sign, Activity, };
use hermit::conf::Config;
use tokio::sync::RwLock;
/// Entry point: builds the shared [`Context`] and runs the web server task
/// until it stops.
///
/// # Panics
///
/// Panics if the database client cannot be created, if the signing key id
/// fails to parse, or if the private key cannot be loaded.
#[tokio::main]
async fn main() {
    // Set up the context for each task.
    let ctx = {
        // The hostname this server will be hosted under (hardcoded for now).
        let hostname = "dev.riley.lgbt";
        // Establish a connection to the database.
        let client = db::Client::new(db::Config {})
            .await
            .expect("failed to set up the database client");
        // Generate the config from the hostname.
        let config = Config::new(&hostname);
        // Use an instance-wide signing key (for now).
        let signer = sign::Key::load(
            format!("https://{hostname}/key/main")
                .parse()
                .expect("signing key id failed to parse"),
            "private_key.pem",
        )
        .map(Arc::new)
        .expect("failed to load the signing key from private_key.pem");
        Context {
            signer,
            config,
            client,
        }
    };
    // Start the web server and wait for it. Returning from `main` shuts the
    // runtime down and cancels every task still running on it, so the handle
    // has to be awaited here. It is spawned directly rather than through
    // `task::run`, which discards the handle and leaves nothing to wait on.
    let server = tokio::spawn(task::Task::run(task::Server {}, ctx));
    if let Err(e) = server.await {
        // The task panicked or was cancelled; report it before exiting.
        eprintln!("Server | Task ended abnormally '{:?}'", e);
    }
}
fn err (e: impl Into<Error>) -> Error { e.into() }
mod task {
//! Async tasks, communicating with each other across threads through generic
//! streams and sinks.
use std::pin::Pin;
use futures::prelude::*;
use serde_json::Value;
use crate::sign::Sign;
use crate::{flow::Flow, Activity, ctrl::Message, Context};
/// Perform a [`Task`].
pub fn run <S> (ctx: &Context<S>, task: impl Task)
where
S: Sign + Send + Sync + 'static
{
let ctx: Context<S> = ctx.clone();
tokio::spawn(task.run(ctx));
}
/// A unit of long-running work that can be handed to the runtime (the `run`
/// function in this module spawns it).
pub trait Task {
    /// The future that drives this task; it yields nothing and must be
    /// `Send + 'static` so it can be spawned.
    type Future: Future<Output = ()> + Send + 'static;

    /// Consume the task and build its future from an owned context.
    fn run<S: Sign + Send + Sync + 'static>(self, ctx: Context<S>) -> Self::Future;
}
/// The main web server task.
pub struct Server {}

impl Task for Server {
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Not implemented yet: calling this panics via `todo!()`.
    fn run<S: Sign + Send + Sync + 'static>(self, _ctx: Context<S>) -> Self::Future {
        todo!()
    }
}
/// Endpoints of the API request event processor.
pub struct Api<F, A, C, P> {
    /// Incoming API request events from the frontend endpoints.
    pub fe_rx: F,
    /// Incoming API request events from the ActivityPub endpoints.
    pub ap_rx: A,
    /// Outgoing channel towards the [`Ctrl`] task.
    pub ctrl_tx: C,
    /// Outgoing channel towards the [Activity processor pipeline][Process].
    pub pipe_tx: P,
}
/// Endpoints of the CLI command processor.
///
/// Commands are forwarded either to the [`Auto`] task, which schedules
/// automated maintenance jobs, or to the [`Ctrl`] task, which propagates
/// control messages (live config updates, shutdown, ...) through the system.
pub struct Ipc<A, C> {
    /// Outgoing channel towards the [`Auto`] task.
    pub auto_tx: A,
    /// Outgoing channel towards the [`Ctrl`] task.
    pub ctrl_tx: C,
}
/// Endpoints of the task that relays control messages to the other tasks.
pub struct Ctrl<A, I, S> {
    /// Messages arriving from the [`Api`] task.
    pub api_rx: A,
    /// Messages arriving from the [`Ipc`] task.
    pub ipc_rx: I,
    /// Fan-out to every running task subscribed to [control messages][Ctrl].
    pub tx: S,
}
/// Endpoints of the automated maintenance task.
pub struct Auto<E, C> {
    /// Manual job triggers arriving from the [`Ipc`] task.
    pub ipc_rx: E,
    /// Incoming [control messages][Ctrl].
    pub ctrl_rx: C,
}
/// Endpoints of the activity processing task.
///
/// The struct itself places no bounds on its fields; the `Task`
/// implementation requires both to be streams.
pub struct Process<D, C> {
    /// Source of the data to process.
    pub data_rx: D,
    /// Source of control messages.
    pub ctrl_rx: C,
}
/// Runs the activity pipeline: each item from `data_rx` is dereferenced,
/// filtered through the configured rules, performed against the context and
/// finally pushed onwards, while `ctrl_rx` is watched for control messages.
///
/// The loop ends on [`Message::Terminate`] or once both input streams have
/// ended. A failure in any stage is printed and only drops the item at hand.
impl<D, C> Task for Process<D, C>
where
    D: Stream<Item = Flow<Value>> + Unpin + Send + 'static,
    C: Stream<Item = Message> + Unpin + Send + 'static,
{
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    fn run<S>(self, mut ctx: Context<S>) -> Self::Future
    where
        S: Sign + Send + Sync + 'static,
    {
        let Self { data_rx, ctrl_rx } = self;
        // A stream that has ended is polled again on every later iteration
        // while the other one is still live. Fusing makes that well-defined
        // (it keeps yielding `None`) for any stream implementation.
        let mut data_rx = data_rx.fuse();
        let mut ctrl_rx = ctrl_rx.fuse();
        Box::pin(async move {
            loop {
                tokio::select! {
                    // Await control commands from `Ctrl`.
                    Some(message) = ctrl_rx.next() => match message {
                        // Live config reloading.
                        Message::Reconfigure(c) => c(&mut ctx.config),
                        // Graceful termination command from `Ctrl`.
                        Message::Terminate => break,
                    },
                    // Listen for incoming activities.
                    Some(data) = data_rx.next() => {
                        // Dereference the raw JSON and apply other fixups.
                        let d = ctx.dereferencer();
                        let data = match data.apply(|j| d.dereference(j)).await {
                            Ok(data) => data,
                            Err(err) => {
                                // A failed dereference is not fatal; the
                                // activity is dropped entirely.
                                println!("Fixup | Dropped due to '{:?}'", err);
                                continue
                            },
                        };
                        // Run both incoming and outgoing activities through
                        // the filtering rules, threading the activity from
                        // one rule to the next.
                        let action = |act| ctx.config.rules.iter().try_fold(act, |a, r| r.apply(a));
                        let data = match data.map(action).to_option() {
                            // Activity survived the filtering process.
                            Some(data) => data,
                            // Activity got filtered out, move on.
                            None => continue,
                        };
                        // Perform each activity in the context of the
                        // instance. A clone is used because `data` is still
                        // needed for the push stage below.
                        if let Err(err) = data.clone().apply(|a| a.perform(&mut ctx)).await {
                            // Report the error and move on.
                            println!("Exec | Failure '{:?}'", err);
                            continue
                        };
                        // Push each activity to an appropriate location.
                        // If incoming: push a notification to the frontend
                        // (not implemented yet).
                        let incoming = {
                            move |_: Activity| async { todo!() }
                        };
                        // If outgoing: deliver the activity to its targets
                        // using the ActivityPub delivery mechanism.
                        let outgoing = {
                            let s = ctx.signer();
                            move |a: Activity| a.deliver(s)
                        };
                        // Apply the function matching the flow direction.
                        if let Err(err) = data.pick(incoming, outgoing).await {
                            // Not fatal, but frequent failures here may
                            // point at a deeper problem.
                            println!("Push | Failure '{:?}'", err);
                            continue
                        };
                    },
                    // Both streams have ended, so nothing can arrive any
                    // more. Without this branch `select!` panics once every
                    // pattern branch has been disabled.
                    else => break,
                }
            }
        })
    }
}
}
pub mod flow {
    //! Direction-tagged values.
    //!
    //! A message is labelled once as incoming or outgoing; the combinators
    //! here transform the payload while carrying that label along, and
    //! [`Flow::pick`] finally selects a handler based on it.
    use std::future::Future;

    /// A payload paired with the direction it travels in.
    ///
    /// Both fields are private and no accessor is offered, so code outside
    /// this module can neither read nor change the direction.
    #[derive(Clone)]
    pub struct Flow<T> {
        flow: Direction,
        data: T,
    }

    /// The two routes a message can take.
    #[derive(Clone, Copy)]
    enum Direction {
        Incoming,
        Outgoing,
    }

    impl<T> Flow<T> {
        /// Wrap `data` so that it takes the "incoming" flow.
        // Capitalised constructor (presumably to read like an enum variant),
        // hence the lint allowance.
        #[allow(non_snake_case)]
        pub fn Incoming(data: T) -> Flow<T> {
            Self {
                flow: Direction::Incoming,
                data,
            }
        }

        /// Wrap `data` so that it takes the "outgoing" flow.
        // Capitalised constructor, see `Incoming`.
        #[allow(non_snake_case)]
        pub fn Outgoing(data: T) -> Flow<T> {
            Self {
                flow: Direction::Outgoing,
                data,
            }
        }

        /// Run the fallible async function `f` on the payload.
        ///
        /// On success the new payload keeps the original direction; on
        /// failure the error from `f` is returned as is.
        pub async fn apply<F, A, U, E>(self, f: F) -> Result<Flow<U>, E>
        where
            A: Future<Output = Result<U, E>>,
            F: FnOnce(T) -> A,
        {
            let direction = self.flow;
            match f(self.data).await {
                Ok(data) => Ok(Flow {
                    flow: direction,
                    data,
                }),
                Err(error) => Err(error),
            }
        }

        /// Run `f` on the payload of an incoming message and `g` on the
        /// payload of an outgoing one; the direction is kept either way.
        pub async fn pick<F, G, A, B, U, E>(self, f: F, g: G) -> Result<Flow<U>, E>
        where
            A: Future<Output = Result<U, E>>,
            B: Future<Output = Result<U, E>>,
            F: FnOnce(T) -> A,
            G: FnOnce(T) -> B,
        {
            if matches!(self.flow, Direction::Incoming) {
                self.apply(f).await
            } else {
                self.apply(g).await
            }
        }

        /// Transform the payload with `f`, keeping the direction.
        pub fn map<F, U>(self, f: F) -> Flow<U>
        where
            F: FnOnce(T) -> U,
        {
            let Flow { flow, data } = self;
            Flow {
                flow,
                data: f(data),
            }
        }
    }

    impl<T> Flow<Option<T>> {
        /// Turn a flow of an option into an option of a flow.
        pub fn to_option(self) -> Option<Flow<T>> {
            Some(Flow {
                flow: self.flow,
                data: self.data?,
            })
        }
    }

    impl<T, E> Flow<Result<T, E>> {
        /// Turn a flow of a result into a result of a flow.
        pub fn to_result(self) -> Result<Flow<T>, E> {
            Ok(Flow {
                flow: self.flow,
                data: self.data?,
            })
        }
    }
}
/// Messages used to steer running tasks.
pub mod ctrl {
    use hermit::conf::Config;
    use std::sync::Arc;

    /// A control message delivered to running tasks.
    #[derive(Clone)]
    pub enum Message {
        /// Carries a shared closure that edits a task's existing
        /// configuration in place.
        Reconfigure(Arc<Box<dyn Fn(&mut Config) + Send + Sync>>),
        /// Shut down everything.
        Terminate,
    }
}