hermit/src/main.rs

1009 lines
24 KiB
Rust

pub use id::Id;
use serde_json::{from_value, Value};
use futures::prelude::*;
use sign::Sign;
use conf::Config;
/// Entry point: builds the instance [`Context`].
///
/// The signer and the database client are not wired up yet, so both are
/// `todo!()` placeholders and running the binary currently panics.
#[tokio::main]
async fn main() {
    let cfg = Config::new("hmt.riley.lgbt");
    // `Context` has three fields; leaving `client` out of the literal is a
    // hard compile error, so it gets a placeholder just like `signer`.
    let _ctx = Context {
        config: cfg,
        signer: todo!(),
        client: todo!(),
    };
}
mod task {
//! Async tasks, communicating with each other across threads through generic
//! streams and sinks.
use std::pin::Pin;
use futures::prelude::*;
use serde_json::Value;
use crate::{sign::Sign, flow::Flow, Activity, ctrl::Message, Context};
/// Spawn a [`Task`] with `tokio::spawn`.
///
/// The task receives its own clone of `ctx`. The handle returned by
/// `tokio::spawn` is discarded, so the task runs detached.
pub fn run<S>(ctx: &Context<S>, task: impl Task)
where
    S: Sign + Clone + Send + Sync + 'static,
{
    let future = task.run(ctx.clone());
    tokio::spawn(future);
}
/// A computation that runs indefinitely once spawned through [`run`].
pub trait Task {
    /// The future that represents this computation.
    type Future: Future<Output = ()> + Send + 'static;

