Add `Notifier`

This commit is contained in:
Riley Apeldoorn 2022-06-07 16:51:08 +02:00
parent 2ce32ba904
commit 4a12b39580
3 changed files with 377 additions and 34 deletions

334
Cargo.lock generated
View File

@ -2,12 +2,71 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "async-trait"
version = "0.1.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2504b827a8bef941ba3dd64bdffe9cf56ca182908a147edd6189c95fbcae7d"
dependencies = [
"async-trait",
"axum-core",
"base64",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"sha-1",
"sync_wrapper",
"tokio",
"tokio-tungstenite",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
]
[[package]]
name = "base64"
version = "0.13.0"
@ -20,12 +79,27 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324"
dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.1.0"
@ -60,6 +134,35 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "cpufeatures"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"
dependencies = [
"libc",
]
[[package]]
name = "crypto-common"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "digest"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]]
name = "encoding_rs"
version = "0.8.31"
@ -198,6 +301,27 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [
"cfg-if",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
name = "h2"
version = "0.3.13"
@ -227,6 +351,7 @@ checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
name = "hermit"
version = "0.1.0"
dependencies = [
"axum",
"futures",
"reqwest",
"serde",
@ -265,6 +390,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-range-header"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
[[package]]
name = "httparse"
version = "1.7.1"
@ -402,6 +533,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "matchit"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
[[package]]
name = "memchr"
version = "2.5.0"
@ -422,7 +559,7 @@ checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799"
dependencies = [
"libc",
"log",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys",
]
@ -534,6 +671,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@ -552,6 +709,12 @@ version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"
[[package]]
name = "ppv-lite86"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro2"
version = "1.0.39"
@ -570,6 +733,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.2.13"
@ -712,6 +905,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha-1"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -745,15 +949,21 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.95"
version = "1.0.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942"
checksum = "0748dd251e24453cb8717f0354206b91557e4ec8703673a4b30208f2abaf1ebf"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "tempfile"
version = "3.3.0"
@ -768,6 +978,26 @@ dependencies = [
"winapi",
]
[[package]]
name = "thiserror"
version = "1.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -824,6 +1054,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.3"
@ -838,6 +1080,48 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
[[package]]
name = "tower-service"
version = "0.3.1"
@ -851,6 +1135,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-core",
]
@ -870,6 +1155,31 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5"
dependencies = [
"base64",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "unicode-bidi"
version = "0.3.8"
@ -903,12 +1213,24 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "want"
version = "0.3.0"
@ -919,6 +1241,12 @@ dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View File

@ -11,3 +11,4 @@ futures = '*'
reqwest = '*'
serde = { version = '*', features = [ "derive" ] }
serde_json = '*'
axum = { version = '*', features = [ "ws", "serde_json" ] }

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use futures::prelude::*;
use reqwest::{IntoUrl, Response};
@ -74,31 +76,26 @@ mod task {
pub ctrl_rx: C,
}
pub struct Process <D, C, F> {
pub struct Process <D, C> {
pub data_rx: D,
pub ctrl_rx: C,
pub fe_tx: F,
}
impl<D, C, F> Task for Process<D, C, F>
impl<D, C> Task for Process<D, C>
where
D: Stream<Item = Flow<serde_json::Value>> + Unpin + Send + 'static,
C: Stream<Item = Message> + Unpin + Send + 'static,
Arc<F>: Sink<Activity> + Send + Sync + 'static,
<Arc<F> as Sink<Activity>>::Error: Into<crate::Error>,
F: Send + Sync + 'static,
{
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
fn run (self) -> Self::Future {
let Self { mut data_rx, mut ctrl_rx, fe_tx } = self;
let Self { mut data_rx, mut ctrl_rx } = self;
Box::pin(async move {
let mut config = crate::conf::Config::new("localhost");
let ctx = crate::Context {};
let sink = Arc::new(fe_tx);
loop {
tokio::select! {
@ -113,10 +110,15 @@ mod task {
Some (data) = data_rx.next() => {
// Dereferencing and other unfucking.
let d = ctx.dereferencer();
let d = ctx.dereferencer();
let data = match data.apply(|j| d.dereference(j)).await {
Ok (data) => data,
_ => continue,
Err (err) => {
// If dereferencing fails, that sucks but it's not
// fatal, so we drop the activity entirely.
println!("Fixup | Dropped due to '{:?}'", err);
continue
},
};
// Run both incoming and outgoing activities through the filtering system.
@ -129,31 +131,34 @@ mod task {
};
// Perform each activity in the context of the instance.
let c = ctx.clone();
let copy = data.clone();
let data = match data.apply(|a| a.perform(c)).await {
// Everything went ok, we can continue, bind `copy` to `data`.
Ok (_) => copy,
let c = ctx.clone();
if let Err (err) = data.clone().apply(|a| a.perform(c)).await {
// Something went wrong while performing the activity,
// report error and move on.
_ => continue,
println!("Exec | Failure '{:?}'", err);
continue
};
// Push each activity to an appropriate location.
// If incoming: push a notification to the frontend.
let incoming = {
let s = sink.clone();
let c = config.notify.clone();
move |a: Activity| a.notify(c, s)
let n = ctx.notifier();
move |a: Activity| a.notify(n)
};
// If outgoing: deliver the activity to its targets.
// If outgoing: deliver the activity to its targets using
// the ActivityPub delivery mechanism.
let outgoing = {
let s = ctx.signer();
move |a: Activity| a.deliver(s)
};
match data.pick(incoming, outgoing).await {
Ok (_) => println!("Yay"),
_ => println!("Boo"),
// Apply the appropriate functions to "push" the activity.
if let Err (err) = data.pick(incoming, outgoing).await {
// Neither of these failing should be considered
// fatal, but if it happens too much, it could be
// an indication of something being borked.
println!("Push | Failure '{:?}'", err);
continue
};
},
@ -341,6 +346,7 @@ pub mod conf {
pub struct Context {}
impl Context {
pub fn dereferencer (&self) -> Dereferencer {
Dereferencer { web: reqwest::Client::new() }
}
@ -348,14 +354,25 @@ impl Context {
pub fn signer (&self) -> &(dyn sign::Sign + Send + Sync) {
todo!()
}
pub fn notifier (&self) -> Notifier {
todo!()
}
}
pub struct Notifier {
config: conf::Notify,
socket: Box<dyn Sink<Activity, Error = Error> + Send + Sync + Unpin>,
}
pub struct Dereferencer {
web: reqwest::Client
web: reqwest::Client,
}
impl Dereferencer {
pub async fn dereference (&self, a: serde_json::Value) -> Result<Activity> {
/// Perform the dereferencing.
pub async fn dereference (&self, json: serde_json::Value) -> Result<Activity> {
todo!()
}
}
@ -385,13 +402,10 @@ impl Activity {
}
/// Send a notification to the given [`Sink`].
pub async fn notify <S> (self, cfg: conf::Notify, mut sink: S) -> Result<()>
where
S: Sink<Activity> + Unpin,
S::Error: Into<Error>,
{
pub async fn notify (self, notifier: Notifier) -> Result<()> {
let Notifier { config, mut socket } = notifier;
match &self {
Activity::Follow (..) if cfg.new_follower => sink.send(self).await.map_err(err),
Activity::Follow (..) if config.new_follower => socket.send(self).await.map_err(err),
// Otherwise, do nothing
_ => Ok (())
}