Compare commits

...

5 commits

20 changed files with 2212 additions and 1100 deletions

16
Cargo.lock generated
View file

@ -885,6 +885,16 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "macro"
version = "0.0.0"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]] [[package]]
name = "matchit" name = "matchit"
version = "0.7.3" version = "0.7.3"
@ -1130,6 +1140,10 @@ dependencies = [
name = "puppy" name = "puppy"
version = "0.0.0" version = "0.0.0"
dependencies = [ dependencies = [
"bincode",
"chrono",
"derive_more",
"either",
"fetch", "fetch",
"store", "store",
] ]
@ -1490,6 +1504,8 @@ dependencies = [
"bincode", "bincode",
"chrono", "chrono",
"derive_more", "derive_more",
"either",
"macro",
"rocksdb", "rocksdb",
"tempfile", "tempfile",
"ulid", "ulid",

View file

@ -3,6 +3,7 @@ members = [
"lib/puppy", "lib/puppy",
"lib/store", "lib/store",
"lib/fetch", "lib/fetch",
"lib/macro",
"bin/server", "bin/server",
"bin/pupctl", "bin/pupctl",
] ]

View file

@ -1,79 +1,76 @@
use puppy::{ use puppy::{
store::{ model::{schema, Bite, FollowRequest, Follows, Profile, Username},
self, post::Author,
alias::Username, store::{self, Error},
arrow::{FollowRequested, Follows}, Key, Store,
mixin::Profile,
Error,
},
tl::Post,
Bite, Key, Store,
}; };
fn main() -> store::Result<()> { fn main() -> store::Result<()> {
// Store::nuke(".state")?; // Store::nuke(".state")?;
let db = Store::open(".state")?; let db = Store::open(".state", schema())?;
println!("creating actors"); println!("creating actors");
let riley = get_or_create_actor(&db, "riley")?; let riley = get_or_create_actor(&db, "riley")?;
let linen = get_or_create_actor(&db, "linen")?; let linen = get_or_create_actor(&db, "linen")?;
if false { if true {
println!("creating posts"); println!("creating posts");
puppy::create_post(&db, riley, "@linen <3")?; puppy::post::create_post(&db, riley, "@linen <3")?;
puppy::create_post(&db, linen, "@riley <3")?; puppy::post::create_post(&db, linen, "@riley <3")?;
} }
if false {
if true {
println!("making riley follow linen"); println!("making riley follow linen");
if !db.exists::<Follows>((riley, linen))? { if !db.exists::<Follows>(riley, linen)? {
println!("follow relation does not exist yet"); println!("follow relation does not exist yet");
if !db.exists::<FollowRequested>((riley, linen))? { if !db.exists::<FollowRequest>(riley, linen)? {
println!("no pending follow request; creating"); println!("no pending follow request; creating");
puppy::fr::create(&db, riley, linen)?; puppy::follows::request(&db, riley, linen)?;
} else { } else {
println!("accepting the pending follow request"); println!("accepting the pending follow request");
puppy::fr::accept(&db, riley, linen)?; puppy::follows::accept(&db, riley, linen)?;
} }
} else { } else {
println!("riley already follows linen"); println!("riley already follows linen");
} }
} }
println!("Posts on the instance:");
for Post { println!("\nPosts on the instance:");
id, for post in puppy::post::fetch_timeline(&db, .., None)?.posts() {
content, let Author { ref handle, .. } = post.author;
author, let content = post.content.content.as_ref().unwrap();
} in puppy::tl::fetch_all(&db)? println!("- {} by {handle}:\n{content}", post.id)
{
let (_, Profile { account_name, .. }) = db.lookup(author)?;
let content = content.content.unwrap();
println!("- {id} by @{account_name} ({author}):\n{content}",)
} }
println!("Linen's followers:");
for id in puppy::fr::followers_of(&db, linen)? { println!("\nLinen's followers:");
let (_, Profile { account_name, .. }) = db.lookup(id)?; for id in puppy::follows::followers_of(&db, linen)? {
let Profile { account_name, .. } = db.get_mixin(id)?.unwrap();
println!("- @{account_name} ({id})"); println!("- @{account_name} ({id})");
} }
println!("Riley's following:");
for id in puppy::fr::following_of(&db, riley)? { println!("\nRiley's following:");
let (_, Profile { account_name, .. }) = db.lookup(id)?; for id in puppy::follows::following_of(&db, riley)? {
let Profile { account_name, .. } = db.get_mixin(id)?.unwrap();
println!("- @{account_name} ({id})"); println!("- @{account_name} ({id})");
} }
println!("Biting riley");
puppy::bite_actor(&db, linen, riley).unwrap(); if false {
for Bite { id, biter, .. } in puppy::bites_on(&db, riley).unwrap() { println!("Biting riley");
let (_, Profile { account_name, .. }) = db.lookup(biter).unwrap(); puppy::bites::bite_actor(&db, linen, riley).unwrap();
println!("riley was bitten by @{account_name} at {}", id.timestamp()); for Bite { id, biter, .. } in puppy::bites::bites_on(&db, riley).unwrap() {
let Profile { account_name, .. } = db.get_mixin(biter)?.unwrap();
println!("riley was bitten by @{account_name} at {}", id.timestamp());
}
} }
store::OK store::OK
} }
fn get_or_create_actor(db: &Store, username: &str) -> Result<Key, Error> { fn get_or_create_actor(db: &Store, username: &str) -> Result<Key, Error> {
let user = db.translate::<Username>(username); let user = db.lookup(Username(username.to_string()));
match user { match user {
Ok(key) => { Ok(Some(key)) => {
println!("found '{username}' ({key})"); println!("found '{username}' ({key})");
Ok(key) Ok(key)
} }
Err(Error::Missing) => { Ok(None) => {
println!("'{username}' doesn't exist yet, creating"); println!("'{username}' doesn't exist yet, creating");
let r = puppy::create_actor(&db, username); let r = puppy::create_actor(&db, username);
if let Ok(ref key) = r { if let Ok(ref key) = r {

13
lib/macro/Cargo.toml Normal file
View file

@ -0,0 +1,13 @@
[package]
name = "macro"
edition = "2021"
[lib]
path = "src/lib.rs"
proc-macro = true
[dependencies]
syn = { version = '2', features = ['full'] }
quote = '*'
proc-macro2 = '*'
heck = '*'

142
lib/macro/src/arrow.rs Normal file
View file

@ -0,0 +1,142 @@
use heck::AsKebabCase;
use proc_macro::TokenStream;
use quote::{quote, ToTokens};
use syn::{parse_macro_input, Data, DeriveInput, Field, Ident};
pub fn arrow(item: TokenStream) -> TokenStream {
let input = parse_macro_input!(item as DeriveInput);
let Data::Struct(structure) = input.data else {
panic!("Only structs are supported as arrows")
};
match structure.fields {
syn::Fields::Named(fields) => from_named(&input.ident, fields),
syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => {
let first = f.unnamed.first().unwrap();
from_newtype(&input.ident, first)
}
_ => panic!(
"Only newtype structs and structs with named fields can have a derived arrow impl"
),
}
}
fn from_named(name: &Ident, fields: syn::FieldsNamed) -> TokenStream {
let (origin, target, identity) = extract_idents(fields);
match identity {
Some(id) => make_multi_arrow(name, origin, target, id),
None => make_basic_arrow(name, origin, target),
}
}
fn make_basic_arrow(name: &Ident, origin: Ident, target: Ident) -> TokenStream {
let spec = gen_spec(name);
TokenStream::from(quote! {
#spec
impl store::arrow::Arrow for #name {}
impl From<store::arrow::Basic> for #name {
fn from(v: store::arrow::Basic) -> #name {
#name {
#origin: v.origin,
#target: v.target,
}
}
}
impl From<#name> for store::arrow::Basic {
fn from(v: #name) -> store::arrow::Basic {
store::arrow::Basic {
origin: v.#origin,
target: v.#target,
}
}
}
})
}
fn make_multi_arrow(name: &Ident, origin: Ident, target: Ident, id: Ident) -> TokenStream {
let spec = gen_spec(name);
TokenStream::from(quote! {
#spec
impl store::arrow::Arrow for #name {
type Kind = store::arrow::Multi;
}
impl From<store::arrow::Multi> for #name {
fn from(v: store::arrow::Multi) -> #name {
#name {
#id: v.identity,
#origin: v.origin,
#target: v.target,
}
}
}
impl From<#name> for store::arrow::Multi {
fn from(v: #name) -> store::arrow::Multi {
store::arrow::Multi {
identity: v.#id,
origin: v.#origin,
target: v.#target,
}
}
}
})
}
fn extract_idents(fields: syn::FieldsNamed) -> (Ident, Ident, Option<Ident>) {
let origin = extract_ident("origin", &fields).unwrap();
let target = extract_ident("target", &fields).unwrap();
let id = extract_ident("identity", &fields);
(origin, target, id)
}
fn extract_ident(name: &str, fields: &syn::FieldsNamed) -> Option<Ident> {
// Prefer marked fields and default to correctly named fields.
fields
.named
.iter()
.find(|field| {
field
.attrs
.iter()
.filter_map(|attr| attr.meta.path().get_ident())
.any(|id| id == name)
})
.and_then(|f| f.ident.clone())
.or_else(|| {
fields
.named
.iter()
.filter_map(|f| f.ident.clone())
.find(|id| id == name)
})
}
fn gen_spec(name: &Ident) -> impl ToTokens {
let prefix = AsKebabCase(name.to_string());
let by_origin = format!("{prefix}/by-origin");
let by_target = format!("{prefix}/by-target");
quote! {
impl store::types::DataType for #name {
type Type = store::types::ArrowSpec;
const SPEC: Self::Type = store::types::ArrowSpec {
by_origin: store::types::Keyspace(#by_origin),
by_target: store::types::Keyspace(#by_target),
};
}
}
}
fn from_newtype(name: &Ident, field: &Field) -> TokenStream {
let spec = gen_spec(name);
let typ = &field.ty;
TokenStream::from(quote! {
#spec
impl store::arrow::Arrow for #name {
type Kind = #typ;
}
impl From<#typ> for #name {
fn from(v: #typ) -> #name { #name(v) }
}
impl From<#name> for #typ {
fn from(v: #name) -> #typ { v.0 }
}
})
}

73
lib/macro/src/lib.rs Normal file
View file

@ -0,0 +1,73 @@
use proc_macro::TokenStream;
mod arrow;
#[proc_macro_derive(Arrow, attributes(origin, target, identity))]
pub fn arrow(item: TokenStream) -> TokenStream {
arrow::arrow(item)
}
#[proc_macro_derive(Alias)]
pub fn alias(item: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(item as syn::DeriveInput);
let syn::Data::Struct(structure) = input.data else {
panic!("Only structs are supported as aliases")
};
match structure.fields {
syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => {
let first = f.unnamed.first().unwrap();
make_alias_impl(&input.ident, first)
}
_ => panic!("Only string newtype structs are allowed as aliases"),
}
}
fn make_alias_impl(name: &syn::Ident, field: &syn::Field) -> TokenStream {
let typ = &field.ty;
let prefix = heck::AsKebabCase(name.to_string());
let keyspace = format!("{prefix}/keyspace");
let reversed = format!("{prefix}/reversed");
let spec = quote::quote! {
impl store::types::DataType for #name {
type Type = store::types::AliasSpec;
const SPEC: Self::Type = store::types::AliasSpec {
keyspace: store::types::Keyspace(#keyspace),
reversed: store::types::Keyspace(#reversed),
};
}
};
TokenStream::from(quote::quote! {
#spec
impl store::Alias for #name {}
impl AsRef<str> for #name {
fn as_ref(&self) -> &str { self.0.as_ref() }
}
impl From<#typ> for #name {
fn from(v: #typ) -> #name { #name(v) }
}
})
}
#[proc_macro_derive(Mixin)]
pub fn mixin(item: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(item as syn::DeriveInput);
let name = input.ident;
let prefix = heck::AsKebabCase(name.to_string());
let keyspace = format!("{prefix}/main");
let spec = quote::quote! {
impl store::types::DataType for #name {
type Type = store::types::MixinSpec;
const SPEC: Self::Type = store::types::MixinSpec {
keyspace: store::types::Keyspace(#keyspace),
};
}
};
TokenStream::from(quote::quote! {
#spec
impl store::Mixin for #name {}
})
}

View file

@ -8,3 +8,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
store = { path = "../store" } store = { path = "../store" }
fetch = { path = "../fetch" } fetch = { path = "../fetch" }
bincode = "2.0.0-rc.3"
chrono = "*"
either = "*"
derive_more = "*"

View file

@ -1,48 +1,65 @@
#![feature(iterator_try_collect)] #![feature(iterator_try_collect, try_blocks)]
use model::{Profile, Username};
pub use store::{self, Key, Store}; pub use store::{self, Key, Store};
use store::{
alias::Username,
arrow::{self, multi::MultiArrow, AuthorOf},
mixin::{Content, Profile},
util::IterExt,
Keylike, Tag,
};
mod tags { pub mod model {
//! Type tags for vertices. use bincode::{Decode, Encode};
use derive_more::Display;
use store::{types::Schema, Alias, Mixin};
use store::Tag; use crate::follows::Status;
pub use crate::{
bites::Bite,
follows::{FollowRequest, Follows},
post::{AuthorOf, Content},
};
pub const ACTOR: Tag = Tag(0); /// A "profile" in the social media sense.
pub const POST: Tag = Tag(1); ///
pub const BITE: Tag = Tag(2); /// Contains all presentation information about someone making posts.
} #[derive(Mixin, Encode, Decode, Debug, Clone)]
pub struct Profile {
/// How many posts has this user made?
pub post_count: usize,
/// The name used for the profile's handle.
pub account_name: Username,
/// The name displayed above their posts.
pub display_name: Option<String>,
/// The "bio", a freeform "about me" field.
pub about_string: Option<String>,
/// Arbitrary custom metadata fields.
pub about_fields: Vec<(String, String)>,
}
pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Result<Key> { /// A unique name for an actor that is part of their "handle".
let key = Key::gen(); #[derive(Alias, Encode, Decode, Clone, PartialEq, Eq, Debug, Hash, Display)]
db.transaction(|tx| { pub struct Username(pub String);
tx.create_vertex(key, tags::POST)?;
tx.update::<Profile>(author, |_, mut profile| { /// Construct the schema.
profile.post_count += 1; pub fn schema() -> Schema {
Ok(profile) Schema::new()
})?; // Mixins
tx.insert(key, Content { .has::<Profile>()
content: Some(content.to_string()), .has::<Content>()
summary: None, .has::<Status>()
})?; // Aliases
tx.insert_arrow((author, key), AuthorOf)?; .has::<Username>()
Ok(key) // Arrows
}) .has::<Bite>()
.has::<FollowRequest>()
.has::<AuthorOf>()
.has::<Follows>()
}
} }
pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> { pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
let key = Key::gen(); let key = Key::gen();
db.transaction(|tx| { db.run(|tx| {
tx.create_vertex(key, tags::ACTOR)?; let username: Username = username.to_string().into();
tx.insert_alias(key, Username(username.to_string()))?; tx.add_alias(key, username.clone())?;
tx.insert(key, Profile { tx.add_mixin(key, Profile {
post_count: 0, post_count: 0,
account_name: username.to_string(), account_name: username,
display_name: None, display_name: None,
about_string: None, about_string: None,
about_fields: Vec::new(), about_fields: Vec::new(),
@ -51,126 +68,369 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
}) })
} }
pub fn list_posts_by_author( pub mod bites {
db: &Store, //! The most essential feature of any social network.
author: impl Keylike,
) -> store::Result<Vec<(Key, Content)>> {
db.transaction(|tx| {
tx.list_outgoing::<AuthorOf>(author)
.bind_results(|(post_key, _)| tx.lookup::<Content>(post_key))
.collect()
})
}
pub struct Bite { use store::{Arrow, Key, Store};
pub id: Key,
pub biter: Key,
pub victim: Key,
}
impl MultiArrow for Bite { /// *Bites you*
const TYPE: Tag = tags::BITE; #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
} pub struct Bite {
#[identity]
pub fn bite_actor(db: &Store, biter: Key, victim: Key) -> store::Result<Key> {
db.transaction(|tx| {
// Bites are represented as multiedges.
let key = arrow::multi::insert::<Bite>(&tx, biter, victim)?;
// We can treat particular arrows in a quiver as a vertex by registering it.
tx.create_vertex(key, tags::BITE)?;
Ok(key)
})
}
pub fn bites_on(db: &Store, victim: Key) -> store::Result<Vec<Bite>> {
db.transaction(|tx| {
arrow::multi::list_incoming::<Bite>(&tx, victim)
.map_ok(|(biter, id)| Bite { id, biter, victim })
.try_collect()
})
}
pub mod tl {
//! Timelines
use store::{arrow::AuthorOf, mixin::Content, util::IterExt as _, Error, Key, Result, Store};
pub struct Post {
pub id: Key, pub id: Key,
pub author: Key, #[origin]
pub content: Content, pub biter: Key,
#[target]
pub victim: Key,
} }
pub fn fetch_all(db: &Store) -> Result<Vec<Post>> { pub fn bite_actor(db: &Store, biter: Key, victim: Key) -> store::Result<Key> {
db.transaction(|tx| { db.run(|tx| {
let iter = tx.list::<Content>(); let id = Key::gen();
tx.create(Bite { id, biter, victim })?;
Ok(id)
})
}
/// Who has bitten `victim`?
pub fn bites_on(db: &Store, victim: Key) -> store::Result<Vec<Bite>> {
db.incoming::<Bite>(victim).try_collect()
}
}
pub mod post {
//! Timelines: where you go to view the posts.
use std::ops::RangeBounds;
use bincode::{Decode, Encode};
use chrono::{DateTime, Utc};
use either::Either::{Left, Right};
use store::{util::IterExt as _, Arrow, Error, Key, Mixin, Result, Store, Transaction};
use crate::model::Profile;
/// The contents of a post.
#[derive(Mixin, Encode, Decode, Debug, Clone, Default)]
pub struct Content {
/// Main post body.
pub content: Option<String>,
/// Content warning for the post.
pub warning: Option<String>,
}
impl From<&str> for Content {
fn from(value: &str) -> Self {
value.to_string().into()
}
}
impl From<String> for Content {
fn from(value: String) -> Self {
Content {
content: Some(value),
warning: None,
}
}
}
/// The relation that `author` has constructed and published `object`.
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct AuthorOf {
#[origin]
pub author: Key,
#[target]
pub object: Key,
}
/// A piece of content posted by someone.
#[derive(Clone, Debug)]
pub struct Post {
/// The post's internal ID.
pub id: Key,
/// The actual post contents.
pub content: Content,
/// Metadata about the post's author.
pub author: Author,
}
/// Information about a [`Post`]'s author.
#[derive(Clone, Debug)]
pub struct Author {
/// The identifier of the author.
pub id: Key,
/// The name to display along with the post.
pub display_name: String,
/// An informal identifier for a particular author.
pub handle: String,
}
/// An ordered list of [`Post`]s for viewing.
#[derive(Debug)]
pub struct Timeline {
items: Vec<Item>,
}
/// Discrete events that can be displayed to a user as part of a timeline.
#[derive(Debug)]
enum Item {
Post(Post),
}
impl Item {
/// Get the timeline item if it is a [`Post`].
pub fn as_post(&self) -> Option<&Post> {
match self {
Item::Post(ref post) => Some(post),
}
}
}
impl Timeline {
/// Get all the posts in the timeline.
pub fn posts(&self) -> impl Iterator<Item = &Post> {
self.items.iter().filter_map(|x| x.as_post())
}
}
/// Gets at most `limit` of the posts known to the instance that were inserted within `time_range`.
pub fn fetch_timeline(
db: &Store,
time_range: impl RangeBounds<DateTime<Utc>>,
limit: Option<usize>,
) -> Result<Timeline> {
let posts = db.run(|tx| {
// Get all post content entries (the argument passed here is a range of chrono datetimes).
let iter = tx.range::<Content>(time_range);
let iter = match limit {
Some(n) => Left(iter.take(n)),
None => Right(iter),
};
// Then, we're gonna map each of them to their author, and get the profile information needed to
// render the post (mostly display name and handle).
iter.bind_results(|(id, content)| { iter.bind_results(|(id, content)| {
let author = tx // Take the first author. There is nothing stopping a post from having multiple authors, but
.list_incoming::<AuthorOf>(id) // let's take it one step at a time.
.keys() let (author, Some(Profile { display_name, account_name, .. })) = tx
.next_or(Error::Missing)?; .join_on(|a: AuthorOf| a.author, tx.incoming(id))?
Ok(Post { .swap_remove(0)
else {
// We expect all posts to have at least one author, so we should complain if there is one
// that doesn't (for now). For robustness, the `.collect()` down there should be replaced
// with a strategy where we log a warning instead of failing, but in the current state of
// the project, failing fast is a good thing.
return Err(Error::Missing);
};
Ok(Item::Post(Post {
id, id,
author, author: Author {
id: author,
handle: format!("@{account_name}"),
display_name: display_name.unwrap_or(account_name.0),
},
content, content,
}) }))
}) })
.collect() .collect()
})?;
Ok(Timeline { items: posts })
}
/// Create a new post.
pub fn create_post(db: &Store, author: Key, content: impl Into<Content>) -> store::Result<Key> {
db.run(|tx| mixin_post(tx, Key::gen(), author, content))
}
/// Add a post's mixins and predicates to an existing `node`.
pub fn mixin_post(
tx: &Transaction<'_>,
node: Key,
author: Key,
content: impl Into<Content>,
) -> store::Result<Key> {
tx.update::<Profile>(author, |mut profile| {
profile.post_count += 1;
profile
})?;
tx.add_mixin(node, content.into())?;
tx.create(AuthorOf { author, object: node })?;
Ok(node)
}
pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result<Vec<(Key, Content)>> {
db.run(|tx| {
let posts = tx
.join_on(|a: AuthorOf| a.object, tx.outgoing(author))?
.into_iter()
.filter_map(|(k, opt)| try { (k, opt?) })
.collect();
Ok(posts)
}) })
} }
} }
pub mod fr { pub mod follows {
//! Follow requests //! Follow requests and related stuff.
use store::{ use bincode::{Decode, Encode};
arrow::{FollowRequested, Follows}, use store::{util::IterExt, Arrow, Error, Key, Mixin, Store, OK};
util::IterExt as _,
Key, Store, OK,
};
pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<()> { /// A predicate; `follower` "follows" `followed`.
db.transaction(|tx| { #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
tx.insert_arrow((requester, target), FollowRequested)?; pub struct Follows {
OK #[origin]
pub follower: Key,
#[target]
pub followed: Key,
}
/// An instance of a request from some `origin` user to follow a `target` user.
///
/// This should not be used to determine whether two actors are following each other. For that, use
/// [`Follows`], a basic arrow for exactly this purpose. *This* arrow is used to identify specific
/// instances of *requests*, and serves mostly as a historical reference and for synchronizing with
/// other servers.
///
/// Mixins always present for the `id`:
///
/// - [`Status`], carrying the status of the request.
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct FollowRequest {
/// The unique ID of this particular request.
#[identity]
pub id: Key,
/// The "follower", the user that made the request.
pub origin: Key,
/// The one the request is made to.
pub target: Key,
}
impl FollowRequest {
/// Determine if this follow request is pending.
pub fn is_pending(&self, db: &Store) -> store::Result<bool> {
// The status is stored as a mixin, so we need to get it.
let Some(st) = db.get_mixin::<Status>(self.id)? else {
// If we don't have a status for a follow request, something is borked.
return Err(Error::Missing);
};
// If the status of the follow request is pending, it can't also be true that the follows
// relation already exists.
debug_assert! {
!(st == Status::Pending)
|| db.exists::<Follows>(self.origin, self.target).map(|x| !x)?,
"fr.is_pending -> !(fr.origin follows fr.target)"
};
Ok(st == Status::Pending)
}
}
/// The status of a [`FollowRequest`].
///
/// Valid state transitions:
///
/// ```text
/// ┌──────────────▶ Rejected
/// │
/// │
/// │
///
/// None ─────────▶ Pending ────────▶ Accepted
///
/// │ │
/// │ │
/// │ │
/// ▼ │
/// Withdrawn ◀────────────┘
/// ```
///
/// In addition, a follow request will be deleted if either endpoint is removed from the graph.
#[derive(Mixin, Encode, Decode, Eq, PartialEq, Clone)]
pub enum Status {
/// The follow request was previously pending or accepted, but since withdrawn.
///
/// This can happen when someone cancels their follow request or unfollows the target.
Withdrawn,
/// The follow request was accepted.
Accepted,
/// The follow request was denied.
Rejected,
/// The follow request is still under review.
Pending,
}
/// Request to follow another actor.
pub fn request(db: &Store, requester: Key, target: Key) -> store::Result<FollowRequest> {
db.run(|tx| {
let req = FollowRequest {
id: Key::gen(),
origin: requester,
target,
};
tx.create(req)?;
tx.add_mixin(req.id, Status::Pending)?;
Ok(req)
}) })
} }
/// Accept the open follow request from `requester` to `target`, if one exists.
pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> { pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| { db.run(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?; // TODO: This logic is a little broken but it'll do for now. i'll fix it later.
tx.insert_arrow((requester, target), Follows)?; let fr = tx
.between::<FollowRequest>(requester, target)
.filter(|fr| fr.as_ref().is_ok_and(|f| f.target == target))
// We'll want the latest one, because that one was inserted last so it'll be the most
// recent
.last()
.ok_or_else(|| Error::Missing)??;
// Only apply the update if the last follow request is still in a pending state.
if let Some(Status::Pending) = db.get_mixin(fr.id)? {
tx.update(fr.id, |_| Status::Accepted)?;
tx.create(Follows {
follower: requester,
followed: target,
})?;
}
OK OK
}) })
} }
pub fn reject(db: &Store, requester: Key, target: Key) -> store::Result<()> { pub fn reject(db: &Store, request: Key) -> store::Result<()> {
db.transaction(|tx| { db.run(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?; tx.update(request, |_| Status::Rejected)?;
OK OK
}) })
} }
pub fn list_pending(db: &Store, target: Key) -> store::Result<Vec<Key>> { /// List all pending follow requests for a user.
db.transaction(|tx| tx.list_incoming::<FollowRequested>(target).keys().collect()) pub fn list_pending(db: &Store, target: Key) -> store::Result<Vec<FollowRequest>> {
db.incoming::<FollowRequest>(target)
.filter_bind_results(|req| Ok(if req.is_pending(db)? { Some(req) } else { None }))
.collect()
} }
/// Get all actors followed by `actor`.
pub fn following_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> { pub fn following_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| tx.list_outgoing::<Follows>(actor).keys().collect()) db.outgoing::<Follows>(actor)
.map_ok(|a| a.followed)
.collect()
} }
/// Get all actors following `actor`.
pub fn followers_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> { pub fn followers_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| tx.list_incoming::<Follows>(actor).keys().collect()) db.incoming::<Follows>(actor)
.map_ok(|a| a.follower)
.collect()
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use store::{ use store::{Key, Store, OK};
arrow::{FollowRequested, Follows},
Key, Store, OK,
};
use crate::create_actor; use crate::{
create_actor,
model::{schema, FollowRequest, Follows},
};
fn make_test_actors(db: &Store) -> store::Result<(Key, Key)> { fn make_test_actors(db: &Store) -> store::Result<(Key, Key)> {
let alice = create_actor(&db, "alice")?; let alice = create_actor(&db, "alice")?;
@ -181,18 +441,21 @@ pub mod fr {
#[test] #[test]
fn create_fr() -> store::Result<()> { fn create_fr() -> store::Result<()> {
Store::with_tmp(|db| { Store::test(schema(), |db| {
let (alice, bob) = make_test_actors(&db)?; let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?; super::request(&db, alice, bob)?;
assert!( assert!(
db.exists::<FollowRequested>((alice, bob))?, db.exists::<FollowRequest>(alice, bob)?,
"(alice -> bob) ∈ follow-requested" "(alice -> bob) ∈ follow-requested"
); );
assert!( assert!(
!db.exists::<Follows>((alice, bob))?, !db.exists::<Follows>(alice, bob)?,
"(alice -> bob) ∉ follows" "(alice -> bob) ∉ follows"
); );
let pending_for_bob = super::list_pending(&db, bob)?; let pending_for_bob = super::list_pending(&db, bob)?
.into_iter()
.map(|fr| fr.origin)
.collect::<Vec<_>>();
assert_eq!(pending_for_bob, vec![alice], "bob.pending = {{alice}}"); assert_eq!(pending_for_bob, vec![alice], "bob.pending = {{alice}}");
OK OK
}) })
@ -200,17 +463,17 @@ pub mod fr {
#[test] #[test]
fn accept_fr() -> store::Result<()> { fn accept_fr() -> store::Result<()> {
Store::with_tmp(|db| { Store::test(schema(), |db| {
let (alice, bob) = make_test_actors(&db)?; let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?; super::request(&db, alice, bob)?;
super::accept(&db, alice, bob)?; super::accept(&db, alice, bob)?;
assert!( assert!(
db.exists::<Follows>((alice, bob))?, db.exists::<Follows>(alice, bob)?,
"(alice -> bob) ∈ follows" "(alice -> bob) ∈ follows"
); );
assert!( assert!(
!db.exists::<Follows>((bob, alice))?, !db.exists::<Follows>(bob, alice)?,
"(bob -> alice) ∉ follows" "(bob -> alice) ∉ follows"
); );
@ -226,9 +489,9 @@ pub mod fr {
#[test] #[test]
fn listing_follow_relations() -> store::Result<()> { fn listing_follow_relations() -> store::Result<()> {
Store::with_tmp(|db| { Store::test(schema(), |db| {
let (alice, bob) = make_test_actors(&db)?; let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?; super::request(&db, alice, bob)?;
super::accept(&db, alice, bob)?; super::accept(&db, alice, bob)?;
let followers_of_bob = super::followers_of(&db, bob)?; let followers_of_bob = super::followers_of(&db, bob)?;

View file

@ -12,3 +12,5 @@ derive_more = "*"
bincode = "2.0.0-rc.3" bincode = "2.0.0-rc.3"
chrono = "*" chrono = "*"
tempfile = "*" tempfile = "*"
macro = { path = "../macro" }
either = "*"

View file

@ -1,17 +1,115 @@
//! Alternative keys. /// Derive an implementation of [`Alias`].
pub use r#macro::Alias;
use derive_more::{Display, From}; use super::{
types::{AliasSpec, DataType},
Batch, Store, Transaction,
};
use crate::{Key, Result};
use crate::Space; /// An alternative unique identifier for a node.
pub trait Alias: DataType<Type = AliasSpec> + From<String> + AsRef<str> {}
/// An alternative unique key for a vertex. impl Transaction<'_> {
pub trait Alias: ToString + From<String> { /// Look up the key associated with the alias.
const SPACE: (Space, Space); pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
where
A: Alias,
{
op::lookup::<A>(self, alias.as_ref())
}
/// Get the alias associated with the `node`.
pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
where
A: Alias,
{
op::get_alias(self, node)
}
/// Add an alias to `node`.
pub fn add_alias<A>(&self, node: Key, alias: A) -> Result<()>
where
A: Alias,
{
op::add_alias::<A>(self, node, alias.as_ref())
}
/// Check whether `node` has an `M` defined for it.
pub fn has_alias<A>(&self, node: Key) -> Result<bool>
where
A: Alias,
{
op::has_alias::<A>(self, node)
}
} }
#[derive(Display, From)] impl Store {
pub struct Username(pub String); /// Look up the key associated with the alias.
pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
impl Alias for Username { where
const SPACE: (Space, Space) = (Space("username/l"), Space("username/r")); A: Alias,
{
op::lookup::<A>(self, alias.as_ref())
}
/// Get the alias associated with the `node`.
pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
where
A: Alias,
{
op::get_alias(self, node)
}
/// Check whether `node` has an `M` defined for it.
pub fn has_alias<A>(&self, node: Key) -> Result<bool>
where
A: Alias,
{
op::has_alias::<A>(self, node)
}
}
impl Batch {
/// Add an alias to `node`.
///
/// # Warning
///
/// This will *not* fail if the key already has a alias of this type, and in fact *it will cause fundamental inconsistency*
/// if the alias already exists. Don't call this function unless you know that neither `node` nor `alias` exist yet.
pub fn put_alias<A>(&mut self, node: Key, alias: A)
where
A: Alias,
{
// TODO: consistency *could* be checked by manually iterating over the transaction using `WriteBatch::iterate`
op::add_alias::<A>(self, node, alias.as_ref()).unwrap();
}
}
mod op {
use crate::{internal::*, Alias, Key, Result, OK};
pub fn lookup<A: Alias>(cx: &impl Query, alias: &str) -> Result<Option<Key>> {
cx.open(A::SPEC.keyspace).get(alias).map(|k| match k {
Some(x) => Some(Key::from_slice(x.as_ref())),
None => None,
})
}
pub fn has_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<bool> {
cx.open(A::SPEC.reversed).has(node)
}
pub fn add_alias<A: Alias>(cx: &impl Write, node: Key, alias: &str) -> Result<()> {
cx.open(A::SPEC.keyspace).set(alias, node)?;
cx.open(A::SPEC.reversed).set(node, alias)?;
OK
}
pub fn get_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<Option<A>> {
let buf = cx.open(A::SPEC.reversed).get(node)?;
Ok(buf.map(decode))
}
fn decode<T>(data: impl AsRef<[u8]>) -> T
where
T: From<String>,
{
T::from(String::from_utf8_lossy(data.as_ref()).into_owned())
}
} }

View file

@ -1,79 +1,583 @@
//! Relations between nodes. //! Directed edges, both parallel and simple.
//!
//! This module's main exports are [`Arrow`], and the two kinds of arrows: [`Basic`] and [`Multi`].
//!
//! Querying information about arrows can be done using the APIs exposed by [`Store`] and [`Transaction`],
//! and manipulating them can likewise be done from within the context of a `Transaction`.
//!
//! The arrow API is designed to aggressively minimize boilerplate for defining arrow types, and uses a
//! few tricks to do with associated constants and types to make it all work nicely.
//!
//! # Terminology
//!
//! An arrow is a part of a graph. Graphs consist of *nodes* (also called *vertices*) and *edges*. Nodes
//! can be seen as "things", and edges as connections between those things, defined by the two nodes that
//! they connect (which are called the *endpoints* of the edge).
//!
//! These edges can be directed or undirected. The difference is that undirected edges are identified by
//! an unordered pair of their endpoints, whereas directed edges (also called **arrows**), are identified
//! by an ordered pair, where one of the endpoints is the *tail* (or *origin* in the code/docs here) and
//! the other is the *head* (usually called *target* here).
//!
//! # Arrow kinds
//!
//! Arrows can be either [`Basic`] or [`Multi`]. The main difference is that basic arrows are defined
//! solely by which two nodes they connect, which means that their representation and certain operations
//! are more efficient. The trade-off is that they cannot capture more complex information than "this
//! edge exists".
//!
//! For some use cases (for example, predicates) this is sufficient, but other use cases require multiple,
//! individually identifiable and manipulatable parallel edges. Here, the trade-off is that while they
//! are much more expressive, and can be labeled by associating [mixins] with the arrow's identity key,
//! they incur more space overhead, and most operations on them are more expensive compared to basic
//! edges.
//!
//! Most arrow operations work on either kind of edge. Some signatures reference [`Arrow::Kind`], which
//! is either of the `Multi` or `Basic` types mentioned before. Because parallel arrows need to be
//! discernable from each other, each of them also has an `identity` key, in addition to listing the two
//! edges they connect.
//!
//! [mixins]: super::Mixin
#![allow(private_interfaces)]
use bincode::{Decode, Encode}; pub use self::kinds::{Basic, Multi};
use super::{
types::{ArrowSpec, DataType},
Batch, Store, Transaction,
};
use crate::{util::IterExt as _, Key, Result};
use crate::Space; /// A directed edge.
///
/// See the [module docs][self] for an introduction.
pub trait Arrow: DataType<Type = ArrowSpec> + From<Self::Kind> + Into<Self::Kind> {
/// The representation of this arrow, which also determines whether parallel edges are allowed.
type Kind: ArrowKind = Basic;
}
pub mod multi { /// Parameterizing arrows so we can distinguish between kinds of arrows.
//! Managing multiedges. ///
//! /// This lets us present a common API for certain arrow-related operations while also leveraging some
//! Unlike regular [`Arrow`]s, which don't have an identity (they are identified by the two nodes that /// specialization. Essentially, from a type parameter which implements [`Arrow`], we can tell both at
//! they connect), multiarrows can have their own [`Key`]. This allows one to have multiple arrows in /// the type level and at the value level whether that arrow is a multi-arrow or not.
//! the same direction connecting the same two vertices, which isn't possible with normal arrows. pub trait ArrowKind {
//! /// Whether this kind of arrow should be represented using the specialized representation for edges
//! Multiarrows can also be treated as if they were vertices, if their identity (`Key`) is registered as /// that are allowed to be parallel.
//! one. const IS_MULTI: bool;
//! /// Construct an arrow from a buffer containing a correctly-oriented arrow.
//! This comes with a trade-off, though, specifically in both space and complexity. A multi-arrow also ///
//! can't have a label, like a typical arrow. /// Each arrow is stored twice, once "correctly", and once "reversed". This allows us to efficiently
/// list both the outgoing and incoming edges for any particular vertex by using a prefix scan on the
/// [`BY_ORIGIN`][ArrowSpec::by_origin] and [`BY_TARGET`][ArrowSpec::by_target] keyspaces respectively.
///
/// The buffer passed to this function will start with 16 bytes origin, followed by 16 bytes target.
/// For basic arrows, that's it, but for multiarrows there is an additional 16 bytes of "identity",
/// which is needed to discriminate between multiple parallel edges.
///
/// # Failure
///
/// This method must panic if `buf` is not the expected size (32 bytes for basic arrows, 48 bytes for
/// multi arrows). The responsibility for ensuring that `buf` is correctly oriented lies with the
/// caller lest the result is incorrect, but passing an incorrectly oriented arrow is not a memory
/// safety issue, so this function is safe.
fn dec(buf: &[u8]) -> Self;
/// Encode an arrow's key origin-first and target-first.
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>);
#[doc(hidden)]
/// Turn him into a raw edge.
fn raw(&self) -> Raw;
}
use crate::{Key, Result, Tag, Transaction}; union Raw {
multi: Multi,
basic: Basic,
}
pub fn insert<A>(tx: &Transaction<'_>, origin: Key, target: Key) -> Result<Key> impl Store {
/// Check whether there exists any arrow of type `A` that points from `origin` to `target`.
pub fn exists<A>(&self, origin: Key, target: Key) -> Result<bool>
where where
A: MultiArrow, A: Arrow,
{ {
let key = Key::gen(); op::exists::<A>(self, origin, target)
tx.quiver(A::TYPE).insert(origin, target, key)?; }
Ok(key) /// Get all arrows of type `A` that point at `target`.
pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::incoming::<A>(self, target).map_ok(A::from)
}
/// Get all arrows of type `A` that point away from `origin`.
pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::outgoing::<A>(self, origin).map_ok(A::from)
}
/// List all arrows between `a` and `b`, in either direction.
pub fn between<'a, A>(&'a self, a: Key, b: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow<Kind = Multi> + 'a,
{
op::between::<A>(self, a, b).map_ok(A::from)
}
}
impl Transaction<'_> {
/// Check whether there exists any arrow of type `A` that points from `origin` to `target`.
///
/// This only tells you whether there is *any* such arrow, not how many (in the case of parallel edges).
pub fn exists<A>(&self, origin: Key, target: Key) -> Result<bool>
where
A: Arrow,
{
op::exists::<A>(self, origin, target)
}
/// Get all arrows of type `A` that point at `target`.
pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::incoming::<A>(self, target).map_ok(A::from)
}
/// Get all arrows of type `A` that point away from `origin`.
pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::outgoing::<A>(self, origin).map_ok(A::from)
}
/// Create a new arrow of type `A`.
///
/// This operation supports both [`Multi`] and [`Basic`] arrows.
///
/// # Example
///
/// The following snippet creates an arrow between `origin` and `target`.
///
/// ```rust
/// # fn main () -> store::Result<()> {
/// use store::{Arrow, Key};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// # let schema = store::types::Schema::new().has::<MyArrow>();
/// # store::Store::test(schema, |db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// db.run(|tx| {
/// tx.create(MyArrow { origin, target })
/// })?;
///
/// assert!(db.exists::<MyArrow>(origin, target)?);
/// # store::OK })
/// # }
/// ```
pub fn create<A>(&self, arrow: A) -> Result<()>
where
A: Arrow,
{
op::create::<A>(self, arrow.into())
}
/// Delete all edges of type `A` from `origin` to `target`.
///
/// It is not an error for this function not to delete anything.
pub fn delete_all<A>(&self, origin: Key, target: Key) -> Result<()>
where
A: Arrow,
{
op::delete_all::<A>(self, origin, target)
}
/// Delete a specific arrow.
pub fn delete_one<A>(&self, arrow: A) -> Result<()>
where
A: Arrow,
{
op::delete_one::<A>(self, arrow.into())
}
/// List all arrows between `a` and `b`, in either direction.
pub fn between<'a, A>(&'a self, a: Key, b: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow<Kind = Multi> + 'a,
{
op::between::<A>(self, a, b).map_ok(A::from)
}
}
impl Batch {
/// Create an arrow. See [`Transaction::create`].
pub fn create<A>(&mut self, arrow: A)
where
A: Arrow,
{
op::create::<A>(self, arrow.into())
.expect("no errors expected to occur during batch operation")
}
/// Delete a specific arrow.
pub fn delete_one<A>(&mut self, arrow: A)
where
A: Arrow,
{
op::delete_one::<A>(self, arrow.into())
.expect("no errors expected to occur during batch operation")
}
}
mod op {
//! Implementations of arrow operations.
use super::*;
use crate::{internal::*, types::MULTIEDGE_HEADERS, Key, Result, OK};
/// Check whether there exists at least one arrow of type `A` from `origin` to `target`.
pub fn exists<A>(cx: &impl Query, origin: Key, target: Key) -> Result<bool>
where
A: Arrow,
{
if A::Kind::IS_MULTI {
// In the case of a multi-edge, at least one result from the prefix scan
// indicates that there is at least one edge.
cx.open(A::SPEC.by_origin)
.scan(origin.fuse(target))
.next()
.transpose()
.map(|o| o.is_some())
} else {
cx.open(A::SPEC.by_origin).has(origin.fuse(target))
}
} }
pub fn list_incoming<'db, A>( /// List incoming arrows relative to `target`.
tx: &'db Transaction<'db>, pub fn incoming<'db, A>(
cx: &'db impl Query,
target: Key, target: Key,
) -> impl Iterator<Item = Result<(Key, Key)>> + 'db ) -> impl Iterator<Item = Result<A::Kind>> + 'db
where where
A: MultiArrow, A: Arrow,
A::Kind: 'db,
{ {
tx.quiver(A::TYPE).list_incoming(target) // In the `by_target` keyspace, for either kind of arrow the layout is such that the target is
// the prefix, so we pick that keyspace to more efficiently list all arrows that target the key.
cx.open(A::SPEC.by_target)
.scan(target)
.map_ok(|(mut k, _)| {
// Arrows from `by_target` are oriented target-first, while the decoder function requires
// that the buffer is oriented origin-first. Regardless of whether `..32` covers the prefix
// or the whole slice, swapping the two keys always gives us the ordering expected by the
// decoding function.
let (t, o) = k[..32].split_at_mut(16);
t.swap_with_slice(o);
A::Kind::dec(&k)
})
} }
pub trait MultiArrow { /// List outgoing arrows relative to `origin`.
const TYPE: Tag; pub fn outgoing<'db, A>(
cx: &'db impl Query,
origin: Key,
) -> impl Iterator<Item = Result<A::Kind>> + 'db
where
A: Arrow,
A::Kind: 'db,
{
cx.open(A::SPEC.by_origin)
.scan(origin)
.map_ok(|(ref k, _)| A::Kind::dec(k))
}
/// Get all arrows between the two endpoints (in either direction)
pub fn between<'db, A>(
cx: &'db impl Query,
origin: Key,
target: Key,
) -> impl Iterator<Item = Result<A::Kind>> + 'db
where
A: Arrow,
A::Kind: 'db,
{
let ks = cx.open(A::SPEC.by_origin);
ks.scan(origin.fuse(target))
.chain(ks.scan(target.fuse(origin)))
.map_ok(|(ref k, _)| A::Kind::dec(k))
}
/// Create a new arrow.
pub fn create<A>(cx: &impl Write, arrow: A::Kind) -> Result<()>
where
A: Arrow,
{
if A::Kind::IS_MULTI {
let Multi { identity, origin, target } = unsafe { arrow.raw().multi };
cx.open(MULTIEDGE_HEADERS)
.set(identity, origin.fuse(target))?;
}
let (by_origin, by_target) = arrow.enc();
cx.open(A::SPEC.by_origin).set(by_origin, b"")?;
cx.open(A::SPEC.by_target).set(by_target, b"")?;
OK
}
/// Delete all arrows from `origin` to `target`.
///
/// TODO: Remove the query requirement (depends on range delete being available).
pub fn delete_all<A>(cx: &(impl Write + Query), origin: Key, target: Key) -> Result<()>
where
A: Arrow,
{
let by_origin = cx.open(A::SPEC.by_origin);
let by_target = cx.open(A::SPEC.by_target);
Ok(if A::Kind::IS_MULTI {
let headers = cx.open(MULTIEDGE_HEADERS);
// TODO: optimize this implementation using range deletes.
// Unfortunately, range deletes are not available in transactional backends.
for key in by_origin.scan(origin.fuse(target)).keys() {
let key = Multi::decode(key?.as_ref());
by_origin.del(key.encode())?;
by_target.del(key.swap().encode())?;
headers.del(key.identity)?;
}
} else {
by_origin.del(origin.fuse(target))?;
by_target.del(target.fuse(origin))?;
})
}
/// Delete a specific arrow, if it exists. Doesn't error if the arrow does *not* exist.
pub fn delete_one<A>(cx: &impl Write, arrow: A::Kind) -> Result<()>
where
A: Arrow,
{
let (by_origin, by_target) = arrow.enc();
cx.open(A::SPEC.by_origin).del(by_origin)?;
cx.open(A::SPEC.by_target).del(by_target)?;
OK
} }
} }
/// A directed edge between two vertices. /// Types representing the different kinds of arrows.
pub trait Arrow: Encode + Decode { mod kinds {
const SPACE: (Space, Space); use super::ArrowKind;
} use crate::Key;
/// Which way an arrow is pointing when viewed from a particular vertex. impl ArrowKind for Multi {
pub enum Direction { const IS_MULTI: bool = true;
Incoming, fn dec(buf: &[u8]) -> Self {
Outgoing, Multi::decode(buf)
} }
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
/// The node this arrow points away from is the "author" of the node the arrow points to. (self.encode(), self.swap().encode())
#[derive(Encode, Decode)] }
pub struct AuthorOf; fn raw(&self) -> super::Raw {
super::Raw { multi: *self }
impl Arrow for AuthorOf { }
const SPACE: (Space, Space) = (Space("created-by/l"), Space("created-by/r")); }
}
impl ArrowKind for Basic {
/// The origin of this arrow has follow requested the target. const IS_MULTI: bool = false;
#[derive(Encode, Decode)] fn dec(buf: &[u8]) -> Self {
pub struct FollowRequested; Basic::decode(buf)
}
impl Arrow for FollowRequested { fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
const SPACE: (Space, Space) = (Space("pending-fr/l"), Space("pending-fr/r")); (self.encode(), self.reverse().encode())
} }
fn raw(&self) -> super::Raw {
/// The origin "follows" the target. super::Raw { basic: *self }
#[derive(Encode, Decode)] }
pub struct Follows; }
impl Arrow for Follows { /// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist
const SPACE: (Space, Space) = (Space("follows/l"), Space("follows/r")); /// between two vertices.
#[derive(Clone, Copy)]
pub struct Multi {
/// The node that this edge points away from.
pub origin: Key,
/// The node that this edge points towards.
pub target: Key,
/// The discriminator of this particular edge, which distinguishes it from all other edges that
/// connect `origin` and `target`, and indeed from every other edge or node in the graph.
pub identity: Key,
}
impl Multi {
/// Decode a multiarrow key from an origin-first encoded buffer. If the buffer is not correctly
/// oriented, the results will be wrong; the arrow will be oriented *away* from the target and
/// *at* the origin, instead of the other way around.
///
/// # Orientation
///
/// In this context, *correctly oriented* means that it is laid out in *origin-first* order,
/// like this (where `o`, `t` and `i` represent bytes):
///
/// ```text
/// oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii
/// |--------------| |--------------| |--------------|
/// origin target identity
/// ..16 16..32 32..
/// ```
///
/// In a *reverse oriented* buffer, the origin and target parts are swapped, so the target is
/// the prefix, followed by the origin, and then the identity. This is also called *target-first*
/// encoding in this documentation.
///
/// # Silent failure
///
/// There is no way to detect whether the ordering is correct from just the buffer, so the caller
/// must ensure that the order is correct. If you have a target-first encoded buffer, you can have
/// to swap the two keys before passing it into this function, or this function will give you an
/// edge that does not exist (since a multiedge can only point in one direction).
///
/// Safety-wise, this isn't an issue, so it does not warrant marking this function as `unsafe`.
///
/// # Panics
///
/// This function panics if `buf` is not exactly 48 bytes long.
pub fn decode(buf: &[u8]) -> Multi {
Multi {
origin: Key::from_slice(&buf[..16]),
target: Key::from_slice(&buf[16..32]),
identity: Key::from_slice(&buf[32..]),
}
}
/// Encode an arrow in *origin-first order*. See the docs of [`Multi::decode`] for an explanation
/// of the difference between origin-first encoding and target-first encoding.
pub fn encode(self) -> [u8; 48] {
let mut key = [0; 48];
key[..16].copy_from_slice(&self.origin.0);
key[16..32].copy_from_slice(&self.target.0);
key[32..].copy_from_slice(&self.identity.0);
key
}
/// Swap the origin and target of this arrow, while leaving the identity the same.
pub(super) fn swap(self) -> Multi {
Multi {
origin: self.target,
target: self.origin,
..self
}
}
}
/// A normal directed edge. Duplicates are not allowed.
///
/// This kind of arrow is useful for modeling predicates and simple relationships.
#[derive(Clone, Copy)]
pub struct Basic {
pub origin: Key,
pub target: Key,
}
impl Basic {
/// Get the inverse of this arrow (an arrow that connects the same two nodes, but pointing in the
/// other direction).
pub fn reverse(self) -> Basic {
Basic {
origin: self.target,
target: self.origin,
}
}
/// Encode `self` in origin-first order. See [`Multi::decode`] for docs on ordering.
pub fn encode(self) -> [u8; 32] {
self.origin.fuse(self.target)
}
/// Decode a basic edge from a buffer laid out origin-first. See [`Multi::decode`] for more information
/// about key encoding.
///
/// # Panics
///
/// Panics if `buf` is not exactly 32 bytes long.
pub fn decode(buf: &[u8]) -> Basic {
let (origin, target) = Key::split(buf);
Basic { origin, target }
}
}
} }
/// Derive [`Arrow`] for a struct.
///
/// This will generate the required [`Into`] and [`From`] impls, as well as an [`Arrow`](trait@Arrow) impl and
/// a [`DataType`] impl with the namespaces derived from the name of the struct. The macro works on structs with
/// specific fields, or newtypes of any arrow kind.
///
/// # Attributes
///
/// The `origin`, `target` and `identity` attributes are used on fields of type [`Key`], and they are used
/// to map the arrow's type to an [`ArrowKind`]. The `#[origin]` annotation isn't needed if the struct contains
/// a field named `origin`. Ditto with `target` and `identity`.
///
/// If there is no `identity` defined, the `ArrowKind` will be [`Basic`]. If an `identity` is defined, the kind
/// will be [`Multi`].
///
/// # Examples
///
/// Generates a [`Basic`] arrow called `my-arrow`.
///
/// ```
/// use store::{Key, Arrow, types::Schema};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// // This will fail to compile if the type doesn't implement `Arrow` correctly
/// Schema::new().has::<MyArrow>();
/// ```
///
/// Newtypes of either arrow kind are supported.
///
/// ```
/// use store::{Key, arrow::{Basic, Multi, Arrow}};
///
/// /// The origin has requested to follow the target.
/// ///
/// /// Note: there may be more than one follow request between any two actors.
/// #[derive(Arrow)]
/// struct FollowRequest(Multi);
///
/// /// A relation between two actors meaning that the origin follows the target.
/// #[derive(Arrow)]
/// struct Follows(Basic);
///
/// /// Users can follow each other.
/// struct User(Key);
///
/// impl User {
/// /// Make `self` follow `other`.
/// pub fn follows(self, other: User) -> Follows {
/// Follows(Basic { origin: self.0, target: other.0 })
/// }
/// }
/// ```
///
/// Generates a [`Multi`] arrow called `my-multi-arrow`, mapping the multiarrow's discriminator to the struct's
/// `unique` field.
///
/// ```
/// use store::{Key, Arrow};
///
/// #[derive(Arrow)]
/// struct MyMultiArrow {
/// pub origin: Key,
/// pub target: Key,
/// #[identity]
/// pub unique: Key,
/// }
/// ```
///
/// The macro automatically adds `From` and `Into` implementations:
///
/// ```
/// use store::{Key, Arrow, arrow::Basic};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// let edge: Basic = MyArrow { origin, target }.into();
///
/// assert_eq!(origin, edge.origin);
/// assert_eq!(target, edge.target);
/// ```
pub use r#macro::Arrow;

300
lib/store/src/internal.rs Normal file
View file

@ -0,0 +1,300 @@
//! Provides a nice hashmap-esque interface for manipulating entries in the store's backend.
use std::sync::Arc;
use rocksdb::{BoundColumnFamily, IteratorMode};
pub use self::cx::{Context, Query, Write};
use crate::{util::IterExt as _, Error, Result};
/// An internal interface to a specific keyspace that exposes basic hashmap-esque operations
/// on that keyspace, generic over whether the source of the data is a [`Transaction`] or a
/// [`Store`].
pub struct Keyspace<'db, C> {
pub(super) context: &'db C,
pub(super) cf: Arc<BoundColumnFamily<'db>>,
}
impl<'db, C> Keyspace<'db, C>
where
C: Query,
{
/// Fetch a value from the keyspace.
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<impl AsRef<[u8]> + 'db>> {
self.context.get_pinned(&self.cf, key)
}
/// Test whether a key exists.
pub fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.get(key).map(|r| r.is_some())
}
/// Execute a prefix scan.
pub fn scan(
&self,
prefix: impl AsRef<[u8]> + 'db,
) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
let t = prefix.as_ref().to_vec();
self.context
.prefix_iterator(&self.cf, prefix.as_ref())
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.starts_with(&t),
_ => true,
})
.map_err(Error::Internal)
}
/// List all pairs in the keyspace.
pub fn list(&self) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
self.context
.full_iterator(&self.cf, IteratorMode::Start)
.map_err(Error::Internal)
}
/// Execute a range scan
pub fn range<const N: usize>(
&self,
lower: [u8; N],
upper: [u8; N],
) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
// TODO: use a seek op to make this more efficient
self.context
.full_iterator(&self.cf, IteratorMode::Start)
.skip_while(move |r| match r {
Ok((ref k, _)) => k.as_ref() < &lower,
_ => false,
})
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.as_ref() < &upper,
_ => true,
})
.map_err(Error::Internal)
}
/// Join all the keys to their values in this keyspace.
///
/// This may be optimized compared to many random point lookups.
pub fn join(
&self,
keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.context
.multi_get(keys.into_iter().map(|x| (&self.cf, x)))
}
}
impl<C> Keyspace<'_, C>
where
C: Write,
{
/// Set the given `key` to the `value`, overwriting it if there was already a value there.
pub fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
self.context.put(&self.cf, key, val)
}
/// Drop the value if it exists.
pub fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.context.delete(&self.cf, key)
}
}
mod cx {
//! Contexts for doing reads, writes or both to the database.
//!
//! The traits in this module map abstract calls to their methods on the [rocksdb] objects.
use rocksdb::{
AsColumnFamilyRef, DBAccess, DBIteratorWithThreadMode, DBPinnableSlice, IteratorMode,
};
use super::Keyspace;
use crate::{util::IterExt as _, Backend, Batch, Error, Result, Store, Transaction, OK};
/// A context for executing database operations.
pub trait Context {
/// Open the keyspace identified by `cf`.
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self>
where
Self: Sized;
}
/// A context in which one can read from the data store.
///
/// Specifically, this maps calls to either the transaction or the store's internals without us having
/// to implement methods for *both* transactions and the store.
pub trait Query: Context {
type Backend: DBAccess;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>>;
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Self::Backend>;
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend>;
/// Optimized multi-point lookup.
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>>;
}
/// A context in which one can read from and modify the data store.
pub trait Write: Context {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()>;
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()>;
}
impl Context for Store {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self> {
let name = cf.as_ref();
let Some(cf) = self.inner.cf_handle(name) else {
panic!("unregistered keyspace {name}! is it in the schema?")
};
Keyspace { context: &self, cf }
}
}
impl Query for Store {
type Backend = Backend;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>> {
self.inner.get_pinned_cf(cf, key).map_err(Error::Internal)
}
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Backend> {
self.inner.prefix_iterator_cf(cf, prefix)
}
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.inner
.multi_get_cf(keys)
.into_iter()
.map_err(Error::Internal)
.collect()
}
}
impl Context for Transaction<'_> {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self> {
let name = cf.as_ref();
let Some(cf) = self.store.inner.cf_handle(name) else {
panic!("unregistered keyspace {name}! is it in the schema?")
};
Keyspace { context: &self, cf }
}
}
impl<'db> Query for Transaction<'db> {
type Backend = rocksdb::Transaction<'db, Backend>;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>> {
self.inner.get_pinned_cf(cf, key).map_err(Error::Internal)
}
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.prefix_iterator_cf(cf, prefix)
}
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.inner
.multi_get_cf(keys)
.into_iter()
.map_err(Error::Internal)
.collect()
}
}
impl Write for Transaction<'_> {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> {
self.inner.delete_cf(cf, key).map_err(Error::Internal)
}
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
self.inner.put_cf(cf, key, val).map_err(Error::Internal)
}
}
impl Context for Batch {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self>
where
Self: Sized,
{
let name = cf.as_ref();
let Some(cf) = self.store.inner.cf_handle(name) else {
panic!("unregistered keyspace {name}! is it in the schema?")
};
Keyspace { context: &self, cf }
}
}
impl Write for Batch {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> {
self.inner.borrow_mut().delete_cf(cf, key);
OK
}
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
self.inner.borrow_mut().put_cf(cf, key, val);
OK
}
}
}