    /// Consume the task and produce the future that executes it within `ctx`.
    fn run<S: Sign + Clone + Send + Sync + 'static>(self, ctx: Context<S>) -> Self::Future;
}
/// Processes API request events.
pub struct Api<F, A, C, P> {
    /// Incoming API request events from the frontend endpoints.
    pub fe_rx: F,
    /// Incoming API request events from the ActivityPub endpoints.
    pub ap_rx: A,
    /// Outgoing channel to the [`Ctrl`] task.
    pub ctrl_tx: C,
    /// Outgoing channel to the [Activity processor pipeline][Process].
    pub pipe_tx: P,
}
/// Handles CLI commands and forwards each one to either the [`Auto`] task
/// (which schedules automated maintenance tasks) or the [`Ctrl`] task (which
/// propagates control messages through the system, for example live config
/// updates or shutdown messages).
pub struct Ipc<A, C> {
    /// Outgoing channel to the [`Auto`] task.
    pub auto_tx: A,
    /// Outgoing channel to the [`Ctrl`] task.
    pub ctrl_tx: C,
}
/// Delivers control messages to other running tasks.
pub struct Ctrl<A, I, S> {
    /// Messages arriving from the [`Api`] task.
    pub api_rx: A,
    /// Messages arriving from the [`Ipc`] task.
    pub ipc_rx: I,
    /// Fan-out to every running task subscribed to [control messages][Ctrl].
    pub tx: S,
}
/// Performs automated maintenance tasks.
pub struct Auto<E, C> {
    /// Manual job triggers arriving from the [`Ipc`] task.
    pub ipc_rx: E,
    /// Incoming [control messages][Ctrl].
    pub ctrl_rx: C,
}
/// The activity processing pipeline.
pub struct Process<D, C> {
    /// Incoming activity data to process.
    pub data_rx: D,
    /// Incoming control messages.
    pub ctrl_rx: C,
}
/// Runs the activity pipeline: dereference, filter, perform, then push.
impl<D, C> Task for Process<D, C>
where
    D: Stream<Item = Flow<Value>> + Unpin + Send + 'static,
    C: Stream<Item = Message> + Unpin + Send + 'static,
{
    type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    /// Build the future that drives the pipeline.
    ///
    /// The loop ends when a [`Message::Terminate`] arrives, or when both
    /// input streams are exhausted. A failure while handling a single
    /// activity is logged and that activity is skipped.
    fn run<S>(self, ctx: Context<S>) -> Self::Future
    where
        S: Sign + Clone + Send + Sync + 'static,
    {
        let Self { mut data_rx, mut ctrl_rx } = self;
        Box::pin(async move {
            // NOTE(review): this starts from a fresh default config for
            // "localhost" instead of the config carried by `ctx` — confirm
            // that this is intended.
            let mut config = crate::conf::Config::new("localhost");
            loop {
                tokio::select! {
                    // Await control commands from `Ctrl`.
                    Some(message) = ctrl_rx.next() => match message {
                        // Live config reloading.
                        Message::Reconfigure(c) => c(&mut config),
                        // Graceful termination command from `Ctrl`.
                        Message::Terminate => break,
                    },
                    // Listen for incoming activities.
                    Some(data) = data_rx.next() => {
                        // Dereferencing and other fixups.
                        let d = ctx.dereferencer();
                        let data = match data.apply(|j| d.dereference(j)).await {
                            Ok(data) => data,
                            Err(err) => {
                                // A failed dereference is not fatal: the
                                // activity is dropped entirely.
                                println!("Fixup | Dropped due to '{:?}'", err);
                                continue
                            },
                        };
                        // Run both incoming and outgoing activities through
                        // the filtering rules, stopping at the first rule
                        // that drops the activity.
                        let action = |act| config.rules.iter().try_fold(act, |a, r| r.apply(a));
                        let data = match data.map(action).to_option() {
                            // Activity survived the filtering process.
                            Some(data) => data,
                            // Activity got filtered out, move on.
                            None => continue,
                        };
                        // Perform each activity in the context of the instance.
                        let c = ctx.clone();
                        if let Err(err) = data.clone().apply(|a| a.perform(c)).await {
                            // Something went wrong while performing the
                            // activity: report the error and move on.
                            println!("Exec | Failure '{:?}'", err);
                            continue
                        };
                        // Push each activity to an appropriate location.
                        // If incoming: push a notification to the frontend.
                        let incoming = {
                            let n = ctx.notifier();
                            move |a: Activity| a.notify(n)
                        };
                        // If outgoing: deliver the activity to its targets
                        // using the ActivityPub delivery mechanism.
                        let outgoing = {
                            let s = ctx.signer();
                            move |a: Activity| a.deliver(s)
                        };
                        // Apply the function matching the flow direction.
                        if let Err(err) = data.pick(incoming, outgoing).await {
                            // Neither failure is fatal, but frequent
                            // failures may indicate a broken setup.
                            println!("Push | Failure '{:?}'", err);
                            continue
                        };
                    },
                    // A branch is disabled when its stream yields `None`.
                    // Without an `else` branch `select!` panics once every
                    // branch is disabled, so both senders closing would
                    // crash the task instead of ending it.
                    else => break,
                }
            }
        })
    }
}
}
pub mod flow {
    //! Functional control flow based on the source and destination
    //! of a message flowing through the system.
    use std::future::Future;

    /// A wrapper that tags a message with the flow it is supposed to take.
    ///
    /// Both fields are private and no accessor is exposed, so code outside
    /// this module can neither inspect nor change the direction.
    #[derive(Clone)]
    pub struct Flow<T> {
        flow: Direction,
        data: T,
    }

    /// The two routes a message can take.
    #[derive(Clone, Copy)]
    enum Direction {
        Incoming,
        Outgoing,
    }

    impl<T> Flow<T> {
        /// Make the data take the "incoming" flow.
        #[allow(non_snake_case)]
        pub fn Incoming(data: T) -> Flow<T> {
            Flow { flow: Direction::Incoming, data }
        }

        /// Make the data take the "outgoing" flow.
        #[allow(non_snake_case)]
        pub fn Outgoing(data: T) -> Flow<T> {
            Flow { flow: Direction::Outgoing, data }
        }

        /// Run the fallible async function `f` on the wrapped value, keeping
        /// the flow direction as it was.
        ///
        /// An error produced by `f` is returned unchanged.
        pub async fn apply<F, A, U, E>(self, f: F) -> Result<Flow<U>, E>
        where
            A: Future<Output = Result<U, E>>,
            F: FnOnce(T) -> A,
        {
            // `map` calls `f` right away; the future it returns is awaited below.
            let Flow { data: pending, flow } = self.map(f);
            let data = pending.await?;
            Ok(Flow { data, flow })
        }

