Add delivery
This commit is contained in:
parent
868ce0a90c
commit
2ce32ba904
1 changed files with 65 additions and 4 deletions
69
src/main.rs
69
src/main.rs
|
@ -1,4 +1,5 @@
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use reqwest::{IntoUrl, Response};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
@ -84,6 +85,7 @@ mod task {
|
||||||
D: Stream<Item = Flow<serde_json::Value>> + Unpin + Send + 'static,
|
D: Stream<Item = Flow<serde_json::Value>> + Unpin + Send + 'static,
|
||||||
C: Stream<Item = Message> + Unpin + Send + 'static,
|
C: Stream<Item = Message> + Unpin + Send + 'static,
|
||||||
Arc<F>: Sink<Activity> + Send + Sync + 'static,
|
Arc<F>: Sink<Activity> + Send + Sync + 'static,
|
||||||
|
<Arc<F> as Sink<Activity>>::Error: Into<crate::Error>,
|
||||||
F: Send + Sync + 'static,
|
F: Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
|
type Future = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
|
||||||
|
@ -359,7 +361,15 @@ impl Dereferencer {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {}
|
pub enum Error {
|
||||||
|
Http (reqwest::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<reqwest::Error> for Error {
|
||||||
|
fn from (e: reqwest::Error) -> Self { Error::Http (e) }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn err (e: impl Into<Error>) -> Error { e.into() }
|
||||||
|
|
||||||
pub type Result <T, E = Error> = std::result::Result<T, E>;
|
pub type Result <T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
|
@ -374,17 +384,68 @@ impl Activity {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn notify <S> (self, cfg: conf::Notify, sink: S) -> Result<()>
|
/// Send a notification to the given [`Sink`].
|
||||||
|
pub async fn notify <S> (self, cfg: conf::Notify, mut sink: S) -> Result<()>
|
||||||
where
|
where
|
||||||
S: Sink<Activity>,
|
S: Sink<Activity> + Unpin,
|
||||||
|
S::Error: Into<Error>,
|
||||||
{
|
{
|
||||||
todo!()
|
match &self {
|
||||||
|
Activity::Follow (..) if cfg.new_follower => sink.send(self).await.map_err(err),
|
||||||
|
// Otherwise, do nothing
|
||||||
|
_ => Ok (())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Deliver the activity to all its targets through the ActivityPub
|
||||||
|
/// delivery mechanism.
|
||||||
pub async fn deliver <S> (self, signer: &S) -> Result<()>
|
pub async fn deliver <S> (self, signer: &S) -> Result<()>
|
||||||
where
|
where
|
||||||
S: sign::Sign + ?Sized,
|
S: sign::Sign + ?Sized,
|
||||||
{
|
{
|
||||||
|
// Create a shared client #efficiency
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
// the function that does the delivery to a target. It creates
|
||||||
|
// a request with the proper headers and signs it using the
|
||||||
|
// `signer`.
|
||||||
|
let do_delivery = |url| async {
|
||||||
|
let req = {
|
||||||
|
let mut r = client.get(url).build()?;
|
||||||
|
signer.sign(&mut r)?;
|
||||||
|
r
|
||||||
|
};
|
||||||
|
client
|
||||||
|
.execute(req)
|
||||||
|
.map_err(err)
|
||||||
|
.await
|
||||||
|
};
|
||||||
|
|
||||||
|
// Collect only the errors, since we don't need to do anything
|
||||||
|
// with a successful delivery.
|
||||||
|
let errors = self
|
||||||
|
.delivery_targets()
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.map(do_delivery)
|
||||||
|
.collect::<stream::FuturesUnordered<_>>()
|
||||||
|
.filter_map(|r: Result<_>| async {
|
||||||
|
r.err().map(err)
|
||||||
|
})
|
||||||
|
.collect::<Vec<Error>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
for err in errors {
|
||||||
|
// Failure to deliver is not a fatal error per se,
|
||||||
|
// so we log and move on.
|
||||||
|
println!("Failed to deliver activity: {:?}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all delivery targets as urls.
|
||||||
|
async fn delivery_targets (&self) -> Result<Vec<reqwest::Url>> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue