hermit/src/db/mod.rs

322 lines
8.4 KiB
Rust

//! Database abstraction layer used by Hermit.
use std::borrow::Borrow;
use std::pin::Pin;
use std::sync::Arc;
use crate::ap::Note;
use crate::{ Id, Result, ap::Actor, err };
use futures::prelude::*;
use sqlx::{pool::PoolConnection, database::HasArguments};
use sqlx::{ FromRow, Either::Right };
use tokio::sync::{Mutex, mpsc};
use std::collections::HashSet as Set;
pub (crate) use self::data::*;
/// Swallow any value and hand back unit.
///
/// This is the Rust spelling of Haskell's `const ()`: a named function that
/// can be passed wherever a "throw the result away" callback is wanted.
fn void<T>(_discarded: T) {}
/// The type of database.
///
/// Every `sqlx` type in this module (pool, connections, queries, rows) is
/// parameterised over this alias, which is currently Postgres.
pub (crate) type Database = sqlx::Postgres;
/// Connection settings for the database server.
///
/// [`Config::default`] yields the values listed as defaults on each field.
#[derive(Clone, Debug)]
pub struct Config {
    /// Host name of the database server.
    ///
    /// Defaults to `localhost`.
    pub host: String,
    /// TCP port on which the database server accepts connections.
    ///
    /// Defaults to `5432`.
    pub port: u16,
    /// Database user that performs actions on the server's behalf.
    ///
    /// Defaults to `hermit`.
    pub user: String,
    /// Password used to log in as [`Config::user`].
    ///
    /// Defaults to `hermit-does-not-imply-lonely`
    pub pass: String,
}

impl Default for Config {
    /// Build the out-of-the-box configuration described on the fields.
    fn default() -> Self {
        Self {
            host: String::from("localhost"),
            port: 5432,
            user: String::from("hermit"),
            pass: String::from("hermit-does-not-imply-lonely"),
        }
    }
}
/// A database client.
///
/// Cloning this client is cheap.
#[derive(Clone)]
pub struct Client {
/// The internal connection pool.
pool: sqlx::Pool<Database>,
/// Ids that `verify_unique` has already handed out. The set sits behind an
/// `Arc`, so every clone of the client sees the same reservations, and
/// behind an async `Mutex`, so a check-then-insert runs under one lock.
reserved: Arc<Mutex<Set<Id>>>,
}
impl Client {
    /// Attempt to connect to the database using the provided configuration.
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting the returned
    /// future panics.
    pub async fn new(_: Config) -> Result<Client> {
        todo!()
    }