        /// Run `f` if the message takes the incoming flow, or `g` if it takes
        /// the outgoing flow. Only the selected function is called.
        pub async fn pick<F, G, A, B, U, E>(self, f: F, g: G) -> Result<Flow<U>, E>
        where
            A: Future<Output = Result<U, E>>,
            B: Future<Output = Result<U, E>>,
            F: FnOnce(T) -> A,
            G: FnOnce(T) -> B,
        {
            let Flow { data, flow } = self;
            let data = match flow {
                Direction::Incoming => f(data).await?,
                Direction::Outgoing => g(data).await?,
            };
            Ok(Flow { data, flow })
        }

        /// Map over the contained value.
        pub fn map<F, U>(self, f: F) -> Flow<U>
        where
            F: FnOnce(T) -> U,
        {
            let Flow { data, flow } = self;
            Flow { data: f(data), flow }
        }
    }

    impl<T> Flow<Option<T>> {
        /// Swap the containers: `Flow<Option<T>>` becomes `Option<Flow<T>>`.
        pub fn to_option(self) -> Option<Flow<T>> {
            let flow = self.flow;
            Some(Flow { flow, data: self.data? })
        }
    }

    impl<T, E> Flow<Result<T, E>> {
        /// Swap the containers: `Flow<Result<T, E>>` becomes `Result<Flow<T>, E>`.
        pub fn to_result(self) -> Result<Flow<T>, E> {
            let flow = self.flow;
            match self.data {
                Ok(data) => Ok(Flow { flow, data }),
                Err(error) => Err(error),
            }
        }
    }
}
/// Control messages.
pub mod ctrl {
    use crate::conf::Config;
    use std::sync::Arc;

    /// A control message sent to running tasks.
    #[derive(Clone)]
    pub enum Message {
        /// Modify the existing configuration of each task by applying the
        /// wrapped closure to it.
        Reconfigure(Arc<Box<dyn Fn(&mut Config) + Send + Sync>>),
        /// Shut down everything.
        Terminate,
    }
}
/// Configuration.
pub mod conf {
    use crate::rule::Rule;
    use std::sync::Arc;

    /// Settings of an instance.
    #[derive(Clone)]
    pub struct Config {
        /// The domain of the instance.
        pub host: String,
        /// The port to host the instance on. Defaults to `6969`.
        pub port: u16,
        /// Filtering rules applied to each activity.
        pub rules: Vec<Arc<Box<dyn Rule + Send + Sync>>>,
        /// Notification configuration.
        pub notify: Notify,
    }

    impl Config {
        /// Create a config for `hostname` with every other setting at its
        /// default: port `6969`, no rules, all notifications enabled.
        pub fn new(hostname: impl ToString) -> Config {
            let notify: Notify = def();
            let rules = def();
            Config {
                host: hostname.to_string(),
                port: 6969,
                notify,
                rules,
            }
        }
    }

    /// Switches that select which events produce a notification.
    #[derive(Clone, Copy)]
    pub struct Notify {
        pub post_liked: bool,
        pub post_shared: bool,
        pub follow_requested: bool,
        pub new_follower: bool,
    }

    impl Default for Notify {
        /// Every notification kind is enabled by default.
        fn default() -> Self {
            Self {
                post_liked: true,
                post_shared: true,
                follow_requested: true,
                new_follower: true,
            }
        }
    }

