Compare commits

...

2 Commits
mistress ... db

8 changed files with 499 additions and 93 deletions

36
Cargo.lock generated
View File

@ -138,6 +138,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "core-foundation"
version = "0.9.3"
@ -208,6 +214,19 @@ dependencies = [
"typenum",
]
[[package]]
name = "derive_more"
version = "0.99.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version",
"syn",
]
[[package]]
name = "digest"
version = "0.10.3"
@ -478,8 +497,10 @@ name = "hermit"
version = "0.1.0"
dependencies = [
"axum",
"derive_more",
"futures",
"openssl",
"rand",
"reqwest",
"serde",
"serde_json",
@ -1056,6 +1077,15 @@ dependencies = [
"winreg",
]
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.10"
@ -1101,6 +1131,12 @@ dependencies = [
"libc",
]
[[package]]
name = "semver"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2333e6df6d6598f2b1974829f853c2b4c5f4a6e503c10af918081aa6f8564e1"
[[package]]
name = "serde"
version = "1.0.137"

View File

@ -16,3 +16,5 @@ url = { version = '*', features = [ "serde" ] }
sqlx = { version = '*', features = [ "postgres", "runtime-tokio-native-tls" ] }
openssl = '*'
tokio-stream = { version = '*', features = [ "sync" ] }
derive_more = '*'
rand = '*'

View File

@ -3,38 +3,82 @@
use futures::prelude::*;
use serde::Serialize;
use crate::{ Id, Activity, err, Result, Error, sign, ctx::Context };
use crate::{ Id, Activity, err, Result, Error, sign, ctx::Context, db::{ Post, Privacy } };
/// Represents the creation of an object.
///
/// Valid object types: [`Note`].
#[derive(Clone, Serialize)]
pub enum Create {
Note { object: Note }
Note {
id: Id,
actor: Id,
object: Note,
}
}
impl From<Create> for Activity { fn from (a: Create) -> Activity { Activity::Create (a) } }
impl TryFrom<Create> for Post {
type Error = Error;
fn try_from (value: Create) -> Result<Post> {
let privacy = Privacy::infer(&value);
match value {
Create::Note { id: create, object, actor } => {
let Note { content, id, .. } = object;
Ok (Post {
author: actor,
content,
privacy,
create,
id,
})
},
}
}
}
#[derive(Clone, Debug)]
pub enum Invalid {}
/// A follow request.
///
/// Valid object types: [`Actor`].
#[derive(Clone, Serialize)]
pub enum Follow {
Actor { object: Actor }
Actor {
id: Id,
object: Actor,
}
}
impl From<Follow> for Activity { fn from (a: Follow) -> Activity { Activity::Follow (a) } }
/// Signal that a [follow request][Follow] has been accepted.
///
/// Valid object types: [`Follow`].
#[derive(Clone, Serialize)]
pub enum Accept {
Follow { object: Follow }
Follow {
id: Id,
object: Follow,
}
}
impl From<Accept> for Activity { fn from (a: Accept) -> Activity { Activity::Accept (a) } }
/// An entity that publishes activities.
#[derive(Clone, Serialize, sqlx::FromRow)]
#[derive(Clone, Serialize, sqlx::FromRow, sqlx::Type)]
pub struct Actor {
id: Id,
}
/// Represents a [`Post`] for federation purposes.
#[derive(Clone, Serialize)]
pub struct Note {
id: Id,
content: Option<String>,
}
impl From<Post> for Note {
fn from (Post { id, content, .. }: Post) -> Note {
Note { id, content }
}
}
impl Activity {
@ -88,11 +132,15 @@ impl Activity {
/// Get all delivery targets as urls.
async fn delivery_targets (&self) -> Result<Vec<reqwest::Url>> {
todo!()
let unId = |Id (x)| x;
Ok (self.to().chain(self.cc()).map(unId).collect())
}
/// Perform the activity.
pub async fn perform <S> (self, ctx: &mut Context<S>) -> Result<()> where S: sign::Sign {
pub async fn perform <S> (self, _: &mut Context<S>) -> Result<()>
where
S: sign::Sign
{
todo!()
}

View File

@ -1,6 +1,6 @@
//! Hermit instance configuration.
use crate::Id;
use crate::{ Id, db };
use std::collections::HashMap as Map;
@ -15,17 +15,20 @@ pub struct Config {
pub rules: Vec<rule::Rule>,
/// Notification configuration for each local actor.
pub notify: Map<Id, Notify>,
/// Configuration related to the database.
pub db_config: db::Config,
}
impl Config {
/// Create a new default config.
pub fn new (hostname: impl ToString) -> Config {
let (notify, rules) = def();
let (notify, rules, db_config) = def();
Config {
host: hostname.to_string(),
port: 6969,
notify,
rules,
db_config,
}
}
}

View File

@ -1,20 +1,58 @@
//! Database abstraction layer used by Hermit.
use std::borrow::Borrow;
use std::{marker::PhantomData, pin::Pin};
use std::pin::Pin;
use std::sync::Arc;
use crate::ap::Note;
use crate::{ Id, Result, ap::Actor, err };
use futures::prelude::*;
use sqlx::{Executor, pool::PoolConnection, database::HasArguments};
use sqlx::{pool::PoolConnection, database::HasArguments};
use sqlx::{ FromRow, Either::Right };
use tokio::sync::{Mutex, mpsc};
use std::collections::HashSet as Set;
pub (crate) use self::data::*;
/// `const ()` but in Rust
fn void <T> (_: T) -> () { () }
pub(crate) type Database = sqlx::Postgres;
/// The type of database.
pub (crate) type Database = sqlx::Postgres;
/// Specifies how to connect to the database.
pub struct Config {}
#[derive(Clone, Debug)]
pub struct Config {
/// The host of the database server.
///
/// Defaults to `localhost`.
pub host: String,
/// The port on which the database server accepts connections.
///
/// Defaults to `5432`.
pub port: u16,
/// The user used to perform database actions on the server's behalf.
///
/// Defaults to `hermit`.
pub user: String,
/// The password used to log in to the database user.
///
/// Defaults to `hermit-does-not-imply-lonely`
pub pass: String,
}
impl Default for Config {
fn default() -> Self {
Config {
host: "localhost".into(),
user: "hermit".into(),
pass: "hermit-does-not-imply-lonely".into(),
port: 5432,
}
}
}
/// A database client.
///
@ -23,6 +61,7 @@ pub struct Config {}
pub struct Client {
/// The internal connection pool.
pool: sqlx::Pool<Database>,
reserved: Arc<Mutex<Set<Id>>>,
}
impl Client {
@ -32,11 +71,14 @@ impl Client {
todo!()
}
/// Perform a query and stream the matching rows back.
pub async fn query <'e, T: 'e> (&self, query: Query<'e, T>) -> Pin<Box<dyn Stream<Item = Result<T>> + Send + 'e>> {
let Query (q, f) = query;
let stream = q
.fetch_many(&self.pool)
.filter_map(async move |r| {
// `async move` works here because `f` is a function pointer,
// which are `Copy`.
match r.map_err(err) {
Ok (Right (row)) => Some (f(row)),
Err (error) => Some (Err (error)),
@ -47,6 +89,8 @@ impl Client {
Box::pin (stream)
}
/// Get a single item from the database. If multiple items are matched by the query,
/// this function discards all but the first one.
pub async fn get <'q, T: 'q> (&self, query: Query<'q, T>) -> Result<Option<T>> {
self.query(query)
.await
@ -69,20 +113,83 @@ impl Client {
})
.await
}
/// Determine if an [`Id`] is already taken.
pub async fn verify_unique (&self, id: Id) -> Result<Option<Id>> {
let mut r = self.reserved.lock().await;
// First check the set of reserved ids.
if r.contains(&id) {
return Ok (None)
}
// If not, determine if it is taken in the database.
let is_taken: bool = self
.with_conn(|_| async {
todo!()
})
.await?;
// Reserve the id if it is not taken. This should prevent most race
// conditions in id generation.
Ok (if is_taken {
None
} else {
r.insert(id.clone());
Some (id)
})
}
}
type Q<'a> = sqlx::query::Query<'a, Database, <Database as HasArguments<'a>>::Arguments>;
type Row = <Database as sqlx::Database>::Row;
/// Represents a query awaiting execution.
pub struct Query <'a, T> (Q<'a>, fn (Row) -> Result<T>);
impl<'a, T> Query<'a, T> {
/// Map over the inner [`Q`].
fn mapq (self, f: fn (Q<'a>) -> Q<'a>) -> Query<'a, T> {
let Query (q, g) = self;
Query (f(q), g)
}
}
/// Force user to pick between incoming relations and outgoing relations.
pub struct Choice <'a, T> {
/// The incomplete query
query: Query<'a, T>,
/// The function that completes in the case of "incoming".
i: fn (Q<'a>) -> Q<'a>,
/// The function that completes in the case of "outgoing".
o: fn (Q<'a>) -> Q<'a>,
}
impl<'a, T> Choice<'a, T> {
/// Get the relations that are "incoming".
pub fn incoming (self) -> Query<'a, T> {
self.query.mapq(self.i)
}
/// Get the relations that are "outgoing".
pub fn outgoing (self) -> Query<'a, T> {
self.query.mapq(self.o)
}
}
/// Generate a query that gets an [`Actor`] by its [`Id`].
pub fn get_actor <'a> (id: &'a Id) -> Query<'a, Actor> {
// Prepare a query
let query = sqlx::query("select * from actors where id = $1")
.bind(id);
.bind(id.borrow());
// Return an sql query which will result in a series of rows,
// and a decoder function that will translate each row to a
@ -93,3 +200,122 @@ pub fn get_actor <'a> (id: &'a Id) -> Query<'a, Actor> {
})
}
/// Construct a query that gets a [`Note`] by id.
pub fn get_note <'a> (id: &'a Id) -> Query<'a, Note> {
// Prepare a query
let query = sqlx::query("select * from posts where id = $1")
.bind(id.borrow());
// Return an sql query which will result in a series of rows,
// and a decoder function that will translate each row to a
// value of type `Actor`.
Query (query, |row: Row| {
let data = Post::from_row(&row)?.into();
Ok (data)
})
}
/// Generate a [`Choice`] of sets of follow requests. The [outgoing] set refers to
/// follow requests where the `origin` matches the given id, whereas the [incoming]
/// set refers to the requests where the `target` matches the id.
///
/// [outgoing]: Choice::outgoing
/// [incoming]: Choice::incoming
pub fn get_follow_reqs <'a> (id: &'a Id) -> Choice<'a, FollowRequest> {
// Prepare an SQL query
let query = sqlx::query("select * from followreqs where $2 = $1")
.bind(id);
// Create an inner query object
let query = Query (query, |row: Row| {
let data = FollowRequest::from_row(&row)?;
Ok (data)
});
// Return an incomplete query object, with two functions to bind
// the final parameter: `i` for the "incoming" relations and `o`
// for the "outgoing" relations.
//
// Note that closures that are `FnOnce` and do not capture from
// the environment are coercable to function pointers, which is
// what we're exploiting here to cut down on boilerplate a bit.
Choice {
i: |q| q.bind("target"),
o: |q| q.bind("origin"),
query,
}
}
pub mod data {
//! Data types stored as records in the database. Sometimes the
//! federation model and the database model don't *quite* match
//! and in that case conversions between the two are needed.
use sqlx::{ Type, FromRow };
use crate::{ Id, ap::Create };
/// Encodes the status of a follow request.
#[derive(Clone, Debug, Type)]
pub enum FollowRequestStatus {
/// The follow request was accepted.
Accepted,
/// The follow request was rejected.
Rejected,
/// The follow request has neither been accepted nor rejected.
Pending,
}
/// Represents a follow request record in the database.
#[derive(Clone, Debug, FromRow, Type)]
pub struct FollowRequest {
/// The status of the follow request (whether it is accepted, rejected
/// or still pending).
pub status: FollowRequestStatus,
/// The actor that sent the request.
pub origin: Id,
/// The actor the request was sent to.
pub target: Id,
}
/// A social media post.
#[derive(Clone, Debug, FromRow, Type)]
pub struct Post {
/// The id of the post.
pub id: Id,
/// The id of the [`Actor`] that created the post.
pub author: Id,
/// The id of the associated [`Create`] activity.
pub create: Id,
/// The content of the post.
pub content: Option<String>,
/// The post's privacy scope.
pub privacy: Privacy,
}
/// Represents a privacy scope.
#[derive(Clone, Debug, Type)]
pub enum Privacy {
/// Visible on timelines etc.
Public,
/// Called "Unlisted" on Mastodon
Hidden,
/// Called "Followers-only" on Mastodon
Locked,
/// A DM.
Direct,
}
impl Privacy {
/// Infer the privacy scope from the activity.
pub fn infer (_: &Create) -> Privacy {
todo!()
}
}
}