    /// Run `query` against the pool and stream the decoded rows back.
    ///
    /// Each item produced by the driver is handled as follows:
    ///
    /// - a row is passed through the query's decoder and yielded,
    /// - an error is converted with [`err`] and yielded as `Err`,
    /// - anything else (a non-row result) is skipped.
    ///
    /// Nothing is awaited in here; the `async` only means callers have to
    /// `.await` once to obtain the stream itself.
    pub async fn query<'e, T: 'e>(
        &self,
        query: Query<'e, T>,
    ) -> Pin<Box<dyn Stream<Item = Result<T>> + Send + 'e>> {
        let Query(statement, decode) = query;
        let raw = statement.fetch_many(&self.pool);
        // `decode` is a plain function pointer and therefore `Copy`, so the
        // closure can hand a fresh copy to every per-item future it creates.
        let decoded = raw.filter_map(move |step| async move {
            match step.map_err(err) {
                Err(failure) => Some(Err(failure)),
                Ok(Right(row)) => Some(decode(row)),
                Ok(_) => None,
            }
        });
        Box::pin(decoded)
    }

    /// Fetch at most one item. When the query matches several rows only the
    /// first one streamed back is used; the rest of the stream is dropped.
    ///
    /// Returns `Ok(None)` when the stream ends without yielding anything, and
    /// `Err` when the first yielded item is an error.
    pub async fn get<'q, T: 'q>(&self, query: Query<'q, T>) -> Result<Option<T>> {
        let mut rows = self.query(query).await;
        let first = rows.next().await;
        first.transpose()
    }

    /// Acquire a connection from the pool and run `f` with it.
    ///
    /// A failure to acquire is converted with [`err`] and returned without
    /// calling `f`.
    //
    // NOTE(review): the connection is owned by the closure below and is
    // dropped as soon as `f` has *returned* its future, i.e. before that
    // future is awaited. `O` cannot borrow from the connection either, so
    // `f` can only use it synchronously. Confirm this is the intended
    // contract before building on it.
    async fn with_conn<F, O, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce(&mut PoolConnection<Database>) -> O,
        O: Future<Output = Result<T>>,
    {
        let acquiring = self.pool.acquire().map_err(err);
        acquiring.and_then(|mut conn| f(&mut conn)).await
    }

    /// Check whether `id` is still free and, if so, reserve it.
    ///
    /// Returns `Ok(Some(id))` when the id was free (it is then recorded in
    /// the reserved set), and `Ok(None)` when it is already reserved or
    /// already taken in the database.
    ///
    /// The database lookup is still `todo!()`, so any id that is not already
    /// reserved currently panics when that lookup runs.
    pub async fn verify_unique(&self, id: Id) -> Result<Option<Id>> {
        // The guard is held until the function returns, so the check and the
        // reservation below happen under a single lock.
        let mut reserved = self.reserved.lock().await;

        // Cheap check first: has this id been handed out already?
        if reserved.contains(&id) {
            return Ok(None);
        }

        // Otherwise ask the database whether the id is in use.
        let is_taken: bool = self.with_conn(|_| async { todo!() }).await?;
        if is_taken {
            return Ok(None);
        }

        // Free: reserve it. This should prevent most race conditions in id
        // generation.
        reserved.insert(id.clone());
        Ok(Some(id))
    }
}
/// A prepared `sqlx` query against [`Database`], carrying its bound arguments.
type Q<'a> = sqlx::query::Query<'a, Database, <Database as HasArguments<'a>>::Arguments>;
/// The row type produced by [`Database`].
type Row = <Database as sqlx::Database>::Row;
/// Represents a query awaiting execution.
///
/// Field `0` is the prepared statement with its bound arguments; field `1`
/// is the decoder, a plain function pointer that turns one returned row
/// into a `T` (or an error).
pub struct Query <'a, T> (Q<'a>, fn (Row) -> Result<T>);
impl<'a, T> Query<'a, T> {
    /// Apply `f` to the wrapped statement, leaving the row decoder untouched.
    fn mapq(self, f: fn(Q<'a>) -> Q<'a>) -> Query<'a, T> {
        Query(f(self.0), self.1)
    }
}
/// Force user to pick between incoming relations and outgoing relations.
///
/// Holds a query that is still missing its last bound parameter, plus one
/// function per direction that binds it. Call `incoming` or `outgoing` to
/// obtain the finished [`Query`].
pub struct Choice <'a, T> {
/// The incomplete query
query: Query<'a, T>,
/// The function that completes in the case of "incoming".
i: fn (Q<'a>) -> Q<'a>,
/// The function that completes in the case of "outgoing".
o: fn (Q<'a>) -> Q<'a>,
}
impl<'a, T> Choice<'a, T> {
/// Get the relations that are "incoming".
pub fn incoming (self) -> Query<'a, T> {
self.query.mapq(self.i)
}
/// Get the relations that are "outgoing".
pub fn outgoing (self) -> Query<'a, T> {
self.query.mapq(self.o)
}
}
/// Generate a query that gets an [`Actor`] by its [`Id`].
pub fn get_actor <'a> (id: &'a Id) -> Query<'a, Actor> {
// Prepare a query
let query = sqlx::query("select * from actors where id = $1")
.bind(id.borrow());
// Return an sql query which will result in a series of rows,
// and a decoder function that will translate each row to a
// value of type `Actor`.
Query (query, |row: Row| {
let data = Actor::from_row(&row)?;
Ok (data)
})
}
/// Construct a query that gets a [`Note`] by id.
pub fn get_note <'a> (id: &'a Id) -> Query<'a, Note> {
// Prepare a query
let query = sqlx::query("select * from posts where id = $1")
.bind(id.borrow());
// Return an sql query which will result in a series of rows,
// and a decoder function that will translate each row to a
// value of type `Actor`.
Query (query, |row: Row| {
let data = Post::from_row(&row)?.into();
Ok (data)
})
}
/// Generate a [`Choice`] of sets of follow requests. The [outgoing] set refers to
/// follow requests where the `origin` matches the given id, whereas the [incoming]
/// set refers to the requests where the `target` matches the id.
///
/// [outgoing]: Choice::outgoing
/// [incoming]: Choice::incoming
pub fn get_follow_reqs<'a>(id: &'a Id) -> Choice<'a, FollowRequest> {
    // Prepare an SQL query.
    //
    // `$1` is the actor id. `$2` is bound later, by `Choice::incoming` or
    // `Choice::outgoing`, to the string "target" or "origin".
    //
    // A bind parameter is always a *value*, never an identifier, so `$2`
    // cannot stand in for the column itself: `where $2 = $1` would compare
    // the literal string against the id. Instead `$2` is compared against
    // the two literals and each branch names its real column.
    let query = sqlx::query(
        "select * from followreqs \
         where ($2 = 'target' and target = $1) \
            or ($2 = 'origin' and origin = $1)",
    )
    .bind(id);

    // Create an inner query object that decodes each row as a
    // `FollowRequest`.
    let query = Query(query, |row: Row| {
        let data = FollowRequest::from_row(&row)?;
        Ok(data)
    });

    // Return an incomplete query object, with two functions to bind
    // the final parameter: `i` for the "incoming" relations and `o`
    // for the "outgoing" relations.
    //
    // Closures that capture nothing from their environment are coercible
    // to function pointers, which is what we're exploiting here to cut
    // down on boilerplate a bit.
    Choice {
        i: |q| q.bind("target"),
        o: |q| q.bind("origin"),
        query,
    }
}
pub mod data {
//! Data types stored as records in the database. Sometimes the
//! federation model and the database model don't *quite* match
//! and in that case conversions between the two are needed.
use sqlx::{ Type, FromRow };
use crate::{ Id, ap::Create };
/// Encodes the status of a follow request.
#[derive(Clone, Debug, Type)]
pub enum FollowRequestStatus {
/// The follow request was accepted.
Accepted,
/// The follow request was rejected.
Rejected,
/// The follow request has neither been accepted nor rejected.
Pending,
}
/// Represents a follow request record in the database.
///
/// Decoded from rows of the `followreqs` table by
/// [`get_follow_reqs`](super::get_follow_reqs).
#[derive(Clone, Debug, FromRow, Type)]
pub struct FollowRequest {
/// The status of the follow request (whether it is accepted, rejected
/// or still pending).
pub status: FollowRequestStatus,
/// The actor that sent the request.
pub origin: Id,
/// The actor the request was sent to.
pub target: Id,
}
/// A social media post.
///
/// This is the database-side record; [`get_note`](super::get_note) decodes
/// rows of the `posts` table into this type and then converts the result
/// into a `Note`.
#[derive(Clone, Debug, FromRow, Type)]
pub struct Post {
/// The id of the post.
pub id: Id,
/// The id of the [`Actor`](crate::ap::Actor) that created the post.
pub author: Id,
/// The id of the associated [`Create`] activity.
pub create: Id,
/// The content of the post, if it has any.
pub content: Option<String>,
/// The post's privacy scope.
pub privacy: Privacy,
}
/// Represents a privacy scope.
#[derive(Clone, Debug, Type)]
pub enum Privacy {
/// Visible on timelines etc.
Public,
/// Called "Unlisted" on Mastodon
Hidden,
/// Called "Followers-only" on Mastodon
Locked,
/// A DM.
Direct,
}
impl Privacy {
/// Infer the privacy scope from the activity.
///
/// Not implemented yet: the body is `todo!()`, so calling this panics.
pub fn infer (_: &Create) -> Privacy {
todo!()
}
}
}