    /// Shortcut for creating a default instance.
    fn def<T: Default>() -> T {
        Default::default()
    }
}
/// State handed to tasks: the configuration, a signer of type `S`, and a
/// database client.
#[derive(Clone)]
pub struct Context<S> {
    /// Instance configuration.
    config: Config,
    /// The signer, exposed through `Context::signer`.
    signer: S,
    /// Database client, reachable through `Context::with_db`.
    client: db::Client,
}
impl<S> Context<S> {
    /// Attempt an action within the context of the database.
    ///
    /// `f` receives a mutable borrow of the database client and returns a
    /// future; the output of that future is returned as-is.
    pub async fn with_db<'a, F, O, T>(&'a mut self, f: F) -> Result<T>
    where
        F: FnOnce(&'a mut db::Client) -> O,
        O: Future<Output = Result<T>> + 'a,
    {
        let client = &mut self.client;
        let operation = f(client);
        operation.await
    }

    /// Get all actors on the instance.
    ///
    /// Placeholder: the returned iterator is currently always empty.
    pub fn actors(&self) -> impl Iterator<Item = Actor> + '_ {
        std::iter::empty()
    }

    /// Get a dereferencer with a fresh HTTP client and clones of the signer
    /// and the database client.
    pub fn dereferencer(&self) -> Dereferencer<S>
    where
        S: Sign + Clone,
    {
        let web = reqwest::Client::new();
        let signer = self.signer.clone();
        let db = self.client.clone();
        Dereferencer { web, db, signer }
    }

    /// Access the inner [`Sign`] provider.
    pub fn signer(&self) -> &S {
        &self.signer
    }

    /// Access a notifier that delivers notifications to their intended targets.
    ///
    /// Not implemented yet: calling this panics.
    pub fn notifier(&self) -> Notifier {
        todo!()
    }

    /// Conjure an activity "from thin air" as though it were posted through a client.
    ///
    /// Not implemented yet: the activity is converted and then the call panics.
    pub(crate) async fn conjure(&self, act: impl Into<Activity>) -> Result<()> {
        let act: Activity = act.into();
        todo!()
    }
}
/// Conversion into a URL that yields `None` when no URL can be produced.
pub trait IntoUrl {
    /// Consume `self` and try to turn it into a [`url::Url`].
    fn into_url(self) -> Option<url::Url>;
}
/// Anything printable converts by parsing its string form as a URL.
impl<T: ToString> IntoUrl for T {
    /// Returns `None` when the string form is not a valid URL.
    fn into_url(self) -> Option<url::Url> {
        let text = self.to_string();
        url::Url::parse(&text).ok()
    }
}
/// Sends activities as notifications, subject to the notification settings.
pub struct Notifier {
    /// Which kinds of notification are enabled.
    config: conf::Notify,
    /// Sink that accepted notifications are sent into.
    socket: Box<dyn Sink<Activity, Error = Error> + Send + Sync + Unpin>,
}
/// Provides dereferencing facilities for [`Activity`] data.
pub struct Dereferencer<S> {
    /// HTTP client used to fetch remote JSON.
    web: reqwest::Client,
    /// Database client.
    db: db::Client,
    /// Signs every outgoing fetch request.
    signer: S,
}
impl<S: Sign> Dereferencer<S> {
    /// Perform the dereferencing.
    ///
    /// Only JSON whose `"type"` is `"Create"` is handled so far; any other
    /// input hits a `todo!()` and panics.
    pub async fn dereference(&self, json: Value) -> Result<Activity> {
        if json["type"].as_str() == Some("Create") {
            let create = self.deref_create(json).await?;
            Ok(Activity::from(create))
        } else {
            todo!()
        }
    }

    /// The database client.
    fn db_client(&self) -> &db::Client {
        &self.db
    }

    /// The HTTP client.
    fn web_client(&self) -> &reqwest::Client {
        &self.web
    }

    /// Fetch a JSON value with a signed GET request.
    ///
    /// Handling of a `url` that does not convert to a URL is not implemented
    /// yet: that case panics.
    pub async fn fetch(&self, url: impl IntoUrl) -> Result<Value> {
        let http = self.web_client();
        let target = url.into_url().unwrap_or_else(|| todo!());
        let mut request = http.get(target).build()?;
        self.signer.sign(&mut request)?;
        let response = http.execute(request).await?;
        let value = response.json().await?;
        Ok(value)
    }