32
src/error.rs Normal file
View File

@ -0,0 +1,32 @@
//! Defines the [`Error`] type, [`Result`] shorthand and [`err`] utility function to convert
//! compatible errors to `Error`.
use derive_more::From;
use crate::ap;
/// A result type that defaults to using [`Error`] as the second type
/// parameter.
pub type Result <T, E = Error> = std::result::Result<T, E>;
/// Errors generated within Hermit.
#[derive(Debug, From)]
pub enum Error {
/// [`reqwest`] errors.
Http (reqwest::Error),
/// [`serde_json`] errors.
Json (serde_json::Error),
/// [`sqlx`] errors.
Sqlx (sqlx::Error),
/// A cryptography error from [`openssl`].
OpenSSL (openssl::error::ErrorStack),
/// An error with parsing a [`url`].
Url (url::ParseError),
/// An error in converting between models.
Invalid (ap::Invalid),
/// Generic "timed out" error.
Timeout,
}
/// Trivial conversion function for use in `map_err` functions.
pub (crate) fn err (e: impl Into<Error>) -> Error { e.into() }

View File

@ -1,15 +1,16 @@
#![feature(async_closure)]
#![feature(generic_associated_types)]
//! # The Hermit ActivityPub server
//!
//! This library contains the types and trait impls that make up the ActivityPub
//! support and database interaction for the Hermit ActivityPub server.
#![feature(generic_associated_types)]
use derive_more::From;
use serde::Serialize;
// Expose the `Id` type in the crate root
pub use id::Id;
pub use ctx::Context;
pub use ctx::{ Dereferencer, IdGenerator, Context };
// Module imports
pub mod conf;
@ -17,8 +18,12 @@ pub mod sign;
pub mod db;
pub mod ap;
mod error;
pub use error::{ Result, Error };
use error::err;
/// The Activity supertype used in abstractions over any kind of activity.
#[derive(Clone, Serialize)]
#[derive(Clone, Serialize, From)]
pub enum Activity {
/// Create a post.
Create (ap::Create),
@ -28,68 +33,38 @@ pub enum Activity {
Accept (ap::Accept),
}
/// A result type that defaults to using [`Error`] as the second type
/// parameter.
pub type Result <T, E = Error> = std::result::Result<T, E>;
/// Errors generated within Hermit.
#[derive(Debug)]
pub enum Error {
/// [`reqwest`] errors.
Http (reqwest::Error),
/// [`serde_json`] errors.
Json (serde_json::Error),
/// [`sqlx`] errors.
Sqlx (sqlx::Error),
/// A cryptography error from [`openssl`].
OpenSSL (openssl::error::ErrorStack),
}
impl From<sqlx::Error> for Error {
fn from (e: sqlx::Error) -> Self { Error::Sqlx (e) }
}
impl From<reqwest::Error> for Error {
fn from (e: reqwest::Error) -> Self { Error::Http (e) }
}
impl From<serde_json::Error> for Error {
fn from (e: serde_json::Error) -> Self { Error::Json (e) }
}
impl From<openssl::error::ErrorStack> for Error {
fn from (e: openssl::error::ErrorStack) -> Self { Error::OpenSSL (e) }
}
/// Trivial conversion function for use in `map_err` functions.
pub (crate) fn err (e: impl Into<Error>) -> Error { e.into() }
mod id {
use std::{str::FromStr, error::Error};
use std::str::FromStr;
use reqwest::Url;
use serde::{ Deserialize, Serialize };
use sqlx::database::{HasArguments, HasValueRef};
use url::ParseError;
use sqlx::database::{ HasArguments, HasValueRef };
use crate::db;
/// An ActivityPub identifier.
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Id (reqwest::Url);
impl FromStr for Id {
type Err = url::ParseError;
type Err = crate::Error;
fn from_str (s: &str) -> Result<Self, Self::Err> {
s.parse().map(Id)
s.parse().map(Id).map_err(crate::err)
}
}
impl Id {
pub (crate) fn new <'a> (url: &'a Url) -> Id {
Id (url.to_owned())
}
}
impl std::fmt::Display for Id {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_string())
self.0.as_str().fmt(f)
}
}
@ -116,12 +91,14 @@ mod id {
mod ctx {
use std::sync::Arc;
use std::{sync::Arc, pin::Pin};
use futures::prelude::*;
use openssl::base64;
use rand::{Rng, prelude::StdRng, SeedableRng};
use serde_json::{Value, to_value};
use crate::{ conf::Config, db, Result, sign::Sign, ap::{self, Actor}, Activity, Id, err };
use crate::{ conf::Config, db, Result, sign::Sign, ap, Activity, Id, err, Error };
/// The context of a thread/task.
///
@ -151,12 +128,13 @@ mod ctx {
impl<S> Context<S> {
/// Attempt to dereference the given json to an [`Activity`].
pub async fn dereference (&self, json: Value) -> Result<Activity>
where
S: Sign
{
self.dereferencer()
.dereference(json)
.dereference_activity(json)
.await
}
@ -191,16 +169,29 @@ mod ctx {
&self.signer
}
/// Access the current [`Config`].
pub fn config (&self) -> &Config {
&self.config
}
/// Conjure an activity "from thin air" as though it were posted through a client.
///
/// Sometimes an activity implies another activity, for example, an activity should
/// be emitted when accepting a follow request.
pub (crate) async fn conjure (&self, act: impl Into<Activity>) -> Result<()> {
let act = act.into();
todo!()
}
/// Get an [`IdGenerator`].
pub fn id_gen (&self) -> IdGenerator<'_, StdRng> {
IdGenerator {
hostname: &self.config.host,
rng: StdRng::from_entropy(),
db: self.client.clone(),
}
}
}
/// A type that provides dereferencing facilities for [`Activity`] data.
@ -214,18 +205,16 @@ mod ctx {
where
S: Sign
{
/// Perform the dereferencing.
pub async fn dereference (&self, json: Value) -> Result<Activity> {
pub async fn dereference_activity (&self, json: Value) -> Result<Activity> {
match json["type"].as_str() {
Some ("Create") => self.deref_create(json).await.map(Activity::Create),
_ => todo!()
}
}
fn db_client (&self) -> &db::Client {
&self.db
}
/// Get the inner web client.
fn web_client (&self) -> &reqwest::Client {
&self.web
}
@ -233,27 +222,37 @@ mod ctx {
/// Fetch a JSON value.
pub async fn fetch (&self, url: impl crate::IntoUrl) -> Result<Value> {
let client = self.web_client();
let url = match url.into_url() {
Some (url) => url,
None => todo!(),
};
let req = {
let mut r = client.get(url).build()?;
self.signer.sign(&mut r)?;
r
};
let id = Id::new(&url);
let value = client
.execute(req)
.await?
.json()
.await?;
if let None = self.db_fetch(id).await? {
let client = self.web_client();
let req = {
let mut r = client.get(url).build()?;
self.signer.sign(&mut r)?;
r
};
Ok (value)
let value = client
.execute(req)
.await?
.json()
.await?;
Ok (value)
} else {
// Not finding anything is not considered a hard error.
Ok (Value::Null)
}
}
/// Fetch a value from the database by trying all the ActivityPub
@ -266,6 +265,12 @@ mod ctx {
.map(Some)
}
if let Some (data) = self.db.get(db::get_note(&id)).await? {
return to_value(data)
.map_err(err)
.map(Some)
}
todo!()
}
@ -281,8 +286,61 @@ mod ctx {
_ => return Err (todo!()),
}
}
}
/// How many bytes to generate for an ID.
const ID_LEN: usize = 64;
/// Generates [`Id`]s.
pub struct IdGenerator <'h, R> {
hostname: &'h str,
db: db::Client,
rng: R,
}
impl<R> IdGenerator<'_, R>
where
R: Rng
{
/// Generate an [`Id`] with the given prefix.
pub async fn gen (&mut self, prefix: &str) -> Result<Id> {
let IdGenerator { rng, db, hostname } = self;
// Give up after 200 failed attempts.
let mut tries = 200;
loop {
// Generate a random suffix and encode it as base64.
let suffix = {
let mut buf = [0; ID_LEN];
rng.fill(&mut buf);
base64::encode_block(&buf)
.replace("=", "-")
.replace("+", "_")
};
// Create an actual `Id` with the generated garbage.
let id = format!("https://{hostname}/{prefix}/{suffix}").parse()?;
// Basic check to see if the given id exists. This reserves the id.
// If the id is already taken, it is consumed.
if let Some (id) = db.verify_unique(id).await? {
break Ok (id)
}
tries -= 1;
if tries == 0 {
// After our tries are exhausted, give up.
break Err (Error::Timeout)
}
}
}
}
}

View File

@ -1,4 +1,3 @@
use std::sync::Arc;
use hermit::{ Context, Error, db, sign, Activity, };
@ -12,7 +11,7 @@ use tokio_stream::wrappers::ReceiverStream;
mod web;
#[tokio::main]
async fn main () {
async fn main () -> Result<(), Error> {
// Set up the context for each task.
let ctx = {
@ -21,16 +20,16 @@ async fn main () {
let hostname = "dev.riley.lgbt";
// Establish a connection to the database.
let client = db::Client::new(db::Config {}).await.unwrap();
let client = db::Client::new(db::Config::default()).await?;
// Generate the config from the hostname.
let config = Config::new(&hostname);
// Use an instance-wide signing key (for now).
let signer = sign::Key::load(
format!("https://{hostname}/key/main").parse().unwrap(),
format!("https://{hostname}/key/main").parse()?,
"private_key.pem"
).map(Arc::new).unwrap();
).map(Arc::new)?;
Context {
signer,
@ -85,7 +84,9 @@ async fn main () {
ctx.run (task::Ctrl {
rx: ctrl_rx,
tx: ctrl_tx,
})
});
Ok (())
}