View file

@ -3,7 +3,7 @@ use std::fmt::{Debug, Display};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use ulid::Ulid; use ulid::Ulid;
use crate::{Alias, Error, Result, Transaction}; use crate::arrow::{ArrowKind, Basic, Multi};
/// A unique identifier for vertices in the database. /// A unique identifier for vertices in the database.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
@ -31,7 +31,7 @@ impl Key {
Key(key) Key(key)
} }
pub fn timestamp(self) -> DateTime<Utc> { pub fn timestamp(self) -> DateTime<Utc> {
let ms = Ulid::from_bytes(self.0).timestamp_ms(); let ms = self.to_ulid().timestamp_ms();
DateTime::from_timestamp_millis(ms as i64).unwrap() DateTime::from_timestamp_millis(ms as i64).unwrap()
} }
/// Join two keys together. /// Join two keys together.
@ -46,6 +46,14 @@ impl Key {
let head = Key::from_slice(&buf[16..]); let head = Key::from_slice(&buf[16..]);
(tail, head) (tail, head)
} }
pub(crate) fn range(ts: DateTime<Utc>) -> ([u8; 16], [u8; 16]) {
let min = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MIN).to_bytes();
let max = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MAX).to_bytes();
(min, max)
}
fn to_ulid(self) -> Ulid {
Ulid::from_bytes(self.0)
}
} }
impl AsRef<[u8]> for Key { impl AsRef<[u8]> for Key {
@ -53,46 +61,3 @@ impl AsRef<[u8]> for Key {
&self.0 &self.0
} }
} }
/// Anything that can be used to reference a vertex, both "normal" [keys](Key)
/// and [aliases](Alias).
///
/// In general, using a key directly is going to be more efficient than using
/// an alias, because it incurs less lookups.
pub trait Keylike: Sized {
/// Translate the thing to a [`Key`].
///
/// This function should return [`Error::Missing`] if the key cannot be located.
fn translate(self, tx: &Transaction<'_>) -> Result<Key>;
/// Translate, and check whether the key is actually registered.
///
/// This function should return [`Error::Undefined`] if the key does not *really*
/// exist. It should return [`Error::Missing`] if the key can't be found.
fn checked_translate(self, tx: &Transaction<'_>) -> Result<Key> {
let key = self.translate(tx)?;
if !tx.is_registered(key)? {
Err(Error::Undefined { key })
} else {
Ok(key)
}
}
}
impl Keylike for Key {
fn translate(self, _: &Transaction<'_>) -> Result<Key> {
Ok(self)
}
}
impl<A> Keylike for A
where
A: Alias,
{
fn translate(self, tx: &Transaction<'_>) -> Result<Key> {
tx.lookup_alias(self)
}
}
/// A type tag identifying a vertex.
#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)]
pub struct Tag(pub u8);

View file

@ -1,78 +1,113 @@
#![feature(iterator_try_collect)] #![feature(iterator_try_collect, associated_type_defaults)]
//! The data store abstractions used by the ActivityPuppy project. //! Data persistence for the ActivityPuppy social media server built on top of [rocksdb].
//! //!
//! Persistence in a puppy server is handled by this component, which implements a directed graph //! # Overview
//! inspired datastore on top of the [rocksdb] key-value store.
//! //!
//! The workflow for manipulating stuff in the store is to open a [`Store`], and then to call //! The design of the data store's abstractions is heavily inspired by graph theory. The idea is to encourage
//! its [`transaction`](Store::transaction) method. This method takes a function that, given //! composition and loose coupling by segmenting all data associated with a node into [mixins][Mixin], and
//! a [`Transaction`], returns a result with some value. The `Transaction` object contains some //! modeling relations and predicates between nodes as [arrows][Arrow]. In additions, the key identifying a
//! useful CRUD methods. Returning an `Ok` commits the transaction and returning `Err` rolls it //! node can be [aliased][Alias] by a string newtype, which must be unique within the namespace of that alias.
//! back.
//! //!
//! This component is specialized to puppy's storage needs, and probably won't be much use unless //! The API is optimized for reducing boilerplate and legibility at the call site.
//! you're writing something that interfaces with puppy. //!
//! There are three interfaces to the store: the read-only [`Store`], the write-only [`Batch`] and the [`Transaction`],
//! which allows both reads and writes.
use std::{path::Path, sync::Arc}; use std::{cell::RefCell, path::Path, sync::Arc};
use derive_more::From; use derive_more::From;
use rocksdb::{MultiThreaded, Options, TransactionDBOptions}; use rocksdb::{Options, TransactionDBOptions, WriteBatchWithTransaction};
use types::Schema;
type Backend = rocksdb::TransactionDB<MultiThreaded>;
mod alias;
mod internal;
mod key; mod key;
mod transaction; mod mixin;
pub use key::{Key, Keylike, Tag};
pub use transaction::Transaction;
pub use {alias::Alias, arrow::Arrow, mixin::Mixin};
pub mod alias;
pub mod arrow;
pub mod mixin;
pub mod util;
/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
pub const OK: Result<()> = Ok(());
/// Master list of all column family names in use.
const SPACES: &[&'static str] = &[
"registry",
"username/l",
"username/r",
"follows/l",
"follows/r",
"profile",
"content",
"created-by/l",
"created-by/r",
"pending-fr/l",
"pending-fr/r",
"multi:id-map",
"multi:index/l",
"multi:index/r",
#[cfg(test)]
"test-arrow/l",
#[cfg(test)]
"test-arrow/r",
];
/// The handle to the data store.
///
/// This type can be cloned freely.
#[derive(Clone)]
pub struct Store {
inner: Arc<Backend>,
}
/// The name of the puppy data store inside the state directory. /// The name of the puppy data store inside the state directory.
const STORE_NAME: &str = "main-store"; const STORE_NAME: &str = "main-store";
/// Open a [`Store`]. Creates one if it doesn't exist yet at the state directory path.
pub fn open(state_dir: impl AsRef<Path>, schema: Schema) -> Result<Store> {
Store::open(state_dir, schema)
}
pub use {alias::Alias, arrow::Arrow, key::Key, mixin::Mixin};
pub mod arrow;
pub mod types;
pub mod util;
/// The main interface to the data persistence engine.
///
/// This type provides reading capabilities, but does not expose APIs for manipulating data in the store. For
/// that, you must [run][Store::run] a [`Transaction`] or [apply][Store::apply] a [`Batch`].
#[derive(Clone)]
pub struct Store {
// TODO: maybe switch to `OptimisticTransactionDB` because it has `batched_multi_get_cf`, which may be useful
// if we end up doing lots of point lookups. alternatively, maybe we don't need *transactions* altogether, and
// we can get away with write batches and snapshots. the main problem with transactions is that it doesn't let
// us do range deletes, which affects the efficiency of multiarrow deletion.
//
// a switch to write batches is feasible if we end up not doing reads and writes in the same transaction.
inner: Arc<Backend>,
}
/// Hosts APIs for manipulating the data store.
///
/// You can access these APIs from the body of the closure passed to [`Store::run`].
pub struct Transaction<'db> {
inner: rocksdb::Transaction<'db, Backend>,
store: &'db Store,
}
/// A set of writes that are to be executed atomically.
pub struct Batch {
inner: RefCell<WriteBatchWithTransaction<true>>,
store: Store,
}
impl Store { impl Store {
/// Open a data store in the given `state_dir`. /// Run a [transaction][Transaction].
/// ///
/// If the data store does not exist yet, it will be created. /// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not
pub fn open(state_dir: impl AsRef<Path>) -> Result<Store> { /// recorded. Changes made inside a transaction can be read from within that transaction before they are
/// committed.
///
/// If the closure passed to `run` returns an error, the transaction is rolled back, and otherwise the
/// changes are committed.
pub fn run<T, E>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T, E>) -> Result<T, E>
where
E: From<Error>,
{
let tx = Transaction {
inner: self.inner.transaction(),
store: &self,
};
let r = f(&tx);
if let Err(e) = if r.is_err() {
tx.inner.rollback()
} else {
tx.inner.commit()
} {
return Err(E::from(Error::Internal(e)));
}
r
}
/// Apply a batch of changes atomically.
pub fn apply(&self, batch: Batch) -> Result<()> {
self.inner.write(batch.inner.into_inner())?;
OK
}
/// Construct a [`Batch`].
pub fn batch(&self) -> Batch {
Batch {
inner: RefCell::new(WriteBatchWithTransaction::default()),
store: self.clone(),
}
}
/// Open the data store in `state_dir`, and create one if it doesn't exist yet.
pub fn open(state_dir: impl AsRef<Path>, schema: Schema) -> Result<Store> {
let mut db_opts = Options::default(); let mut db_opts = Options::default();
db_opts.create_if_missing(true); db_opts.create_if_missing(true);
db_opts.create_missing_column_families(true); db_opts.create_missing_column_families(true);
@ -81,57 +116,24 @@ impl Store {
&db_opts, &db_opts,
&tx_opts, &tx_opts,
state_dir.as_ref().join(STORE_NAME), state_dir.as_ref().join(STORE_NAME),
SPACES, schema.0,
)?); )?);
Ok(Store { inner }) Ok(Store { inner })
} }
/// Construct a temporary store, for testing. This store gets erased after `f` is done. /// Delete the main data store in `state_dir` if it exists.
pub fn with_tmp<T, E>(f: impl FnOnce(Store) -> Result<T, E>) -> Result<T, E>
where
E: From<Error>,
{
let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir");
f(Store::open(tmp_dir)?)
}
/// Delete the whole store.
///
/// **This deletes all data in the store**. Do not run this unless you want to delete all the state of the instance.
pub fn nuke(state_dir: impl AsRef<Path>) -> Result<()> { pub fn nuke(state_dir: impl AsRef<Path>) -> Result<()> {
Backend::destroy(&Options::default(), state_dir.as_ref().join(STORE_NAME)) Backend::destroy(&Options::default(), state_dir.as_ref().join(STORE_NAME))
.map_err(Error::from) .map_err(Error::Internal)
} }
/// Get the value of mixin `M` for `key`. /// Open a store that lives until `f` returns, for testing.
pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)> pub fn test<T>(schema: Schema, f: impl FnOnce(Store) -> T) -> T {
where let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir");
M: Mixin, f(Store::open(tmp_dir, schema).expect("failed to open temporary data store in {tmp_dir}"))
{
self.transaction(|tx| tx.lookup(key))
}
/// Get the key associated with a given [alias][Alias].
pub fn translate<A>(&self, s: impl ToString) -> Result<Key>
where
A: Alias,
{
self.transaction(|tx| tx.lookup_alias(A::from(s.to_string())))
}
/// Quickly test whether a particular [arrow][Arrow] exists.
pub fn exists<A>(&self, arrow: (Key, Key)) -> Result<bool>
where
A: Arrow,
{
self.transaction(|tx| tx.exists::<A>(arrow))
} }
} }
/// An isolated keyspace. /// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub const OK: Result<()> = Ok(());
pub struct Space(&'static str);
impl AsRef<str> for Space {
fn as_ref(&self) -> &str {
&self.0
}
}
/// Results from this component. /// Results from this component.
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -154,3 +156,5 @@ pub enum Error {
Encoding(bincode::error::EncodeError), Encoding(bincode::error::EncodeError),
Decoding(bincode::error::DecodeError), Decoding(bincode::error::DecodeError),
} }
type Backend = rocksdb::TransactionDB<rocksdb::MultiThreaded>;

