From 2ce32ba904e53a9160f1ca7e189cf321ed823827 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Tue, 7 Jun 2022 15:40:18 +0200 Subject: [PATCH] Add delivery --- src/main.rs | 69 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index a839885..ab59499 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use futures::prelude::*; +use reqwest::{IntoUrl, Response}; #[tokio::main] async fn main() { @@ -84,6 +85,7 @@ mod task { D: Stream> + Unpin + Send + 'static, C: Stream + Unpin + Send + 'static, Arc: Sink + Send + Sync + 'static, + as Sink>::Error: Into, F: Send + Sync + 'static, { type Future = Pin + Send + 'static>>; @@ -359,7 +361,15 @@ impl Dereferencer { } #[derive(Debug)] -pub enum Error {} +pub enum Error { + Http (reqwest::Error), +} + +impl From for Error { + fn from (e: reqwest::Error) -> Self { Error::Http (e) } +} + +fn err (e: impl Into) -> Error { e.into() } pub type Result = std::result::Result; @@ -374,17 +384,68 @@ impl Activity { todo!() } - pub async fn notify (self, cfg: conf::Notify, sink: S) -> Result<()> + /// Send a notification to the given [`Sink`]. + pub async fn notify (self, cfg: conf::Notify, mut sink: S) -> Result<()> where - S: Sink, + S: Sink + Unpin, + S::Error: Into, { - todo!() + match &self { + Activity::Follow (..) if cfg.new_follower => sink.send(self).await.map_err(err), + // Otherwise, do nothing + _ => Ok (()) + } } + /// Deliver the activity to all its targets through the ActivityPub + /// delivery mechanism. pub async fn deliver (self, signer: &S) -> Result<()> where S: sign::Sign + ?Sized, { + // Create a shared client #efficiency + let client = reqwest::Client::new(); + + // the function that does the delivery to a target. It creates + // a request with the proper headers and signs it using the + // `signer`. + let do_delivery = |url| async { + let req = { + let mut r = client.get(url).build()?; + signer.sign(&mut r)?; + r + }; + client + .execute(req) + .map_err(err) + .await + }; + + // Collect only the errors, since we don't need to do anything + // with a successful delivery. + let errors = self + .delivery_targets() + .await? + .into_iter() + .map(do_delivery) + .collect::>() + .filter_map(|r: Result<_>| async { + r.err().map(err) + }) + .collect::>() + .await; + + for err in errors { + // Failure to deliver is not a fatal error per se, + // so we log and move on. + println!("Failed to deliver activity: {:?}", err); + } + + Ok (()) + } + + // Get all delivery targets as urls. + async fn delivery_targets (&self) -> Result> { todo!() } }