// hermit/src/main.rs
//
// 488 lines
// 12 KiB
// Rust

use std::sync::Arc;
use hermit::{ Context, Error, db, sign, Activity, };
use hermit::conf::Config;
use tokio::sync::{mpsc, broadcast};
use task::Executor;
use tokio_stream::wrappers::ReceiverStream;
/// Module that contains all the API endpoints and frontend pages
/// used by Hermit.
mod web;
#[tokio::main]
async fn main () {
// Set up the context for each task.
let ctx = {
// The hostname this server will be hosted under (hardcoded for now)
let hostname = "dev.riley.lgbt";
// Establish a connection to the database.
let client = db::Client::new(db::Config {}).await.unwrap();
// Generate the config from the hostname.
let config = Config::new(&hostname);
// Use an instance-wide signing key (for now).
let signer = sign::Key::load(
format!("https://{hostname}/key/main").parse().unwrap(),
"private_key.pem"
).map(Arc::new).unwrap();
Context {
signer,
config,
client,
}
};
let (ap_tx, ap_rx) = mk_channel(256);
let (fe_tx, fe_rx) = mk_channel(256);
// Initialize the web server.
ctx.run (task::Server {
ap_tx,
fe_tx,
});
let (ctrl_tx, ctrl_rx) = mk_channel(256);
let (json_tx, json_rx) = mk_channel(256);
// Initialize the API preprocessor.
ctx.run (task::Api {
ctrl_tx: ctrl_tx.clone(),
json_tx,
fe_rx,
ap_rx,
});
let (auto_tx, auto_rx) = mk_channel(256);
// Initialize the task that captures IPC events.
ctx.run (task::Ipc {
auto_tx,
ctrl_tx,
});
// Redefine `ctrl_tx`: it is now the transmitter that transmits
// *from* `Ctrl`.
let (ctrl_tx, _rx) = broadcast::channel(256);
ctx.run (task::Auto {
ctrl_rx: ctrl_tx.subscribe(),
auto_rx
});
ctx.run (task::Process {
ctrl_rx: ctrl_tx.subscribe(),
json_rx,
});
ctx.run (task::Ctrl {
rx: ctrl_rx,
tx: ctrl_tx,
})
}
/// Create an mpsc channel with buffer size `size` and return its sender
/// together with the receiving half wrapped in a [`ReceiverStream`], so the
/// receiving side can be handed to tasks as a stream.
fn mk_channel<T>(size: usize) -> (mpsc::Sender<T>, ReceiverStream<T>) {
    let (sender, receiver) = mpsc::channel(size);
    (sender, ReceiverStream::new(receiver))
}
mod task {
//! Async tasks, communicating with each other across threads through generic
//! streams and sinks.
use std::pin::Pin;
use tokio::sync::{
broadcast,
mpsc,
};
use futures::prelude::*;
use serde_json::Value;
use crate::{
flow::Flow,
ctrl::Message,
sign::Sign,
Activity,
Context,
web,
};
/// Something that can execute a [`Task`].
pub trait Executor {
    /// Start the given [`Task`].
    fn run(&self, task: impl Task);
}

/// A unit of work that is turned into a future and driven elsewhere.
///
/// The [`Executor`] implementation for [`Context`] hands the future to
/// `tokio::spawn`, which is why the future has to be `Send + 'static`.
pub trait Task {
    /// The future representing this computation.
    type Future: Future<Output = ()> + Send + 'static;