View file

@ -1,35 +1,251 @@
//! Modules of information. use std::ops::RangeBounds;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use chrono::{DateTime, Utc};
use crate::Space; use super::{
types::{DataType, MixinSpec},
Batch, Store, Transaction,
};
use crate::{util::IterExt as _, Error, Key, Result};
/// A simple piece of data associated with a vertex. /// Mixins are the simplest pieces of data in the store.
pub trait Mixin: Encode + Decode { pub trait Mixin: DataType<Type = MixinSpec> + Encode + Decode {}
const SPACE: Space;
/// Derive a [`Mixin`] implementation.
///
/// In addition to deriving `Mixin`, you will need to derive or implement [`Encode`]
/// and [`Decode`].
pub use r#macro::Mixin;
impl Store {
/// Get the value!
pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
where
M: Mixin,
{
op::get_mixin(self, node)
}
/// Check if `node` has a mixin `M`.
pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
where
M: Mixin,
{
op::has_mixin::<M>(self, node)
}
/// Get all `M`s where the key's timestamp is within the `range`.
pub fn range<M>(
&self,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
op::get_range(self, range)
}
/// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
pub fn join_on<M>(
&self,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
op::join_on(self, iter)
}
} }
/// Information needed to render a social media profile. impl Transaction<'_> {
#[derive(Encode, Decode)] /// Apply an update function to the mixin `M` of `node`.
pub struct Profile { ///
pub post_count: usize, /// # Errors
pub account_name: String, ///
pub display_name: Option<String>, /// - [`Error::Missing`]: if `node` does not have a mixin of this type.
pub about_string: Option<String>, ///
pub about_fields: Vec<(String, String)>, /// [`Error::Missing`]: crate::Error::Missing
pub fn update<M>(&self, node: Key, update: impl FnOnce(M) -> M) -> Result<()>
where
M: Mixin,
{
op::update(self, node, update)
}
/// Get the mixin of the specified type associated with `node`.
pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
where
M: Mixin,
{
op::get_mixin(self, node)
}
/// Add a mixin to `node`.
///
/// # Errors
///
/// - [`Error::Conflict`]: if `node` already has a mixin of type `M`.
///
/// [`Error::Conflict`]: crate::Error::Missing
pub fn add_mixin<M>(&self, node: Key, mixin: M) -> Result<()>
where
M: Mixin,
{
if op::has_mixin::<M>(self, node)? {
return Err(Error::Conflict);
} else {
op::add_mixin::<M>(self, node, mixin)
}
}
/// Check whether `node` has an `M` defined for it.
pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
where
M: Mixin,
{
op::has_mixin::<M>(self, node)
}
/// Get all `M`s where the key's timestamp is within the `range`.
pub fn range<M>(
&self,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
op::get_range(self, range)
}
/// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
pub fn join_on<M, T>(
&self,
f: impl Fn(T) -> Key,
iter: impl IntoIterator<Item = Result<T>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
op::join_on(self, iter.into_iter().map_ok(f))
}
} }
impl Mixin for Profile { impl Batch {
const SPACE: Space = Space("profile"); /// Add a mixin to the `node`.
///
/// **Note**: unlike [`Transaction::add_mixin`], this will *not* return an error if the key already has a mixin
/// of this type. This *should* not cause inconsistency.
pub fn put_mixin<M>(&mut self, node: Key, mixin: M)
where
M: Mixin,
{
op::add_mixin(self, node, mixin).unwrap()
}
} }
/// Contents of a post. mod op {
#[derive(Encode, Decode)] use std::ops::{Bound, RangeBounds};
pub struct Content {
pub content: Option<String>,
pub summary: Option<String>,
}
impl Mixin for Content { use chrono::{DateTime, TimeDelta, Utc};
const SPACE: Space = Space("content"); use either::Either;
use super::Mixin;
use crate::{internal::*, util::IterExt as _, Error, Key, Result};
pub fn update<M>(
cx: &(impl Query + Write),
node: Key,
update: impl FnOnce(M) -> M,
) -> Result<()>
where
M: Mixin,
{
// TODO: implement in terms of a merge operator instead of separate query and write ops.
// this would let us remove the `Query` bound, which would in turn let us update from within
// a batch.
//
// See https://github.com/facebook/rocksdb/wiki/Merge-Operator
//
// It looks like rocksdb allows you to specify a merge operator per column family.[^1]
// This means we can construct our column families with a merge operator that knows how to encode and decode mixins.
//
// [^1]: https://github.com/facebook/rocksdb/blob/9d37408f9af15c7a1ae42f9b94d06b27d98a011a/include/rocksdb/options.h#L128
let tree = cx.open(M::SPEC.keyspace);
match tree.get(node.as_ref())? {
None => Err(Error::Missing),
Some(buf) => {
let new = decode(buf).map(update).and_then(encode)?;
tree.set(node, new)
}
}
}
pub fn get_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<Option<M>> {
cx.open(M::SPEC.keyspace).get(node)?.map(decode).transpose()
}
pub fn add_mixin<M: Mixin>(cx: &impl Write, node: Key, mixin: M) -> Result<()> {
cx.open(M::SPEC.keyspace).set(node, encode(mixin)?)
}
pub fn has_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<bool> {
cx.open(M::SPEC.keyspace).has(node)
}
pub fn get_range<M: Mixin>(
cx: &impl Query,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_ {
// TODO: Test this thoroughly
const MS: TimeDelta = TimeDelta::milliseconds(1);
let iter = match (range.start_bound(), range.end_bound()) {
(Bound::Unbounded, Bound::Unbounded) => Either::Left(cx.open(M::SPEC.keyspace).list()),
(min, max) => {
let lower = match min {
Bound::Unbounded => [u8::MIN; 16],
Bound::Included(inc) => Key::range(*inc).0,
Bound::Excluded(exc) => Key::range(*exc + MS).0,
};
let upper = match max {
Bound::Unbounded => [u8::MAX; 16],
Bound::Included(inc) => Key::range(*inc).1,
Bound::Excluded(exc) => Key::range(*exc - MS).1,
};
Either::Right(cx.open(M::SPEC.keyspace).range(lower, upper))
}
};
iter.bind_results(|(k, v)| {
let key = Key::from_slice(k.as_ref());
let val = decode(v)?;
Ok((key, val))
})
}
pub fn join_on<M>(
cx: &impl Query,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
let keys: Vec<Key> = iter.into_iter().try_collect()?;
cx.open(M::SPEC.keyspace)
.join(keys.iter())
.into_iter()
.zip(keys)
.map(|(opt, key)| {
let Some(buf) = opt? else {
return Ok((key, None));
};
let val = decode(buf)?;
Ok((key, Some(val)))
})
.try_collect()
}
pub(super) fn encode(data: impl bincode::Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}
pub(super) fn decode<T>(data: impl AsRef<[u8]>) -> Result<T>
where
T: bincode::Decode,
{
bincode::decode_from_slice(data.as_ref(), bincode::config::standard())
.map_err(Error::Decoding)
.map(|(v, _)| v)
}
} }