    /// Attempt to dereference to a [`Create`](ap::Create) activity.
    ///
    /// A bare JSON string is treated as a URL and fetched first. Building the
    /// activity itself is not implemented yet, so every path currently panics.
    async fn deref_create(&self, json: Value) -> Result<ap::Create> {
        let json = match json {
            Value::String(url) => self.fetch(url).await?,
            inline => inline,
        };
        match json["object"]["type"].as_str() {
            // TODO: build the `Create::Note` variant here.
            Some("Note" | "Article") => todo!(),
            _ => return Err(todo!()),
        }
    }
}
#[derive(Debug)]
pub enum Error {
Http (reqwest::Error),
Json (serde_json::Error),
Sqlx (sqlx::Error),
}
impl From<sqlx::Error> for Error {
fn from (e: sqlx::Error) -> Self { Error::Sqlx (e) }
}
impl From<reqwest::Error> for Error {
fn from (e: reqwest::Error) -> Self { Error::Http (e) }
}
impl From<serde_json::Error> for Error {
fn from (e: serde_json::Error) -> Self { Error::Json (e) }
}
fn err (e: impl Into<Error>) -> Error { e.into() }
pub type Result <T, E = Error> = std::result::Result<T, E>;
/// An actor, identified by its [`Id`].
#[derive(Clone)]
pub struct Actor {
    /// Identifier of the actor.
    id: Id,
    /// Whether the account is locked. `Activity::perform` only takes its
    /// immediate-follow path for actors that are not locked.
    is_locked: bool,
}
/// The activities this crate knows about, one variant per activity type.
#[derive(Clone)]
pub enum Activity {
    /// A `Create` activity.
    Create(ap::Create),
    /// A `Follow` activity.
    Follow(ap::Follow),
    /// An `Accept` activity.
    Accept(ap::Accept),
}
impl Activity {
/// Perform the activity in the context of the instance.
///
/// Only a `Follow` aimed at an unlocked local actor is implemented: the
/// following is stored in the database and an `Accept` is conjured in
/// reply. Every other case hits a `todo!()` and panics.
pub async fn perform<S>(self, mut ctx: Context<S>) -> Result<()>
where
    S: sign::Sign,
{
    use ap::*;
    match self {
        Activity::Follow(Follow::Actor { id, actor, object, .. }) => {
            // Look the followed actor up among the local actors. If it is
            // not a local actor, we don't care.
            let target = ctx.actors().find(|local| object.id == local.id);
            match target {
                // Unlocked account.
                Some(local) if !local.is_locked => {
                    // Store the new following relation in the database.
                    let following = db::ops::Following {
                        from: actor.id.clone(),
                        to: object.id.clone(),
                        id: id.clone(),
                    };
                    ctx.with_db(|client| client.insert(following)).await?;
                    // The account is not locked, so reply with an `Accept`
                    // right away: the remote then knows it's ok to follow
                    // this actor immediately.
                    let follow = Follow::Actor {
                        id: id.clone(),
                        object,
                        actor,
                    };
                    let accept = Accept::Follow {
                        object: follow,
                        actor: local,
                        id,
                    };
                    ctx.conjure(accept).await
                }
                _ => todo!(),
            }
        }
        _ => todo!(),
    }
}
/// Send a notification through the notifier's [`Sink`].
///
/// Only `Follow` activities are sent, and only while `new_follower` is
/// enabled in the notifier's config. Everything else is a no-op that
/// returns `Ok(())`.
pub async fn notify(self, notifier: Notifier) -> Result<()> {
    let Notifier { config, mut socket } = notifier;
    let wanted = matches!(self, Activity::Follow(..)) && config.new_follower;
    if !wanted {
        // In all other cases, do nothing.
        return Ok(());
    }
    socket.send(self).await.map_err(err)
}
/// Deliver the activity to all its targets through the ActivityPub
/// delivery mechanism.
///
/// Every target gets its own signed request and all requests run
/// concurrently. A delivery counts as failed when the request cannot be
/// built, signed or sent, or when the remote answers with an HTTP error
/// status. Failed deliveries are logged and otherwise ignored.
///
/// # Errors
///
/// Returns an error only when the delivery targets cannot be determined.
pub async fn deliver<S>(self, signer: &S) -> Result<()>
where
    S: sign::Sign + ?Sized,
{
    // One client shared by all deliveries.
    let client = reqwest::Client::new();
    // Delivers to a single target: builds the request, signs it with
    // `signer` and sends it.
    //
    // NOTE(review): ActivityPub delivery is a POST of the serialized
    // activity to the target inbox, while this still builds a body-less
    // GET. Confirm and replace once activity serialization exists.
    let do_delivery = |url| async {
        let req = {
            let mut r = client.get(url).build()?;
            signer.sign(&mut r)?;
            r
        };
        let response = client.execute(req).await?;
        // `execute` only fails on transport-level problems, so a 4xx/5xx
        // answer has to be turned into an error explicitly. Otherwise a
        // rejected delivery would be counted as successful.
        response.error_for_status().map_err(err)
    };
    // Collect only the errors, since nothing needs to happen for a
    // successful delivery.
    let errors = self
        .delivery_targets()
        .await?
        .into_iter()
        .map(do_delivery)
        .collect::<stream::FuturesUnordered<_>>()
        .filter_map(|r: Result<_>| async { r.err() })
        .collect::<Vec<Error>>()
        .await;
    for err in errors {
        // Failure to deliver is not a fatal error per se, so we log and
        // move on.
        println!("Failed to deliver activity: {:?}", err);
    }
    Ok(())
}
/// Get all delivery targets as urls.
///
/// Not implemented yet: calling this panics.
async fn delivery_targets(&self) -> Result<Vec<reqwest::Url>> {
    todo!()
}
}
pub mod db {
use crate::{Id, Result};
use futures::prelude::*;
use sqlx::{Executor, pool::PoolConnection};
/// `const ()` but in Rust: discards its argument and returns unit.
fn void<T>(_: T) {}
/// The `sqlx` database backend used by this module (Postgres).
type Database = sqlx::Postgres;
/// Database configuration accepted by `Client::new`. It has no fields yet.
pub struct Config {}
/// A database client backed by a connection pool.
#[derive(Clone)]
pub struct Client {
    /// The internal connection pool.
    pool: sqlx::Pool<Database>,
}
impl Client {
    /// Create a client from a [`Config`].
    ///
    /// Not implemented yet: calling this panics.
    pub async fn new(_: Config) -> Result<Client> {
        todo!()
    }

