From 4a12b39580443376d0f69288efd66bc86536c714 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Tue, 7 Jun 2022 16:51:08 +0200 Subject: [PATCH] Add `Notifier` --- Cargo.lock | 334 +++++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 1 + src/main.rs | 76 +++++++----- 3 files changed, 377 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ffd288..74aef71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,12 +2,71 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "async-trait" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2504b827a8bef941ba3dd64bdffe9cf56ca182908a147edd6189c95fbcae7d" +dependencies = [ + "async-trait", + "axum-core", + "base64", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sha-1", + "sync_wrapper", + "tokio", + "tokio-tungstenite", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da31c0ed7b4690e2c78fe4b880d21cd7db04a346ebc658b4270251b695437f17" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + [[package]] name = "base64" version = "0.13.0" @@ -20,12 +79,27 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.1.0" @@ -60,6 +134,35 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "cpufeatures" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "encoding_rs" version = "0.8.31" @@ -198,6 +301,27 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.10.2+wasi-snapshot-preview1", +] + [[package]] name = "h2" version = "0.3.13" @@ -227,6 +351,7 @@ checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" name = "hermit" version = "0.1.0" dependencies = [ + "axum", "futures", "reqwest", "serde", @@ -265,6 +390,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.7.1" @@ -402,6 +533,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "memchr" version = "2.5.0" @@ -422,7 +559,7 @@ checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -534,6 +671,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -552,6 +709,12 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + [[package]] name = "proc-macro2" version = "1.0.39" @@ -570,6 +733,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -712,6 +905,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -745,15 +949,21 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.95" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" +checksum = "0748dd251e24453cb8717f0354206b91557e4ec8703673a4b30208f2abaf1ebf" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "tempfile" version = "3.3.0" @@ -768,6 +978,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "thiserror" +version = "1.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -824,6 +1054,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.3" @@ -838,6 +1080,48 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.1" @@ -851,6 +1135,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-core", ] @@ -870,6 +1155,31 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha-1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" + [[package]] name = "unicode-bidi" version = "0.3.8" @@ -903,12 +1213,24 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "vcpkg" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + [[package]] name = "want" version = "0.3.0" @@ -919,6 +1241,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 6b553a5..3bf425a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ futures = '*' reqwest = '*' serde = { version = '*', features = [ "derive" ] } serde_json = '*' +axum = { version = '*', features = [ "ws", "serde_json" ] } diff --git a/src/main.rs b/src/main.rs index ab59499..4a2d4ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use futures::prelude::*; use reqwest::{IntoUrl, Response}; @@ -74,31 +76,26 @@ mod task { pub ctrl_rx: C, } - pub struct Process { + pub struct Process { pub data_rx: D, pub ctrl_rx: C, - pub fe_tx: F, } - impl Task for Process + impl Task for Process where D: Stream> + Unpin + Send + 'static, C: Stream + Unpin + Send + 'static, - Arc: Sink + Send + Sync + 'static, - as Sink>::Error: Into, - F: Send + Sync + 'static, { type Future = Pin + Send + 'static>>; fn run (self) -> Self::Future { - let Self { mut data_rx, mut ctrl_rx, fe_tx } = self; + let Self { mut data_rx, mut ctrl_rx } = self; Box::pin(async move { let mut config = crate::conf::Config::new("localhost"); let ctx = crate::Context {}; - let sink = Arc::new(fe_tx); loop { tokio::select! { @@ -113,10 +110,15 @@ mod task { Some (data) = data_rx.next() => { // Dereferencing and other unfucking. - let d = ctx.dereferencer(); + let d = ctx.dereferencer(); let data = match data.apply(|j| d.dereference(j)).await { Ok (data) => data, - _ => continue, + Err (err) => { + // If dereferencing fails, that sucks but it's not + // fatal, so we drop the activity entirely. + println!("Fixup | Dropped due to '{:?}'", err); + continue + }, }; // Run both incoming and outgoing activities through the filtering system. @@ -129,31 +131,34 @@ mod task { }; // Perform each activity in the context of the instance. - let c = ctx.clone(); - let copy = data.clone(); - let data = match data.apply(|a| a.perform(c)).await { - // Everything went ok, we can continue, bind `copy` to `data`. - Ok (_) => copy, + let c = ctx.clone(); + if let Err (err) = data.clone().apply(|a| a.perform(c)).await { // Something went wrong while performing the activity, // report error and move on. - _ => continue, + println!("Exec | Failure '{:?}'", err); + continue }; + // Push each activity to an appropriate location. // If incoming: push a notification to the frontend. let incoming = { - let s = sink.clone(); - let c = config.notify.clone(); - move |a: Activity| a.notify(c, s) + let n = ctx.notifier(); + move |a: Activity| a.notify(n) }; - // If outgoing: deliver the activity to its targets. + // If outgoing: deliver the activity to its targets using + // the ActivityPub delivery mechanism. let outgoing = { let s = ctx.signer(); move |a: Activity| a.deliver(s) }; - match data.pick(incoming, outgoing).await { - Ok (_) => println!("Yay"), - _ => println!("Boo"), + // Apply the appropriate functions to "push" the activity. + if let Err (err) = data.pick(incoming, outgoing).await { + // Neither of these failing should be considered + // fatal, but if it happens too much, it could be + // an indication of something being borked. + println!("Push | Failure '{:?}'", err); + continue }; }, @@ -341,6 +346,7 @@ pub mod conf { pub struct Context {} impl Context { + pub fn dereferencer (&self) -> Dereferencer { Dereferencer { web: reqwest::Client::new() } } @@ -348,14 +354,25 @@ impl Context { pub fn signer (&self) -> &(dyn sign::Sign + Send + Sync) { todo!() } + + pub fn notifier (&self) -> Notifier { + todo!() + } + +} + +pub struct Notifier { + config: conf::Notify, + socket: Box + Send + Sync + Unpin>, } pub struct Dereferencer { - web: reqwest::Client + web: reqwest::Client, } impl Dereferencer { - pub async fn dereference (&self, a: serde_json::Value) -> Result { + /// Perform the dereferencing. + pub async fn dereference (&self, json: serde_json::Value) -> Result { todo!() } } @@ -385,13 +402,10 @@ impl Activity { } /// Send a notification to the given [`Sink`]. - pub async fn notify (self, cfg: conf::Notify, mut sink: S) -> Result<()> - where - S: Sink + Unpin, - S::Error: Into, - { + pub async fn notify (self, notifier: Notifier) -> Result<()> { + let Notifier { config, mut socket } = notifier; match &self { - Activity::Follow (..) if cfg.new_follower => sink.send(self).await.map_err(err), + Activity::Follow (..) if config.new_follower => socket.send(self).await.map_err(err), // Otherwise, do nothing _ => Ok (()) }