View file

@ -1,432 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use bincode::{Decode, Encode};
use rocksdb::{BoundColumnFamily, IteratorMode};
use crate::{
arrow::Direction, key::Tag, util::IterExt as _, Alias, Arrow, Backend, Error, Key, Keylike,
Mixin, Result, Store, OK, SPACES,
};
impl Store {
/// Initiate a transaction.
///
/// If the result is an error, the transaction is rolled back, and otherwise the transaction
/// is committed.
pub fn transaction<T>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T>) -> Result<T> {
// Load all the column family handles, because they can't be accessed through the
// `rocksdb::Transaction` struct, only the `TransactionDB`.
let spaces = SPACES
.into_iter()
.map(|name| (*name, self.inner.cf_handle(name).unwrap()))
.collect();
let tx = Transaction {
inner: self.inner.transaction(),
spaces,
};
let result = f(&tx);
if result.is_err() {
tx.inner.rollback()?;
} else {
tx.inner.commit()?;
}
result
}
/// Check whether a key exists in the registry,
pub fn is_registered(&self, key: Key) -> Result<bool> {
let cf = self
.inner
.cf_handle("registry")
.expect("failed to open registry");
self.inner
.get_pinned_cf(&cf, key)
.map(|opt| opt.is_some())
.map_err(Error::Internal)
}
}
/// A database transaction, in which either each action succeeds, or everything fails
/// together.
///
/// The transaction struct is the interface for quering and manipulating persisted content.
pub struct Transaction<'db> {
inner: rocksdb::Transaction<'db, Backend>,
spaces: HashMap<&'static str, Arc<BoundColumnFamily<'db>>>,
}
/// Methods for manipulating the registry.
///
/// Before you can manipulate a vertex, its needs to be registered.
impl Transaction<'_> {
/// Register a new vertex.
pub fn create_vertex(&self, key: Key, tag: Tag) -> Result<()> {
self.with("registry").set(key, [tag.0])
}
/// Delete a vertex from the registry.
pub fn delete_vertex(&self, key: Key) -> Result<()> {
// TODO: also make this delete all related data?
self.with("registry").del(key)
}
/// Check whether a vertex is registered in the database.
pub fn is_registered(&self, key: Key) -> Result<bool> {
self.with("registry").has(key)
}
}
/// Methods for manipulating mixins.
///
/// For each implementor of [`Mixin`], a vertex can have at most one record of that type
/// associated with it.
impl Transaction<'_> {
/// Query the store for a value associated with the vertex `key` identifies.
///
/// Using a [`Key`] is more efficient than using an alias.
pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)>
where
M: Mixin,
{
// Checked translate isn't needed, we'll complain if we can't find the data.
let canonicalized_key = key.translate(&self)?;
let raw = self.with(M::SPACE).get(canonicalized_key)?;
let value = decode(raw.as_ref())?;
Ok((canonicalized_key, value))
}
/// Associate a new mixin value with the key.
///
/// # Errors
///
/// - `Error::Conflict` if a mixin of this type is already associated with the vertex
/// - `Error::Undefined` if `key` is not in the registry.
pub fn insert<M>(&self, key: impl Keylike, data: M) -> Result<()>
where
M: Mixin,
{
let key = key.checked_translate(&self)?;
let data = encode(data)?;
let ns = self.with(M::SPACE);
// Check for conflicts. Fail if the key already exists, otherwise set the key
// to the given value.
if ns.has(key)? {
Err(Error::Conflict)
} else {
ns.set(key, data)
}
}
/// Apply an update function to the mixin identified by the key.
///
/// # Errors
///
/// - `Error::Undefined` if the `key` is not registered
/// - `Error::Missing` if `key` does not exist in the keyspace associated with `M`
pub fn update<M>(&self, key: impl Keylike, f: impl FnOnce(Key, M) -> Result<M>) -> Result<()>
where
M: Mixin,
{
let key = key.checked_translate(self)?;
let (key, old) = self.lookup::<M>(key)?;
let new = f(key, old).and_then(encode)?;
self.with(M::SPACE).set(key, new)
}
/// Remove the mixin from the vertex `key` refers to.
///
/// Doesn't complain if the value does not exist in the expected keyspace.
pub fn remove<M>(&self, key: impl Keylike) -> Result<Option<M>>
where
M: Mixin,
{
// Checked translate isn't needed because we don't care if the key is bogus.
let canonical_key = key.translate(self)?;
let ns = self.with(M::SPACE);
match ns.pop(canonical_key) {
Ok(Some(val)) => decode(&val).map(Some),
Ok(None) => Ok(None),
Err(err) => Err(err),
}
}
/// List all key-value pairs for mixins of type `M`.
pub fn list<M>(&self) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
self.with(M::SPACE).list().bind_results(|(k, v)| {
let v = decode(v.as_ref())?;
let k = Key::from_slice(k.as_ref());
Ok((k, v))
})
}
}
/// Methods for interacting with [aliases][Alias], which are unique alternate keys.
impl Transaction<'_> {
/// Look up the key that the given alias maps to.
///
/// If the key was deleted, but the alias wasn't properly cleaned up,
pub fn lookup_alias<A>(&self, alias: A) -> Result<Key>
where
A: Alias,
{
let (l, _) = A::SPACE;
let raw = self.with(l).get(alias.to_string())?;
Ok(Key::from_slice(raw.as_ref()))
}
/// Create a new alias of type `A` for the given [`Key`].
///
/// If the alias already exists, this function returns `Conflict`.
pub fn insert_alias<A>(&self, key: Key, alias: A) -> Result<()>
where
A: Alias,
{
let (l, r) = A::SPACE;
let alias = alias.to_string();
if self.with(l).has(&alias)? {
return Err(Error::Conflict);
}
self.with(l).set(&alias, key)?;
self.with(r).set(key, &alias)?;
OK
}
/// Delete the alias of type `A` that points to `key`.
pub fn remove_alias<A>(&self, key: Key) -> Result<()>
where
A: Alias,
{
let (l, r) = A::SPACE;
// First, pop the reverse mapping, which will give us the encoded
// key for the normal mapping. If it doesn't exist, don't delete
// the normal mapping.
if let Some(alias) = self.with(r).pop(key)? {
self.with(l).pop(alias)?;
}
OK
}
}
impl Transaction<'_> {
/// Find an arrow of type `A` with the given `tail` and `head`.
pub fn lookup_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<Option<A>>
where
A: Arrow,
{
let (l, _) = A::SPACE;
match self.with(l).get(tail.fuse(head)) {
Ok(raw) => decode(raw.as_ref()).map(Some),
Err(Error::Missing) => Ok(None),
Err(err) => Err(err),
}
}
/// Create a new arrow of type `A` and associate the label with it.
///
/// # Errors
///
/// - `Error::Undefined` if either key is not registered
pub fn insert_arrow<A>(&self, (tail, head): (Key, Key), label: A) -> Result<()>
where
A: Arrow,
{
if !self.is_registered(tail)? {
return Err(Error::Undefined { key: tail });
}
if !self.is_registered(head)? {
return Err(Error::Undefined { key: head });
}
let (l, r) = A::SPACE;
let label = encode(label)?;
self.with(l).set(tail.fuse(head), &label)?;
self.with(r).set(head.fuse(tail), &label)?;
OK
}
/// Delete an arrow from the data store.
pub fn remove_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<()>
where
A: Arrow,
{
self.with(A::SPACE.0).del(tail.fuse(head))?;
self.with(A::SPACE.1).del(head.fuse(tail))?;
OK
}
/// Check whether an arrow exists.
pub fn exists<A>(&self, (tail, head): (Key, Key)) -> Result<bool>
where
A: Arrow,
{
self.with(A::SPACE.0).has(tail.fuse(head))
}
/// Get all arrows of type `A` "pointing at" `key`.
pub fn list_incoming<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
self.list_arrows_where(Direction::Incoming, key)
}
/// Get all arrows of type `A` "pointing away from" `key`.
pub fn list_outgoing<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
self.list_arrows_where(Direction::Outgoing, key)
}
/// Get all arrows of type `A`.
pub fn list_arrows<A>(&self) -> impl Iterator<Item = Result<(Key, A, Key)>> + '_
where
A: Arrow,
{
self.with(A::SPACE.0).list().bind_results(|(k, v)| {
let (tail, head) = Key::split(k.as_ref());
decode(v.as_ref()).map(|label| (tail, label, head))
})
}
/// Select arrows with the given direction relative to the given key.
fn list_arrows_where<A>(
&self,
direction: Direction,
key: impl Keylike,
) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
// Keys in space 0 are arranged with the tail at the start, and the ones in space 1
// are arranged with the head at the start. This allows us to efficiently prefix scan
// regardless of the direction, at the cost of increased space usage.
let space = match direction {
Direction::Outgoing => A::SPACE.0,
Direction::Incoming => A::SPACE.1,
};
let key = key.translate(&self).unwrap();
#[cfg(test)]
eprintln!("scanning {} using prefix {key}", space.0);
self.with(space).scan(key).bind_results(|(k, v)| {
// Because we're prefix scanning on the first half of the key, we only want to
// get the second here.
let (_this, other) = Key::split(k.as_ref());
#[cfg(test)]
eprintln!(" found {_this}:{other}");
decode(v.as_ref()).map(|label| (other, label))
})
}
pub(crate) fn quiver(&self, tag: Tag) -> Quiver<'_> {
Quiver { tag, tx: &self }
}
}
impl Transaction<'_> {
/// Use a keyspace.
fn with(&self, name: impl AsRef<str>) -> Keyspace<'_> {
Keyspace {
cf: self.spaces[name.as_ref()].clone(),
tx: &self,
}
}
}
/// Provides the basic API for a keyspace/column family.
struct Keyspace<'db> {
tx: &'db Transaction<'db>,
cf: Arc<BoundColumnFamily<'db>>,
}
impl<'db> Keyspace<'db> {
/// Retrieve a value from the database. Returns `Missing` if the key does not exist.
fn get(&self, key: impl AsRef<[u8]>) -> Result<impl AsRef<[u8]> + 'db> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.and_then(|opt| opt.ok_or(Error::Missing))
}
/// Set the value at `key` to `val`.
fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
self.tx
.inner
.put_cf(&self.cf, key, val)
.map_err(Error::Internal)
}
/// Delete the key-value pair identified by `key`.
fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.tx.inner.delete_cf(&self.cf, &key)?;
OK
}
/// Remove the key and associated value from the keyspace, and return its previous value.
fn pop(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
let old = self.tx.inner.get_for_update_cf(&self.cf, &key, true)?;
self.del(key)?;
Ok(old)
}
/// Check whether the key exists in the keyspace.
fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.map(|opt| opt.is_some())
}
/// Execute a prefix scan.
fn scan(
&self,
prefix: impl AsRef<[u8]> + 'db,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
let t = prefix.as_ref().to_vec();
self.tx
.inner
.prefix_iterator_cf(&self.cf, prefix.as_ref())
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.starts_with(&t),
_ => true,
})
.map_err(Error::Internal)
}
/// Show all items in the entire keyspace.
fn list(
&self,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
self.tx
.inner
.full_iterator_cf(&self.cf, IteratorMode::Start)
.map_err(Error::Internal)
}
}
/// The quiver allows one to manipulate all parallel edges tagged with a particular type.
pub struct Quiver<'db> {
tx: &'db Transaction<'db>,
tag: Tag,
}
impl<'db> Quiver<'db> {
pub fn insert(&self, origin: Key, target: Key, identity: Key) -> Result<()> {
let fused = origin.fuse(target);
self.tx.with("multi:id-map").set(identity, fused)?;
let mut triple = [0; 48];
triple[..32].copy_from_slice(&fused);
triple[32..].copy_from_slice(identity.as_ref());
self.tx.with("multi:index/l").set(triple, b"")?;
triple[..32].rotate_left(16);
self.tx.with("multi:index/r").set(triple, b"")?;
OK
}
pub fn list_incoming(&self, target: Key) -> impl Iterator<Item = Result<(Key, Key)>> + 'db {
self.tx
.with("multi:index/r")
.scan(target)
.map_ok(|(k, _)| Key::split(&k.as_ref()[16..]))
}
}
fn encode(data: impl Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}
fn decode<T>(data: &[u8]) -> Result<T>
where
T: Decode,
{
bincode::decode_from_slice(data, bincode::config::standard())
.map_err(Error::Decoding)
.map(|(v, _)| v)
}
#[cfg(test)]
mod tests;