    /// Fetch the data mapped to the given `key` from the database.
    pub async fn get<T>(&self, key: T::Key) -> Result<Option<T>>
    where
        T: Get,
    {
        self.with_conn(|conn| T::get(key, conn)).await
    }

    /// Perform an insertion on the database.
    ///
    /// The key returned by the insertion is discarded.
    pub async fn insert<T>(&mut self, data: T) -> Result<()>
    where
        T: Insert,
    {
        let _key = self.with_conn(|conn| data.set(conn)).await?;
        Ok(())
    }

    /// Delete something from the database.
    pub async fn delete<T>(&mut self, key: T::Key) -> Result<()>
    where
        T: Delete,
    {
        self.with_conn(|conn| T::del(key, conn)).await
    }

    /// Handles the getting-a-connection logic: acquires a connection from
    /// the pool, hands it to `f`, and awaits the future `f` returns.
    async fn with_conn<F, O, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&mut PoolConnection<Database>) -> O,
        O: Future<Output = Result<T>>,
    {
        use crate::err;
        let operation = {
            let mut conn = self.pool.acquire().await.map_err(err)?;
            // `conn` is dropped at the end of this block, which is before
            // `operation` is awaited.
            // NOTE(review): confirm the returned future does not need the
            // connection to outlive this call.
            f(&mut conn)
        };
        operation.await
    }
}
/// Something stored in the database under a key.
pub trait Object: Sized {
    /// The type of the key.
    type Key: Eq;

    /// Borrow the key of this object.
    fn key(&self) -> &Self::Key;
}
/// An [`Object`] that can be inserted into the database.
pub trait Insert: Object {
    /// Future that resolves to the key of the object.
    type Future: Future<Output = Result<Self::Key>>;