    /// Consume the task and the context it runs in, producing the future
    /// that performs the work.
    fn run<S>(self, ctx: Context<S>) -> Self::Future
    where
        S: Sign + Send + Sync + 'static;
}

impl<S> Executor for Context<S>
where
    S: Sign + Send + Sync + 'static,
{
    /// Give the task its own clone of this context and spawn the resulting
    /// future on the Tokio runtime. The join handle is discarded, so the
    /// task runs detached.
    fn run(&self, task: impl Task) {
        tokio::spawn(task.run(self.clone()));
    }
}
/// The main web server.
pub struct Server {
/// Transmitter for messages from the ActivityPub APIs.
pub ap_tx: mpsc::Sender<Value>,
/// Transmitter for messages from the frontend APIs.
pub fe_tx: mpsc::Sender<Value>,
}
impl Task for Server {
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
fn run <S> (self, ctx: Context<S>) -> Self::Future
where
S: Sign + Send + Sync + 'static
{
use axum::Server;
let Self { ap_tx, fe_tx } = self;
let config = ctx.config;
Box::pin(async move {
let port = config.port;
let addr = &format!("0.0.0.0:{port}").parse().unwrap();
// Both the endpoints and the frontend (if enabled) are defined in
// the `web` module.
let service = web::service(config, ap_tx, fe_tx).into_make_service();
Server::bind(addr)
.serve(service)
.await
.unwrap()
})
}
}
/// API request event processing.
pub struct Api<F, A> {
    /// Input stream of API request events from the frontend endpoints.
    pub fe_rx: F,
    /// Input stream of API request events from the ActivityPub
    /// endpoints.
    pub ap_rx: A,
    /// Output stream to the [`Ctrl`] task.
    pub ctrl_tx: mpsc::Sender<Message>,
    /// Output stream to the [Activity processor pipeline][Process].
    pub json_tx: mpsc::Sender<Flow<Value>>,
}

impl<F, A> Task for Api<F, A>
where
    F: Stream<Item = Value> + Unpin + Send + 'static,
    A: Stream<Item = Value> + Unpin + Send + 'static,
{
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`. The panic happens in this call itself,
    /// before any future is produced.
    fn run<S>(self, _ctx: Context<S>) -> Self::Future
    where
        S: Sign + Send + Sync + 'static,
    {
        todo!()
    }
}
/// Processes CLI commands and forwards them to one of two tasks: the
/// [`Auto`] task, which takes care of scheduling automated maintenance
/// tasks, or the [`Ctrl`] task, which propagates control messages through
/// the system (live config updates or shutdown messages, for example).
pub struct Ipc {
    /// Output stream to the [`Auto`] task.
    pub auto_tx: mpsc::Sender<()>,
    /// Output stream to the [`Ctrl`] task.
    pub ctrl_tx: mpsc::Sender<Message>,
}

impl Task for Ipc {
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`. The panic happens in this call itself,
    /// before any future is produced.
    fn run<S>(self, _ctx: Context<S>) -> Self::Future
    where
        S: Sign + Send + Sync + 'static,
    {
        todo!()
    }
}
/// Delivers control messages to other running tasks.
pub struct Ctrl<I> {
    /// Incoming control messages. `main` hands the matching senders to both
    /// the [`Api`] and the [`Ipc`] task.
    pub rx: I,
    /// Fan-out to all running tasks that are subscribed to [control messages][Ctrl].
    pub tx: broadcast::Sender<Message>,
}

// NOTE(review): `I` is unbounded here; a real implementation that moves `rx`
// into the returned future will presumably need stream and `Send + 'static`
// bounds like the other tasks have.
impl<I> Task for Ctrl<I> {
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`. The panic happens in this call itself,
    /// before any future is produced.
    fn run<S>(self, _ctx: Context<S>) -> Self::Future
    where
        S: Sign + Send + Sync + 'static,
    {
        todo!()
    }
}
/// Performs automated maintenance tasks.
pub struct Auto<E> {
    /// Receiver for manual job triggers received from the [`Ipc`] task.
    pub auto_rx: E,
    /// Receiver for [control messages][Ctrl].
    pub ctrl_rx: broadcast::Receiver<Message>,
}

// NOTE(review): `E` is unbounded here; a real implementation that moves
// `auto_rx` into the returned future will presumably need stream and
// `Send + 'static` bounds like the other tasks have.
impl<E> Task for Auto<E> {
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`. The panic happens in this call itself,
    /// before any future is produced.
    fn run<S>(self, _ctx: Context<S>) -> Self::Future
    where
        S: Sign + Send + Sync + 'static,
    {
        todo!()
    }
}
/// Processes incoming [`Activity`] objects.
pub struct Process<J> {
    /// Input stream of JSON values, each annotated with the [`Flow`] it
    /// is taking through the system.
    pub json_rx: J,
    /// Receiver for [control messages][Ctrl].
    pub ctrl_rx: broadcast::Receiver<Message>,
}

impl<J> Task for Process<J>
where
    J: Stream<Item = Flow<Value>> + Unpin + Send + 'static,
{
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Run the processing loop.
    ///
    /// Each iteration either handles one control message (reconfigure or
    /// terminate) or takes one item from `json_rx` through four stages:
    /// dereference, filter through the configured rules, perform, push.
    /// A failure in any stage is printed and the item is dropped; it never
    /// stops the loop.
    ///
    /// The loop ends on [`Message::Terminate`], or when neither input can
    /// yield anything (the control receiver returned an error and the JSON
    /// stream has ended).
    fn run<S>(self, mut ctx: Context<S>) -> Self::Future
    where
        S: Sign + Send + Sync + 'static,
    {
        let Self { mut json_rx, mut ctrl_rx } = self;

        Box::pin(async move {
            loop {
                tokio::select! {
                    // Await control commands from `Ctrl`.
                    Ok (message) = ctrl_rx.recv() => match message {
                        // Live config reloading.
                        Message::Reconfigure (c) => c(&mut ctx.config),
                        // Graceful termination command from `Ctrl`.
                        Message::Terminate => break,
                    },
                    // Listen for incoming activities.
                    Some (data) = json_rx.next() => {
                        // Dereferencing and other fix-ups.
                        let data = match data.apply(|j| ctx.dereference(j)).await {
                            Ok (data) => data,
                            Err (err) => {
                                // If dereferencing fails, that sucks but it's not
                                // fatal, so we drop the activity entirely.
                                println!("Fixup | Dropped due to '{:?}'", err);
                                continue
                            },
                        };

                        // Run both incoming and outgoing activities through the filtering system.
                        let action = |act| ctx.config.rules.iter().try_fold(act, |a, r| r.apply(a));
                        let data = match data.map(action).to_option() {
                            // Activity survived the filtering process, bind it to `data`.
                            Some (data) => data,
                            // Activity got filtered out, move on.
                            None => continue,
                        };

                        // Perform each activity in the context of the instance.
                        if let Err (err) = data.clone().apply(|a| a.perform(&mut ctx)).await {
                            // Something went wrong while performing the activity,
                            // report error and move on.
                            println!("Exec | Failure '{:?}'", err);
                            continue
                        };

                        // Push each activity to an appropriate location.

                        // If incoming: push a notification to the frontend.
                        let incoming = {
                            move |_: Activity| async { todo!() }
                        };

                        // If outgoing: deliver the activity to its targets using
                        // the ActivityPub delivery mechanism.
                        let outgoing = {
                            let s = ctx.signer();
                            move |a: Activity| a.deliver(s)
                        };

                        // Apply the appropriate functions to "push" the activity.
                        if let Err (err) = data.pick(incoming, outgoing).await {
                            // Neither of these failing should be considered
                            // fatal, but if it happens too much, it could be
                            // an indication of something being borked.
                            println!("Push | Failure '{:?}'", err);
                            continue
                        };
                    },
                    // Both patterns above failed to match: the control
                    // receiver returned an error and the JSON stream is
                    // exhausted. Without this branch `select!` panics when
                    // every branch is disabled, so stop the task instead.
                    else => break,
                }
            }
        })
    }
}
}
pub mod flow {
    //! Functional control flow based on the source and destination
    //! of a message flowing through the system.

    use std::future::Future;

    /// A value tagged with the route it takes through the system.
    ///
    /// The tag is chosen at construction time ([`Flow::Incoming`] or
    /// [`Flow::Outgoing`]). Both fields are private and no method exposes or
    /// changes the tag; every combinator carries it over untouched.
    #[derive(Clone)]
    pub struct Flow<T> {
        flow: Direction,
        data: T,
    }

    /// The two routes a message can take.
    #[derive(Clone, Copy)]
    enum Direction {
        Incoming,
        Outgoing,
    }

    impl<T> Flow<T> {
        /// Make the data take the "incoming" flow.
        // The capitalised name presumably mimics an enum variant constructor,
        // hence the lint exemption.
        #[allow(non_snake_case)]
        pub fn Incoming(data: T) -> Flow<T> {
            Flow {
                flow: Direction::Incoming,
                data,
            }
        }

        /// Make the data take the "outgoing" flow.
        // See `Incoming` for why the lint is silenced.
        #[allow(non_snake_case)]
        pub fn Outgoing(data: T) -> Flow<T> {
            Flow {
                flow: Direction::Outgoing,
                data,
            }
        }

        /// Run the fallible async function `f` on the wrapped value.
        ///
        /// On success the result is wrapped again with the same direction;
        /// on failure the error of `f` is returned as is.
        pub async fn apply<F, A, U, E>(self, f: F) -> Result<Flow<U>, E>
        where
            A: Future<Output = Result<U, E>>,
            F: FnOnce(T) -> A,
        {
            let Flow { flow, data } = self;
            f(data).await.map(|data| Flow { flow, data })
        }

        /// Run `f` if the message takes the incoming flow, `g` if it takes
        /// the outgoing flow. The direction of the result is unchanged.
        pub async fn pick<F, G, A, B, U, E>(self, f: F, g: G) -> Result<Flow<U>, E>
        where
            A: Future<Output = Result<U, E>>,
            B: Future<Output = Result<U, E>>,
            F: FnOnce(T) -> A,
            G: FnOnce(T) -> B,
        {
            let Flow { flow, data } = self;
            let data = match flow {
                Direction::Incoming => f(data).await?,
                Direction::Outgoing => g(data).await?,
            };
            Ok(Flow { flow, data })
        }

        /// Transform the wrapped value with `f`, keeping the direction.
        pub fn map<F, U>(self, f: F) -> Flow<U>
        where
            F: FnOnce(T) -> U,
        {
            let Flow { flow, data } = self;
            Flow {
                flow,
                data: f(data),
            }
        }
    }

    impl<T> Flow<Option<T>> {
        /// Turn a flow of an option into an option of a flow: `None` stays
        /// `None`, `Some` keeps its direction.
        pub fn to_option(self) -> Option<Flow<T>> {
            let Flow { flow, data } = self;
            match data {
                Some(data) => Some(Flow { flow, data }),
                None => None,
            }
        }
    }

    impl<T, E> Flow<Result<T, E>> {
        /// Turn a flow of a result into a result of a flow: `Err` is passed
        /// through, `Ok` keeps its direction.
        pub fn to_result(self) -> Result<Flow<T>, E> {
            let Flow { flow, data } = self;
            match data {
                Ok(data) => Ok(Flow { flow, data }),
                Err(err) => Err(err),
            }
        }
    }
}
/// Control messages.
pub mod ctrl {
    use hermit::conf::Config;
    use std::sync::Arc;

    /// A message fanned out to the running tasks over the control channel.
    ///
    /// The type is `Clone` so it can travel over a broadcast channel; the
    /// reconfiguration closure is shared through the `Arc` rather than copied.
    #[derive(Clone)]
    pub enum Message {
        /// Modify the existing configuration of each task. The closure is
        /// given mutable access to the receiving task's own [`Config`].
        Reconfigure(Arc<Box<dyn Fn(&mut Config) + Send + Sync>>),
        /// Shut down everything.
        Terminate,
    }
}