View file

@ -1,256 +0,0 @@
use super::*;
use crate::Space;
#[derive(Encode, Decode)]
struct TestArrow;
impl Arrow for TestArrow {
const SPACE: (Space, Space) = (Space("test-arrow/l"), Space("test-arrow/r"));
}
const TEST_TAG: Tag = Tag(69);
macro_rules! keygen {
{ $($name:ident)* } => {
$(
let $name = Key::gen();
eprintln!(concat!(stringify!($name), "={}"), $name);
)*
}
}
fn with_test_arrow(f: impl Fn(Key, Key, &Transaction<'_>, usize) -> Result<()>) -> Result<()> {
Store::with_tmp(|db| {
// Run these tests 128 times because misuse of prefix iterator may cause weird,
// obscure bugs :3
//
// Also, because we don't wipe the store between test runs, we have more chances
// to discover weird bugs that we wouldn't catch if there was only a single run.
Ok(for n in 0..128 {
eprintln!("--- run {n} ---");
db.transaction(|tx| {
keygen!(target origin);
tx.create_vertex(target, TEST_TAG)?;
tx.create_vertex(origin, TEST_TAG)?;
tx.insert_arrow((origin, target), TestArrow)?;
let l: Vec<String> = tx
.with("test-arrow/l")
.list()
.map_ok(|(k, _)| Key::split(k.as_ref()))
.map_ok(|(a, b)| format!("({a}, {b})"))
.try_collect()?;
eprintln!("test-arrow/l = {l:#?}");
let r: Vec<String> = tx
.with("test-arrow/r")
.list()
.map_ok(|(k, _)| Key::split(k.as_ref()))
.map_ok(|(a, b)| format!("({a}, {b})"))
.try_collect()?;
eprintln!("test-arrow/r = {r:#?}");
f(origin, target, &tx, n)
})?;
eprintln!("--- end run {n} ---");
})
})
}
#[test]
fn target_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?;
eprintln!("target.incoming = {ti:#?}");
assert!(ti.contains(&origin), "origin ∈ target.incoming");
assert!(!ti.contains(&target), "target ∉ target.incoming");
OK
})
}
#[test]
fn target_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let to: Vec<_> = tx.list_outgoing::<TestArrow>(target).keys().try_collect()?;
eprintln!("target.outgoing = {to:#?}");
assert!(!to.contains(&target), "target ∉ target.outgoing");
assert!(!to.contains(&origin), "origin ∉ target.outgoing");
OK
})
}
#[test]
fn origin_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oi: Vec<_> = tx.list_incoming::<TestArrow>(origin).keys().try_collect()?;
eprintln!("origin.incoming = {oi:#?}");
assert!(!oi.contains(&origin), "origin ∉ origin.incoming");
assert!(!oi.contains(&target), "target ∉ origin.incoming");
OK
})
}
#[test]
fn origin_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?;
eprintln!("origin.outgoing = {oo:#?}");
assert!(oo.contains(&target), "target ∈ origin.outgoing");
assert!(!oo.contains(&origin), "origin ∉ origin.outgoing");
OK
})
}
#[test]
fn fanout() -> Result<()> {
let targets: [Key; 128] = std::array::from_fn(|_| Key::gen());
let origin = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.create_vertex(origin, TEST_TAG)?;
for t in targets {
tx.create_vertex(t, TEST_TAG)?;
tx.insert_arrow((origin, t), TestArrow)?;
}
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?;
for t in targets {
assert!(oo.contains(&t), "∀ t ∈ targets: t ∈ origin.outgoing");
let ti: Vec<_> = tx.list_incoming::<TestArrow>(t).keys().try_collect()?;
assert!(
ti == vec! {origin},
"∀ t ∈ targets: t.incoming = {{origin}}"
);
}
OK
})
})
}
#[test]
fn fanin() -> Result<()> {
let origins: [Key; 128] = std::array::from_fn(|_| Key::gen());
let target = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.create_vertex(target, TEST_TAG)?;
for o in origins {
tx.create_vertex(o, TEST_TAG)?;
tx.insert_arrow((o, target), TestArrow)?;
}
let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?;
for o in origins {
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(o).keys().try_collect()?;
assert!(ti.contains(&o), "∀ o ∈ origins: o ∈ target.incoming");
assert!(
oo == vec! {target},
"∀ o ∈ origins: o.outgoing = {{target}}"
);
}
OK
})
})
}
#[test]
fn distinct_many_to_many() -> Result<()> {
let origins: [Key; 32] = std::array::from_fn(|_| Key::gen());
let targets: [Key; 32] = std::array::from_fn(|_| Key::gen());
Store::with_tmp(|db| {
db.transaction(|tx| {
for t in targets {
tx.create_vertex(t, TEST_TAG)?;
}
for o in origins {
tx.create_vertex(o, TEST_TAG)?;
for t in targets {
tx.insert_arrow((o, t), TestArrow)?;
}
}
let ti: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_incoming::<TestArrow>(t)
.keys()
.try_collect()
.map(|v: Vec<_>| (t, v))
})
.collect::<Result<_>>()?;
// For each origin point, there must be a target that has it as "incoming".
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().any(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∃ t ∈ targets: o ∈ t.incoming"
);
// Each target has each origin as incoming.
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().all(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∀ t ∈ targets: o ∈ t.incoming"
);
let to: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_outgoing::<TestArrow>(t)
.keys()
.try_collect()
.map(|v: Vec<_>| (t, v))
})
.collect::<Result<_>>()?;
// Our arrows point only from origins to targets, and there's a bug if there
// exists a target such that its outgoing set is non-empty.
assert!(
!targets.into_iter().any(|t| !to[&t].is_empty()),
"∄ t ∈ targets: t.outgoing ≠ ∅"
);
let oo: HashMap<Key, Vec<Key>> = origins
.into_iter()
.map(|o| {
tx.list_outgoing::<TestArrow>(o)
.keys()
.try_collect()
.map(|v: Vec<_>| (o, v))
})
.collect::<Result<_>>()?;
// Each origin has each target as outgoing.
assert!(
origins
.into_iter()
.all(|o| targets.into_iter().all(|t| oo[&o].contains(&t))),
"∀ o ∈ origins: ∀ t ∈ targets: t ∈ o.outgoing"
);
OK
})
})
}

