diff --git a/flake.lock b/flake.lock index ccd1bd2..3915d7c 100644 --- a/flake.lock +++ b/flake.lock @@ -15,6 +15,21 @@ "type": "github" } }, + "flake-utils_2": { + "locked": { + "lastModified": 1637014545, + "narHash": "sha256-26IZAc5yzlD9FlDT54io1oqG/bBoyka+FJk5guaX4x4=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "bba5dcc8e0b20ab664967ad83d24d64cb64ec4f4", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, "naersk": { "inputs": { "nixpkgs": "nixpkgs" @@ -61,11 +76,47 @@ "type": "indirect" } }, + "nixpkgs_3": { + "locked": { + "lastModified": 1637453606, + "narHash": "sha256-Gy6cwUswft9xqsjWxFYEnx/63/qzaFUwatcbV5GF/GQ=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "8afc4e543663ca0a6a4f496262cd05233737e732", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "oxalica": { + "inputs": { + "flake-utils": "flake-utils_2", + "nixpkgs": "nixpkgs_3" + }, + "locked": { + "lastModified": 1655606998, + "narHash": "sha256-6XIQEwmoldCE3lzI54VQxD2tFJoeRsjRMYWkthtGRRw=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "4b600525be94c23b44c42441f33460343fafe7a1", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + }, "root": { "inputs": { "flake-utils": "flake-utils", "naersk": "naersk", - "nixpkgs": "nixpkgs_2" + "nixpkgs": "nixpkgs_2", + "oxalica": "oxalica" } } }, diff --git a/flake.nix b/flake.nix index 5a357fe..5ad7184 100644 --- a/flake.nix +++ b/flake.nix @@ -1,33 +1,38 @@ { - inputs = { - flake-utils.url = "github:numtide/flake-utils"; - naersk.url = "github:nix-community/naersk"; - }; + inputs = { + flake-utils.url = "github:numtide/flake-utils"; + oxalica.url = "github:oxalica/rust-overlay"; + naersk.url = "github:nix-community/naersk"; + }; - outputs = { self, nixpkgs, flake-utils, naersk }: - flake-utils.lib.eachDefaultSystem ( - system: let - pkgs = nixpkgs.legacyPackages."${system}"; - naersk-lib = naersk.lib."${system}"; - in - rec { - # `nix build` - packages."hermit" = naersk-lib.buildPackage { - pname = "hermit"; - root = ./.; - }; - defaultPackage = packages."hermit"; + outputs = { self, nixpkgs, flake-utils, naersk, oxalica }: + flake-utils.lib.eachDefaultSystem (system: + let pkgs = import nixpkgs { inherit system; overlays = [ oxalica.overlay ]; }; + naersk-lib = naersk.lib."${system}"; + in rec { + # `nix build` + packages."hermit" = naersk-lib.buildPackage { + pname = "hermit"; + root = ./.; + }; + + defaultPackage = packages."hermit"; - # `nix run` - apps."hermit"= flake-utils.lib.mkApp { - drv = packages."hermit"; - }; - defaultApp = apps."hermit"; + # `nix run` + apps."hermit"= flake-utils.lib.mkApp { + drv = packages."hermit"; + }; - # `nix develop` - devShell = pkgs.mkShell { - nativeBuildInputs = with pkgs; [ rustc cargo openssl pkgconfig ]; - }; - } - ); + defaultApp = apps."hermit"; + + # `nix develop` + devShell = pkgs.mkShell { + nativeBuildInputs = with pkgs; [ + (rust-bin.selectLatestNightlyWith (t: t.default)) + openssl + pkgconfig + ]; + }; + } + ); } diff --git a/src/ap/mod.rs b/src/ap/mod.rs index 662200a..a3a131d 100644 --- a/src/ap/mod.rs +++ b/src/ap/mod.rs @@ -1,24 +1,25 @@ //! ActivityPub implementation code and related abstractions. use futures::prelude::*; +use serde::Serialize; use crate::{ Id, Activity, err, Result, Error, sign, ctx::Context }; -#[derive(Clone)] +#[derive(Clone, Serialize)] pub enum Create { Note { object: Note } } impl From for Activity { fn from (a: Create) -> Activity { Activity::Create (a) } } -#[derive(Clone)] +#[derive(Clone, Serialize)] pub enum Follow { Actor { object: Actor } } impl From for Activity { fn from (a: Follow) -> Activity { Activity::Follow (a) } } -#[derive(Clone)] +#[derive(Clone, Serialize)] pub enum Accept { Follow { object: Follow } } @@ -26,12 +27,12 @@ pub enum Accept { impl From for Activity { fn from (a: Accept) -> Activity { Activity::Accept (a) } } /// An entity that publishes activities. -#[derive(Clone)] +#[derive(Clone, Serialize, sqlx::FromRow)] pub struct Actor { id: Id, } -#[derive(Clone)] +#[derive(Clone, Serialize)] pub struct Note { id: Id, } diff --git a/src/db/mod.rs b/src/db/mod.rs index cd19635..1295cb5 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,13 +1,17 @@ //! Database abstraction layer used by Hermit. -use crate::{ Id, Result }; +use std::borrow::Borrow; +use std::{marker::PhantomData, pin::Pin}; + +use crate::{ Id, Result, ap::Actor, err }; use futures::prelude::*; -use sqlx::{Executor, pool::PoolConnection}; +use sqlx::{Executor, pool::PoolConnection, database::HasArguments}; +use sqlx::{ FromRow, Either::Right }; /// `const ()` but in Rust fn void (_: T) -> () { () } -type Database = sqlx::Postgres; +pub(crate) type Database = sqlx::Postgres; /// Specifies how to connect to the database. pub struct Config {} @@ -28,32 +32,27 @@ impl Client { todo!() } - /// Fetch the data mapped to the given `key` from the database. - pub async fn get (&self, key: T::Key) -> Result> - where - T: Get, - { - self.with_conn(|c| T::get(key, c)) - .await + pub async fn query <'e, T: 'e> (&self, query: Query<'e, T>) -> Pin> + Send + 'e>> { + let Query (q, f) = query; + let stream = q + .fetch_many(&self.pool) + .filter_map(async move |r| { + match r.map_err(err) { + Ok (Right (row)) => Some (f(row)), + Err (error) => Some (Err (error)), + _ => None, + } + }); + + Box::pin (stream) } - /// Perfom an insertion on the database. - pub async fn insert (&mut self, data: T) -> Result<()> - where - T: Insert, - { - self.with_conn(|c| data.set(c)) - .await - .map(void) - } - - /// Delete something from the database. - pub async fn delete (&mut self, key: T::Key) -> Result<()> - where - T: Delete, - { - self.with_conn(|c| T::del(key, c)) - .await + pub async fn get <'q, T: 'q> (&self, query: Query<'q, T>) -> Result> { + self.query(query) + .await + .next() + .await + .transpose() } /// Handles the getting-a-connection logic. @@ -62,8 +61,6 @@ impl Client { F: FnOnce (&mut PoolConnection) -> O, O: Future>, { - use crate::err; - self.pool .acquire() .map_err(err) @@ -75,52 +72,24 @@ impl Client { } -pub trait Object: Sized { - type Key: Eq; - fn key (&self) -> &Self::Key; -} - -pub trait Insert: Object { - type Future: Future>; - fn set <'e, E> (self, exec: E) -> Self::Future - where - E: Executor<'e>; -} - -pub trait Delete: Object { - type Future: Future>; - fn del <'e, E> (key: Self::Key, exec: E) -> Self::Future where E: Executor<'e>; -} - -pub trait Get: Object { - type Future: Future>>; - fn get <'e, E> (key: Self::Key, exec: E) -> Self::Future where E: Executor<'e>; -} - -pub mod ops { - - //! Database operations (queries and updates). - - use super::*; - - pub struct Following { - pub from: Id, - pub to: Id, - pub id: Id, - } - - impl Object for Following { - type Key = Id; - fn key (&self) -> &Self::Key { &self.id } - } - - impl Insert for Following { - type Future = future::BoxFuture<'static, Result>; - fn set <'e, E> (self, exec: E) -> Self::Future - where - E: Executor<'e> - { - todo!() - } - } +type Q<'a> = sqlx::query::Query<'a, Database, >::Arguments>; +type Row = ::Row; + +pub struct Query <'a, T> (Q<'a>, fn (Row) -> Result); + +/// Generate a query that gets an [`Actor`] by its [`Id`]. +pub fn get_actor <'a> (id: &'a Id) -> Query<'a, Actor> { + + // Prepare a query + let query = sqlx::query("select * from actors where id = $1") + .bind(id); + + // Return an sql query which will result in a series of rows, + // and a decoder function that will translate each row to a + // value of type `Actor`. + Query (query, |row: Row| { + let data = Actor::from_row(&row)?; + Ok (data) + }) + } diff --git a/src/lib.rs b/src/lib.rs index e9c3cfa..92f15b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,11 @@ +#![feature(async_closure)] //! # The Hermit ActivityPub server //! //! This library contains the types and trait impls that make up the ActivityPub //! support and database interaction for the Hermit ActivityPub server. +#![feature(generic_associated_types)] + +use serde::Serialize; // Expose the `Id` type in the crate root pub use id::Id; @@ -14,7 +18,7 @@ pub mod db; pub mod ap; /// The Activity supertype used in abstractions over any kind of activity. -#[derive(Clone)] +#[derive(Clone, Serialize)] pub enum Activity { /// Create a post. Create (ap::Create), @@ -62,18 +66,19 @@ pub (crate) fn err (e: impl Into) -> Error { e.into() } mod id { - use std::str::FromStr; + use std::{str::FromStr, error::Error}; use serde::{ Deserialize, Serialize }; + use sqlx::database::{HasArguments, HasValueRef}; + use url::ParseError; + + use crate::db; /// An ActivityPub identifier. #[derive(PartialEq, Eq, Clone, Serialize, Deserialize)] + #[serde(transparent)] pub struct Id (reqwest::Url); - impl crate::IntoUrl for Id { - fn into_url (self) -> Option { Some (self.0) } - } - impl FromStr for Id { type Err = url::ParseError; @@ -82,6 +87,31 @@ mod id { } } + impl std::fmt::Display for Id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.to_string()) + } + } + + impl sqlx::Type for Id { + fn type_info () -> ::TypeInfo { + String::type_info() + } + } + + impl<'q> sqlx::Encode<'q, db::Database> for Id { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> sqlx::encode::IsNull { + self.0.to_string().encode_by_ref(buf) + } + } + + impl<'r> sqlx::Decode<'r, db::Database> for Id { + fn decode(value: >::ValueRef) -> Result { + >::decode(value) + .map(|s| s.parse().expect("Failed to parse ID as URL")) + .map(Id) + } + } } mod ctx { @@ -89,9 +119,9 @@ mod ctx { use std::sync::Arc; use futures::prelude::*; - use serde_json::Value; + use serde_json::{Value, to_value}; - use crate::{ conf::Config, db, Result, sign::Sign, ap, Activity }; + use crate::{ conf::Config, db, Result, sign::Sign, ap::{self, Actor}, Activity, Id, err }; /// The context of a thread/task. /// @@ -224,6 +254,20 @@ mod ctx { Ok (value) + } + + /// Fetch a value from the database by trying all the ActivityPub + /// records. + async fn db_fetch (&self, id: Id) -> Result> { + + if let Some (data) = self.db.get(db::get_actor(&id)).await? { + return to_value(data) + .map_err(err) + .map(Some) + } + + todo!() + } /// Attempt to dereference to a [`Create`](ap::Create) activity. diff --git a/src/main.rs b/src/main.rs index 24f0de7..e31197c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ + use std::sync::Arc; -use futures::stream; use hermit::{ Context, Error, db, sign, Activity, }; use hermit::conf::Config; use tokio::sync::{mpsc, broadcast}; use task::Executor; -use tokio_stream::wrappers::{ReceiverStream, BroadcastStream}; +use tokio_stream::wrappers::ReceiverStream; /// Module that contains all the API endpoints and frontend pages /// used by Hermit. @@ -68,6 +68,8 @@ async fn main () { ctrl_tx, }); + // Redefine `ctrl_tx`: it is now the transmitter that transmits + // *from* `Ctrl`. let (ctrl_tx, _rx) = broadcast::channel(256); ctx.run (task::Auto { @@ -93,32 +95,28 @@ fn mk_channel (size: usize) -> (mpsc::Sender, ReceiverStream) { (tx, rx) } -fn err (e: impl Into) -> Error { e.into() } - mod task { //! Async tasks, communicating with each other across threads through generic //! streams and sinks. use std::pin::Pin; - use tokio::sync::{mpsc, broadcast}; + use tokio::sync::{ + broadcast, + mpsc, + }; use futures::prelude::*; use serde_json::Value; - use crate::web; - use crate::sign::Sign; - use crate::{flow::Flow, Activity, ctrl::Message, Context}; + use crate::{ + flow::Flow, + ctrl::Message, + sign::Sign, + Activity, + Context, + web, + }; - impl Executor for Context - where - S: Sign + Send + Sync + 'static - { - fn run (&self, task: impl Task) { - let ctx: Context = self.clone(); - tokio::spawn(task.run(ctx)); - } - } - - /// Something that can execute a task. + /// Something that can execute a [`Task`]. pub trait Executor { /// Perform a [`Task`]. @@ -137,6 +135,16 @@ mod task { where S: Sign + Send + Sync + 'static; + } + + impl Executor for Context + where + S: Sign + Send + Sync + 'static + { + fn run (&self, task: impl Task) { + let ctx: Context = self.clone(); + tokio::spawn(task.run(ctx)); + } } /// The main web server.