    /// Insert `self` using the executor `exec`.
    fn set<'e, E: Executor<'e>>(self, exec: E) -> Self::Future;
}
/// An [`Object`] that can be deleted from the database by key.
pub trait Delete: Object {
    /// Future that resolves once the deletion is done.
    type Future: Future<Output = Result<()>>;

    /// Delete the object stored under `key` using the executor `exec`.
    fn del<'e, E: Executor<'e>>(key: Self::Key, exec: E) -> Self::Future;
}
/// An [`Object`] that can be looked up in the database by key.
pub trait Get: Object {
    /// Future that resolves to the object, or to `None`.
    type Future: Future<Output = Result<Option<Self>>>;

    /// Look up the object stored under `key` using the executor `exec`.
    fn get<'e, E: Executor<'e>>(key: Self::Key, exec: E) -> Self::Future;
}
pub mod ops {
    //! Database operations (queries and updates).
    use super::*;

    /// A following relation from one id to another, keyed by `id`.
    pub struct Following {
        /// Id of the follower.
        pub from: Id,
        /// Id of the one being followed.
        pub to: Id,
        /// Key of this record.
        pub id: Id,
    }

    impl Object for Following {
        type Key = Id;

        fn key(&self) -> &Self::Key {
            &self.id
        }
    }

    impl Insert for Following {
        type Future = future::BoxFuture<'static, Result<Id>>;

        /// Not implemented yet: calling this panics.
        fn set<'e, E: Executor<'e>>(self, exec: E) -> Self::Future {
            todo!()
        }
    }
}
}
mod id {
    use serde::{Deserialize, Serialize};

    /// An identifier that wraps a URL.
    #[derive(PartialEq, Eq, Clone, Serialize, Deserialize)]
    pub struct Id(reqwest::Url);

    impl crate::IntoUrl for Id {
        /// Always succeeds, because an `Id` already holds a URL.
        fn into_url(self) -> Option<url::Url> {
            let Id(inner) = self;
            Some(inner)
        }
    }
}
pub mod ap {
//! ActivityPub types and utilities.
use crate::{ Id, Activity, Actor };
#[derive(Clone)]
pub enum Create {
Note {
id: Id,
},
}
impl From<Create> for Activity {
fn from (a: Create) -> Self { Self::Create (a) }
}
#[derive(Clone)]
pub enum Follow {
Actor {
id: Id,
actor: Actor,
object: Actor,
},
}
impl From<Follow> for Activity {
fn from (a: Follow) -> Self { Self::Follow (a) }
}
#[derive(Clone)]
pub enum Accept {
Follow {
id: Id,
actor: Actor,
object: Follow,
}
}
impl From<Accept> for Activity {
fn from (a: Accept) -> Self { Self::Accept (a) }
}
}
pub mod sign {
    //! Request signing.
    use crate::Result;
    use reqwest::Request;

    /// Something that can sign HTTP requests.
    pub trait Sign {
        /// Sign `req` through the mutable borrow, or report why that failed.
        fn sign(&self, req: &mut Request) -> Result<()>;
    }
}
pub mod rule {
//! User-defined activity transformation rules.
//!
//! Every [`Rule`] is a function `fn (Activity) -> Option<Activity>`.
use super::Activity;
/// Transforms an [`Activity`].
///
/// ```
/// use hermit::{ Activity, rule::{ Filter, Rule, keep } };
///
/// // Fails to compile if the given parameter is not a `Rule`
/// fn is_rule <R: Rule> (x: R) -> R { x }
///
/// // Closures of `Activity -> Activity` or
/// // `Activity -> Option<Activity>` can be used.
/// let closure = is_rule(|a: Activity| Some(a));
///
/// // `hermit::rule::Filter` implements `Rule`. This one will
/// // filter every activity.
/// let filter = is_rule(Filter (|_| true));
///
/// // `hermit::rule::keep` is a function pointer, and they
/// // always implement the `Fn*` traits.
/// let function = is_rule(keep);
///
/// // Rules can be combined using the `then` operator, in which
/// // case they will be applied in sequence.
/// let combined = is_rule(closure.then(filter).then(keep));
///
/// // Check if it works! Due to `filter`, any input this combined
/// // rule is applied to will be dropped.
/// let result = combined.apply(todo!());
/// assert!(result.is_none())
/// ```
pub trait Rule {
    /// Apply the rule to the [`Activity`].
    ///
    /// Returning `None` drops the activity, so it will not be processed
    /// further. This lets a rule act as a transformation, as a filter, or
    /// as both.
    fn apply(&self, act: Activity) -> Option<Activity>;