178
lib/store/src/types.rs Normal file
View file

@ -0,0 +1,178 @@
//! Defining a [`Schema`].
//!
//! There is a lot of complicated machinery here to make it so that you have to write very little code to
//! define new types. Basically, if you want to define a thing to store, you need to implement the trait
//! for it (e.g. [`Arrow`]), and also implement [`DataType`], where you create a specification describing which
//! namespaces store records of that type.
//!
//! Then, when you construct a new `Store`, you need to pass in a [`Schema`], or the database won't be able
//! to operate on the types.
//!
//! [`Arrow`]: super::Arrow
use std::collections::HashSet;
use derive_more::Display;
/// The namespace where all vertices must be registered.
pub(crate) const NODE_HEADERS: Keyspace = Keyspace("header:node");
/// The namespace where multiedge identities are mapped to endpoints.
pub(crate) const MULTIEDGE_HEADERS: Keyspace = Keyspace("header:multiedge");
/// A specification of all user-defined namespaces.
///
/// # Example
///
/// The below example correctly defines a [basic arrow] and demonstrates its use by inserting one and then
/// testing whether it exists. If the appropriate keyspaces are not known to the store, this will panic.
///
/// ```rust
/// use store::{ arrow::Arrow, types::Schema, Store, Key, OK };
///
/// // Each kind of value has a derive macro.
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// fn main () -> store::Result<()> {
/// // Here, we make sure that the namespaces used for `MyArrow` are known.
/// let schema = Schema::new()
/// .has::<MyArrow>();
///
/// let result = Store::test(schema, |db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// let mut changes = db.batch();
/// changes.create(MyArrow { origin, target });
/// db.apply(changes)?;
///
/// db.exists::<MyArrow>(origin, target)
/// })?;
///
/// assert!(result);
/// OK
/// }
/// ```
///
/// [basic arrow]: crate::arrow::Basic
pub struct Schema(pub(crate) HashSet<Keyspace>);
impl Schema {
/// Construct a new empty schema.
pub fn new() -> Schema {
Schema(HashSet::from_iter([NODE_HEADERS, MULTIEDGE_HEADERS]))
}
/// Add the component to the schema.
pub fn has<C>(mut self) -> Schema
where
C: DataType,
{
self.add(C::SPEC);
self
}
/// Add a spec to the schema by mutable reference.
pub fn add(&mut self, spec: impl TypeSpec) -> &mut Schema {
spec.register(&mut self.0);
self
}
}
/// The name of a keyspace.
///
/// Specifically, this is the name of a RocksDB column family.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
pub struct Keyspace(pub &'static str);
impl AsRef<str> for Keyspace {
fn as_ref(&self) -> &str {
self.0
}
}
/// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a
/// [mixin](MixinSpec).
///
/// All namespaces must be unique, and added to the [`Schema`].
pub trait DataType {
type Type: TypeSpec;
const SPEC: Self::Type;
}
/// The specification for an [`Arrow`](crate::Arrow).
///
/// The listed namespaces must be unique among all other namespaces.
#[derive(Clone, Copy)]
pub struct ArrowSpec {
/// The keyspace where edge keys are ordered `(origin, target)`.
pub by_origin: Keyspace,
/// The keyspace where edge keys are ordered `(target, origin)`.
pub by_target: Keyspace,
}
/// A specification for the namespaces needed to store an [`Alias`][crate::Alias].
#[derive(Clone, Copy)]
pub struct AliasSpec {
/// The alias -> key mapping table.
pub keyspace: Keyspace,
/// The key -> alias mapping table.
pub reversed: Keyspace,
}
/// Where do we store a mixin?
#[derive(Clone, Copy)]
pub struct MixinSpec {
/// The key -> mixin mapping table.
pub keyspace: Keyspace,
}
/// Describes how to add a [`DataType`] to a [`Schema`].
pub trait TypeSpec {
/// Register the namespaces.
fn register(&self, set: &mut HashSet<Keyspace>);
}
// TODO: better error messages.
impl TypeSpec for ArrowSpec {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.by_origin) {
panic! {
"Duplicate found while inserting Arrow::BY_ORIGIN: {}",
self.by_origin
}
}
if !set.insert(self.by_target) {
panic! {
"Duplicate found while inserting Arrow::BY_TARGET: {}",
self.by_target
}
}
}
}
impl TypeSpec for AliasSpec {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Alias::KEYSPACE: {}",
self.keyspace
}
}
if !set.insert(self.reversed) {
panic! {
"Duplicate found while inserting Alias::REVERSED: {}",
self.reversed
}
}
}
}
impl TypeSpec for MixinSpec {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Mixin::KEYSPACE: {}",
self.keyspace
}
}
}
}