    /// Sequence `next` after `self` in a lazy way.
    fn then<R: Rule>(self, next: R) -> Then<Self, R>
    where
        Self: Sized,
    {
        Then(self, next)
    }

    /// Apply `self` only if `pred` holds.
    fn only_if<P: Fn(&Activity) -> bool>(self, pred: P) -> Cond<P, Self>
    where
        Self: Sized,
    {
        Cond { pred, rule: self }
    }
}
/// Cloneable functions and closures taking an [`Activity`] are rules, as
/// long as their result converts into `Option<Activity>`.
impl<F, O> Rule for F
where
    O: Into<Option<Activity>>,
    F: Fn(Activity) -> O + Clone,
{
    fn apply(&self, act: Activity) -> Option<Activity> {
        let outcome: O = self(act);
        outcome.into()
    }
}
// Primitives
/// Always keep passed activities: returns the activity wrapped in `Some`.
pub fn keep(a: Activity) -> Option<Activity> {
    Option::Some(a)
}
/// Always drop passed activities: returns `None` for every input.
pub fn drop(_: Activity) -> Option<Activity> {
    Option::None
}
/// A simple filtering rule that drops the activity if it matches the predicate `P`.
#[derive(Clone)]
pub struct Filter<P: Fn(&Activity) -> bool>(pub P);
impl<P> Rule for Filter<P>
where
    P: Fn(&Activity) -> bool + Clone,
{
    /// Returns `None` when the predicate matches, the untouched activity otherwise.
    fn apply(&self, act: Activity) -> Option<Activity> {
        match (self.0)(&act) {
            true => None,
            false => Some(act),
        }
    }
}
// Combinators
/// Two rules applied in sequence.
///
/// `B` is only applied if `A` returns [`Some`]; otherwise the sequence
/// short-circuits.
#[derive(Clone)]
pub struct Then<A, B>(A, B);
impl<A: Rule, B: Rule> Rule for Then<A, B> {
    /// Applies the first rule, then the second one to whatever survived.
    fn apply(&self, act: Activity) -> Option<Activity> {
        let survivor = self.0.apply(act)?;
        self.1.apply(survivor)
    }
}
/// A rule guarded by a predicate.
///
/// If the predicate `P` returns `true`, `R` is applied. Otherwise the
/// activity is returned unmodified.
#[derive(Clone)]
pub struct Cond<P, R> {
    /// Decides whether `rule` is applied.
    pred: P,
    /// The guarded rule.
    rule: R,
}
impl<P, R> Rule for Cond<P, R>
where
    P: Fn(&Activity) -> bool + Clone,
    R: Rule,
{
    /// Passes the activity through untouched unless the predicate holds.
    fn apply(&self, act: Activity) -> Option<Activity> {
        if !(self.pred)(&act) {
            return Some(act);
        }
        self.rule.apply(act)
    }
}
/// Execute a command and drop if nonzero exit code or empty stdout.
/// If the exit code is zero, stdout will be deserialized to an
/// [`Activity`].
///
/// The wrapped value is a path.
// NOTE(review): no `Rule` impl for `Exec` is visible in this part of the
// file, so the behaviour described above is the stated intent only.
#[derive(Clone)]
pub struct Exec(std::path::PathBuf);
impl Exec {
    /// Create an `Exec` rule for the command at `path`.
    ///
    /// Not implemented yet: calling this panics.
    pub fn new(path: impl AsRef<std::path::Path>) -> Option<Exec> {
        todo!()
    }
}
}