View file

@ -46,6 +46,28 @@ pub trait IterExt: Iterator + Sized {
{ {
self.next().ok_or(e)? self.next().ok_or(e)?
} }
/// `filter_map` meets `and_then`.
fn filter_bind_results<'a, I, O, E>(
self,
mut f: impl FnMut(I) -> Result<Option<O>, E> + 'a,
) -> impl Iterator<Item = Result<O, E>> + 'a
where
Self: Iterator<Item = Result<I, E>> + 'a,
{
self.filter_map(move |r| r.and_then(|x| f(x)).transpose())
}
} }
impl<I> IterExt for I where I: Iterator {} impl<I> IterExt for I where I: Iterator {}
/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next
/// argument is the second tuple element.
pub fn key<K, V>(key: K) -> impl FnOnce(V) -> (K, V) {
move |val| (key, val)
}
/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next
/// argument is the second tuple element.
pub fn val<K, V>(val: V) -> impl FnOnce(K) -> (K, V) {
move |key| (key, val)
}

View file

@ -2,3 +2,5 @@ unstable_features = true
overflow_delimited_expr = true overflow_delimited_expr = true
group_imports = "StdExternalCrate" group_imports = "StdExternalCrate"
use_field_init_shorthand = true use_field_init_shorthand = true
reorder_modules = false
struct_lit_width = 30