From 29f90ad918781fd59edeab95ea7d143ddf3be427 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Tue, 23 Apr 2024 00:52:39 +0200 Subject: [PATCH] Store api overhaul --- Cargo.lock | 14 + Cargo.toml | 1 + bin/pupctl/src/main.rs | 51 ++- lib/macro/Cargo.toml | 13 + lib/macro/src/arrow.rs | 142 +++++++ lib/macro/src/lib.rs | 73 ++++ lib/puppy/Cargo.toml | 2 + lib/puppy/src/lib.rs | 238 +++++++----- lib/store/Cargo.toml | 2 + lib/store/src/alias.rs | 120 +++++- lib/store/src/arrow.rs | 600 ++++++++++++++++++++++++++--- lib/store/src/internal.rs | 297 ++++++++++++++ lib/store/src/key.rs | 55 +-- lib/store/src/lib.rs | 210 +++++----- lib/store/src/mixin.rs | 261 +++++++++++-- lib/store/src/transaction.rs | 432 --------------------- lib/store/src/transaction/tests.rs | 256 ------------ lib/store/src/types.rs | 178 +++++++++ lib/store/src/util.rs | 22 ++ 19 files changed, 1918 insertions(+), 1049 deletions(-) create mode 100644 lib/macro/Cargo.toml create mode 100644 lib/macro/src/arrow.rs create mode 100644 lib/macro/src/lib.rs create mode 100644 lib/store/src/internal.rs delete mode 100644 lib/store/src/transaction.rs delete mode 100644 lib/store/src/transaction/tests.rs create mode 100644 lib/store/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index e145567..89d6af6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,6 +885,16 @@ dependencies = [ "libc", ] +[[package]] +name = "macro" +version = "0.0.0" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1130,6 +1140,8 @@ dependencies = [ name = "puppy" version = "0.0.0" dependencies = [ + "bincode", + "chrono", "fetch", "store", ] @@ -1490,6 +1502,8 @@ dependencies = [ "bincode", "chrono", "derive_more", + "either", + "macro", "rocksdb", "tempfile", "ulid", diff --git a/Cargo.toml b/Cargo.toml index eb8deed..3e1c446 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "lib/puppy", "lib/store", "lib/fetch", + "lib/macro", "bin/server", "bin/pupctl", ] diff --git a/bin/pupctl/src/main.rs b/bin/pupctl/src/main.rs index d97dc1a..95c19c1 100644 --- a/bin/pupctl/src/main.rs +++ b/bin/pupctl/src/main.rs @@ -1,31 +1,26 @@ use puppy::{ - store::{ - self, - alias::Username, - arrow::{FollowRequested, Follows}, - mixin::Profile, - Error, - }, + model::{schema, Bite, FollowRequest, Follows, Profile, Username}, + store::{self, Error}, tl::Post, - Bite, Key, Store, + Key, Store, }; fn main() -> store::Result<()> { // Store::nuke(".state")?; - let db = Store::open(".state")?; + let db = Store::open(".state", schema())?; println!("creating actors"); let riley = get_or_create_actor(&db, "riley")?; let linen = get_or_create_actor(&db, "linen")?; - if false { + if true { println!("creating posts"); puppy::create_post(&db, riley, "@linen <3")?; puppy::create_post(&db, linen, "@riley <3")?; } - if false { + if true { println!("making riley follow linen"); - if !db.exists::((riley, linen))? { + if !db.exists::(riley, linen)? { println!("follow relation does not exist yet"); - if !db.exists::((riley, linen))? { + if !db.exists::(riley, linen)? { println!("no pending follow request; creating"); puppy::fr::create(&db, riley, linen)?; } else { @@ -36,44 +31,46 @@ fn main() -> store::Result<()> { println!("riley already follows linen"); } } - println!("Posts on the instance:"); + println!("\nPosts on the instance:"); for Post { id, content, author, } in puppy::tl::fetch_all(&db)? { - let (_, Profile { account_name, .. }) = db.lookup(author)?; + let Profile { account_name, .. 
} = db.get_mixin(author)?.unwrap(); let content = content.content.unwrap(); println!("- {id} by @{account_name} ({author}):\n{content}",) } - println!("Linen's followers:"); + println!("\nLinen's followers:"); for id in puppy::fr::followers_of(&db, linen)? { - let (_, Profile { account_name, .. }) = db.lookup(id)?; + let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); println!("- @{account_name} ({id})"); } - println!("Riley's following:"); + println!("\nRiley's following:"); for id in puppy::fr::following_of(&db, riley)? { - let (_, Profile { account_name, .. }) = db.lookup(id)?; + let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); println!("- @{account_name} ({id})"); } - println!("Biting riley"); - puppy::bite_actor(&db, linen, riley).unwrap(); - for Bite { id, biter, .. } in puppy::bites_on(&db, riley).unwrap() { - let (_, Profile { account_name, .. }) = db.lookup(biter).unwrap(); - println!("riley was bitten by @{account_name} at {}", id.timestamp()); + if false { + println!("Biting riley"); + puppy::bite_actor(&db, linen, riley).unwrap(); + for Bite { id, biter, .. } in puppy::bites_on(&db, riley).unwrap() { + let Profile { account_name, .. } = db.get_mixin(biter)?.unwrap(); + println!("riley was bitten by @{account_name} at {}", id.timestamp()); + } } store::OK } fn get_or_create_actor(db: &Store, username: &str) -> Result { - let user = db.translate::(username); + let user = db.lookup(Username(username.to_string())); match user { - Ok(key) => { + Ok(Some(key)) => { println!("found '{username}' ({key})"); Ok(key) } - Err(Error::Missing) => { + Ok(None) => { println!("'{username}' doesn't exist yet, creating"); let r = puppy::create_actor(&db, username); if let Ok(ref key) = r { diff --git a/lib/macro/Cargo.toml b/lib/macro/Cargo.toml new file mode 100644 index 0000000..35fdf09 --- /dev/null +++ b/lib/macro/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "macro" +edition = "2021" + +[lib] +path = "src/lib.rs" +proc-macro = true + +[dependencies] +syn = { version = '2', features = ['full'] } +quote = '*' +proc-macro2 = '*' +heck = '*' diff --git a/lib/macro/src/arrow.rs b/lib/macro/src/arrow.rs new file mode 100644 index 0000000..34a8aad --- /dev/null +++ b/lib/macro/src/arrow.rs @@ -0,0 +1,142 @@ +use heck::AsKebabCase; +use proc_macro::TokenStream; +use quote::{quote, ToTokens}; +use syn::{parse_macro_input, Data, DeriveInput, Field, Ident}; + +pub fn arrow(item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as DeriveInput); + let Data::Struct(structure) = input.data else { + panic!("Only structs are supported as arrows") + }; + match structure.fields { + syn::Fields::Named(fields) => from_named(&input.ident, fields), + syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => { + let first = f.unnamed.first().unwrap(); + from_newtype(&input.ident, first) + } + _ => panic!( + "Only newtype structs and structs with named fields can have a derived arrow impl" + ), + } +} + +fn from_named(name: &Ident, fields: syn::FieldsNamed) -> TokenStream { + let (origin, target, identity) = extract_idents(fields); + match identity { + Some(id) => make_multi_arrow(name, origin, target, id), + None => make_basic_arrow(name, origin, target), + } +} + +fn make_basic_arrow(name: &Ident, origin: Ident, target: Ident) -> TokenStream { + let spec = gen_spec(name); + TokenStream::from(quote! 
{ + #spec + impl store::arrow::Arrow for #name {} + impl From for #name { + fn from(v: store::arrow::Basic) -> #name { + #name { + #origin: v.origin, + #target: v.target, + } + } + } + impl From<#name> for store::arrow::Basic { + fn from(v: #name) -> store::arrow::Basic { + store::arrow::Basic { + origin: v.#origin, + target: v.#target, + } + } + } + }) +} + +fn make_multi_arrow(name: &Ident, origin: Ident, target: Ident, id: Ident) -> TokenStream { + let spec = gen_spec(name); + TokenStream::from(quote! { + #spec + impl store::arrow::Arrow for #name { + type Kind = store::arrow::Multi; + } + impl From for #name { + fn from(v: store::arrow::Multi) -> #name { + #name { + #id: v.identity, + #origin: v.origin, + #target: v.target, + } + } + } + impl From<#name> for store::arrow::Multi { + fn from(v: #name) -> store::arrow::Multi { + store::arrow::Multi { + identity: v.#id, + origin: v.#origin, + target: v.#target, + } + } + } + }) +} + +fn extract_idents(fields: syn::FieldsNamed) -> (Ident, Ident, Option) { + let origin = extract_ident("origin", &fields).unwrap(); + let target = extract_ident("target", &fields).unwrap(); + let id = extract_ident("identity", &fields); + (origin, target, id) +} + +fn extract_ident(name: &str, fields: &syn::FieldsNamed) -> Option { + // Prefer marked fields and default to correctly named fields. + fields + .named + .iter() + .find(|field| { + field + .attrs + .iter() + .filter_map(|attr| attr.meta.path().get_ident()) + .any(|id| id == name) + }) + .and_then(|f| f.ident.clone()) + .or_else(|| { + fields + .named + .iter() + .filter_map(|f| f.ident.clone()) + .find(|id| id == name) + }) +} + +fn gen_spec(name: &Ident) -> impl ToTokens { + let prefix = AsKebabCase(name.to_string()); + let by_origin = format!("{prefix}/by-origin"); + let by_target = format!("{prefix}/by-target"); + quote! { + impl store::types::Value for #name { + type Type = store::types::ArrowSpec; + const SPEC: Self::Type = store::types::ArrowSpec { + by_origin: store::types::Keyspace(#by_origin), + by_target: store::types::Keyspace(#by_target), + }; + } + } +} + +fn from_newtype(name: &Ident, field: &Field) -> TokenStream { + let spec = gen_spec(name); + let typ = &field.ty; + TokenStream::from(quote! 
{ + #spec + impl store::arrow::Arrow for #name { + type Kind = #typ; + } + impl From<#typ> for #name { + fn from(v: #typ) -> #name { #name(v) } + } + impl From<#name> for #typ { + fn from(v: #name) -> #typ { v.0 } + } + }) +} diff --git a/lib/macro/src/lib.rs b/lib/macro/src/lib.rs new file mode 100644 index 0000000..6bf0643 --- /dev/null +++ b/lib/macro/src/lib.rs @@ -0,0 +1,73 @@ +use proc_macro::TokenStream; + +mod arrow; + +#[proc_macro_derive(Arrow, attributes(origin, target, identity))] +pub fn arrow(item: TokenStream) -> TokenStream { + arrow::arrow(item) +} + +#[proc_macro_derive(Alias)] +pub fn alias(item: TokenStream) -> TokenStream { + let input = syn::parse_macro_input!(item as syn::DeriveInput); + let syn::Data::Struct(structure) = input.data else { + panic!("Only structs are supported as aliases") + }; + match structure.fields { + syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => { + let first = f.unnamed.first().unwrap(); + make_alias_impl(&input.ident, first) + } + _ => panic!("Only string newtype structs are allowed as aliases"), + } +} + +fn make_alias_impl(name: &syn::Ident, field: &syn::Field) -> TokenStream { + let typ = &field.ty; + let prefix = heck::AsKebabCase(name.to_string()); + let keyspace = format!("{prefix}/keyspace"); + let reversed = format!("{prefix}/reversed"); + let spec = quote::quote! { + impl store::types::Value for #name { + type Type = store::types::AliasSpec; + const SPEC: Self::Type = store::types::AliasSpec { + keyspace: store::types::Keyspace(#keyspace), + reversed: store::types::Keyspace(#reversed), + }; + } + }; + + TokenStream::from(quote::quote! { + #spec + impl store::Alias for #name {} + impl AsRef for #name { + fn as_ref(&self) -> &str { self.0.as_ref() } + } + impl From<#typ> for #name { + fn from(v: #typ) -> #name { #name(v) } + } + }) +} + +#[proc_macro_derive(Mixin)] +pub fn mixin(item: TokenStream) -> TokenStream { + let input = syn::parse_macro_input!(item as syn::DeriveInput); + + let name = input.ident; + let prefix = heck::AsKebabCase(name.to_string()); + let keyspace = format!("{prefix}/main"); + + let spec = quote::quote! { + impl store::types::Value for #name { + type Type = store::types::MixinSpec; + const SPEC: Self::Type = store::types::MixinSpec { + keyspace: store::types::Keyspace(#keyspace), + }; + } + }; + + TokenStream::from(quote::quote! { + #spec + impl store::Mixin for #name {} + }) +} diff --git a/lib/puppy/Cargo.toml b/lib/puppy/Cargo.toml index 613c157..da89aae 100644 --- a/lib/puppy/Cargo.toml +++ b/lib/puppy/Cargo.toml @@ -8,3 +8,5 @@ path = "src/lib.rs" [dependencies] store = { path = "../store" } fetch = { path = "../fetch" } +bincode = "2.0.0-rc.3" +chrono = "*" diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index 3075cd8..e5689e0 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -1,46 +1,104 @@ -#![feature(iterator_try_collect)] +#![feature(iterator_try_collect, try_blocks)] +use model::{AuthorOf, Bite, Content, Profile, Username}; +use store::util::IterExt as _; pub use store::{self, Key, Store}; -use store::{ - alias::Username, - arrow::{self, multi::MultiArrow, AuthorOf}, - mixin::{Content, Profile}, - util::IterExt, - Keylike, Tag, -}; -mod tags { - //! Type tags for vertices. 
+pub mod model {
+    use bincode::{Decode, Encode};
+    use store::{types::Schema, Alias, Arrow, Key, Mixin};
 
-    use store::Tag;
+    #[derive(Mixin, Encode, Decode)]
+    pub struct Profile {
+        pub post_count: usize,
+        pub account_name: String,
+        pub display_name: Option<String>,
+        pub about_string: Option<String>,
+        pub about_fields: Vec<(String, String)>,
+    }
 
-    pub const ACTOR: Tag = Tag(0);
-    pub const POST: Tag = Tag(1);
-    pub const BITE: Tag = Tag(2);
+    #[derive(Mixin, Encode, Decode)]
+    pub struct Content {
+        pub content: Option<String>,
+        pub summary: Option<String>,
+    }
+
+    #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
+    pub struct AuthorOf {
+        #[origin]
+        pub author: Key,
+        #[target]
+        pub object: Key,
+    }
+
+    #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
+    pub struct Follows {
+        #[origin]
+        pub follower: Key,
+        #[target]
+        pub followed: Key,
+    }
+
+    #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
+    pub struct Bite {
+        #[identity]
+        pub id: Key,
+        #[origin]
+        pub biter: Key,
+        #[target]
+        pub victim: Key,
+    }
+
+    #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
+    pub struct FollowRequest {
+        #[identity]
+        pub id: Key,
+        pub origin: Key,
+        pub target: Key,
+    }
+
+    #[derive(Alias)]
+    pub struct Username(pub String);
+
+    /// Construct the schema.
+    pub fn schema() -> Schema {
+        Schema::new()
+            // Mixins
+            .has::<Profile>()
+            .has::<Content>()
+            // Aliases
+            .has::<Username>()
+            // Arrows
+            .has::<AuthorOf>()
+            .has::<Follows>()
+            .has::<Bite>()
+            .has::<FollowRequest>()
+    }
 }
 
 pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Result<Key> {
     let key = Key::gen();
-    db.transaction(|tx| {
-        tx.create_vertex(key, tags::POST)?;
-        tx.update::<Profile>(author, |_, mut profile| {
+    db.run(|tx| {
+        tx.update::<Profile>(author, |mut profile| {
             profile.post_count += 1;
-            Ok(profile)
+            profile
         })?;
-        tx.insert(key, Content {
+        tx.add_mixin(key, Content {
             content: Some(content.to_string()),
             summary: None,
         })?;
-        tx.insert_arrow((author, key), AuthorOf)?;
+        tx.create(AuthorOf {
+            author,
+            object: key,
+        })?;
         Ok(key)
     })
 }
 
 pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
     let key = Key::gen();
-    db.transaction(|tx| {
-        tx.create_vertex(key, tags::ACTOR)?;
-        tx.insert_alias(key, Username(username.to_string()))?;
-        tx.insert(key, Profile {
+    db.run(|tx| {
+        tx.add_alias(key, Username(username.to_string()))?;
+        tx.add_mixin(key, Profile {
             post_count: 0,
             account_name: username.to_string(),
             display_name: None,
@@ -51,49 +109,36 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
     })
 }
 
-pub fn list_posts_by_author(
-    db: &Store,
-    author: impl Keylike,
-) -> store::Result<Vec<(Key, Content)>> {
-    db.transaction(|tx| {
-        tx.list_outgoing::<AuthorOf>(author)
-            .bind_results(|(post_key, _)| tx.lookup::<Content>(post_key))
-            .collect()
+pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result<Vec<(Key, Content)>> {
+    db.run(|tx| {
+        let keys = tx.outgoing::<AuthorOf>(author).map_ok(|a| a.object);
+        let posts = tx
+            .join_on(keys)?
+            .into_iter()
+            .filter_map(|(k, opt)| try { (k, opt?) })
+            .collect();
+        Ok(posts)
     })
 }
 
-pub struct Bite {
-    pub id: Key,
-    pub biter: Key,
-    pub victim: Key,
-}
-
-impl MultiArrow for Bite {
-    const TYPE: Tag = tags::BITE;
-}
-
 pub fn bite_actor(db: &Store, biter: Key, victim: Key) -> store::Result<Key> {
-    db.transaction(|tx| {
-        // Bites are represented as multiedges.
-        let key = arrow::multi::insert::<Bite>(&tx, biter, victim)?;
-        // We can treat particular arrows in a quiver as a vertex by registering it.
-        tx.create_vertex(key, tags::BITE)?;
-        Ok(key)
+    db.run(|tx| {
+        let id = Key::gen();
+        tx.create(Bite { id, biter, victim })?;
+        Ok(id)
     })
 }
 
 pub fn bites_on(db: &Store, victim: Key) -> store::Result<Vec<Bite>> {
-    db.transaction(|tx| {
-        arrow::multi::list_incoming::<Bite>(&tx, victim)
-            .map_ok(|(biter, id)| Bite { id, biter, victim })
-            .try_collect()
-    })
+    db.incoming::<Bite>(victim).try_collect()
 }
 
 pub mod tl {
     //! Timelines
 
-    use store::{arrow::AuthorOf, mixin::Content, util::IterExt as _, Error, Key, Result, Store};
+    use store::{util::IterExt as _, Error, Key, Result, Store};
+
+    use crate::model::{AuthorOf, Content};
 
     pub struct Post {
         pub id: Key,
@@ -102,13 +147,11 @@ pub mod tl {
     }
 
     pub fn fetch_all(db: &Store) -> Result<Vec<Post>> {
-        db.transaction(|tx| {
-            let iter = tx.list::<Content>();
+        db.run(|tx| {
+            let iter = tx.range::<Content>(..);
             iter.bind_results(|(id, content)| {
-                let author = tx
-                    .list_incoming::<AuthorOf>(id)
-                    .keys()
-                    .next_or(Error::Missing)?;
+                let AuthorOf { author, .. } =
+                    tx.incoming::<AuthorOf>(id).next_or(Error::Missing)?;
                 Ok(Post {
                     id,
                     author,
@@ -123,54 +166,64 @@
 pub mod fr {
     //! Follow requests
 
-    use store::{
-        arrow::{FollowRequested, Follows},
-        util::IterExt as _,
-        Key, Store, OK,
-    };
+    use store::{util::IterExt as _, Key, Store, OK};
 
-    pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<()> {
-        db.transaction(|tx| {
-            tx.insert_arrow((requester, target), FollowRequested)?;
-            OK
+    use crate::model::{FollowRequest, Follows};
+
+    pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<FollowRequest> {
+        db.run(|tx| {
+            let req = FollowRequest {
+                id: Key::gen(),
+                origin: requester,
+                target,
+            };
+            tx.create(req)?;
+            Ok(req)
         })
     }
 
     pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> {
-        db.transaction(|tx| {
-            tx.remove_arrow::<FollowRequested>((requester, target))?;
-            tx.insert_arrow((requester, target), Follows)?;
+        db.run(|tx| {
+            tx.delete_all::<FollowRequest>(requester, target)?;
+            tx.create(Follows {
+                follower: requester,
+                followed: target,
+            })?;
             OK
         })
     }
 
     pub fn reject(db: &Store, requester: Key, target: Key) -> store::Result<()> {
-        db.transaction(|tx| {
-            tx.remove_arrow::<FollowRequested>((requester, target))?;
+        db.run(|tx| {
+            tx.delete_all::<FollowRequest>(requester, target)?;
             OK
         })
     }
 
-    pub fn list_pending(db: &Store, target: Key) -> store::Result<Vec<Key>> {
-        db.transaction(|tx| tx.list_incoming::<FollowRequested>(target).keys().collect())
+    pub fn list_pending(db: &Store, target: Key) -> store::Result<Vec<FollowRequest>> {
+        db.incoming::<FollowRequest>(target).collect()
     }
 
     pub fn following_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
-        db.transaction(|tx| tx.list_outgoing::<Follows>(actor).keys().collect())
+        db.outgoing::<Follows>(actor)
+            .map_ok(|a| a.followed)
+            .collect()
    }
 
     pub fn followers_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
-        db.transaction(|tx| tx.list_incoming::<Follows>(actor).keys().collect())
+        db.incoming::<Follows>(actor)
+            .map_ok(|a| a.follower)
+            .collect()
    }
 
     #[cfg(test)]
     mod tests {
-        use store::{
-            arrow::{FollowRequested, Follows},
-            Key, Store, OK,
-        };
+        use store::{Key, Store, OK};
 
-        use crate::create_actor;
+        use crate::{
+            create_actor,
+            model::{schema, FollowRequest, Follows},
+        };
 
         fn make_test_actors(db: &Store) -> store::Result<(Key, Key)> {
             let alice = create_actor(&db, "alice")?;
@@ -181,18 +234,21 @@
 
         #[test]
         fn create_fr() -> store::Result<()> {
-            Store::with_tmp(|db| {
+            Store::test(schema(), |db| {
                 let (alice, bob) = make_test_actors(&db)?;
                 super::create(&db, alice, bob)?;
                 assert!(
-                    db.exists::<FollowRequested>((alice, bob))?,
+                    db.exists::<FollowRequest>(alice, bob)?,
                     "(alice -> bob) ∈ follow-requested"
                 );
                 assert!(
-                    !db.exists::<Follows>((alice, bob))?,
+                    !db.exists::<Follows>(alice, bob)?,
                     "(alice -> bob) ∉ follows"
                 );
 
-                let pending_for_bob = super::list_pending(&db, bob)?;
+                let pending_for_bob = super::list_pending(&db, bob)?
+                    .into_iter()
+                    .map(|fr| fr.origin)
+                    .collect::<Vec<_>>();
                 assert_eq!(pending_for_bob, vec![alice], "bob.pending = {{alice}}");
                 OK
             })
         }
 
         #[test]
         fn accept_fr() -> store::Result<()> {
-            Store::with_tmp(|db| {
+            Store::test(schema(), |db| {
                 let (alice, bob) = make_test_actors(&db)?;
                 super::create(&db, alice, bob)?;
                 super::accept(&db, alice, bob)?;
 
                 assert!(
-                    db.exists::<Follows>((alice, bob))?,
+                    db.exists::<Follows>(alice, bob)?,
                     "(alice -> bob) ∈ follows"
                 );
                 assert!(
-                    !db.exists::<Follows>((bob, alice))?,
+                    !db.exists::<Follows>(bob, alice)?,
                     "(bob -> alice) ∉ follows"
                 );
 
@@ -226,7 +282,7 @@ pub mod fr {
 
         #[test]
         fn listing_follow_relations() -> store::Result<()> {
-            Store::with_tmp(|db| {
+            Store::test(schema(), |db| {
                 let (alice, bob) = make_test_actors(&db)?;
                 super::create(&db, alice, bob)?;
                 super::accept(&db, alice, bob)?;
diff --git a/lib/store/Cargo.toml b/lib/store/Cargo.toml
index bf382b5..431004b 100644
--- a/lib/store/Cargo.toml
+++ b/lib/store/Cargo.toml
@@ -12,3 +12,5 @@ derive_more = "*"
 bincode = "2.0.0-rc.3"
 chrono = "*"
 tempfile = "*"
+macro = { path = "../macro" }
+either = "*"
diff --git a/lib/store/src/alias.rs b/lib/store/src/alias.rs
index 44d9b98..c495821 100644
--- a/lib/store/src/alias.rs
+++ b/lib/store/src/alias.rs
@@ -1,17 +1,115 @@
-//! Alternative keys.
+/// Derive an implementation of [`Alias`].
+pub use r#macro::Alias;
 
-use derive_more::{Display, From};
+use super::{
+    types::{AliasSpec, Value},
+    Batch, Store, Transaction,
+};
+use crate::{Key, Result};
 
-use crate::Space;
+/// An alternative unique identifier for a node.
+pub trait Alias: Value + From<String> + AsRef<str> {}
 
-/// An alternative unique key for a vertex.
-pub trait Alias: ToString + From<String> {
-    const SPACE: (Space, Space);
+impl Transaction<'_> {
+    /// Look up the key associated with the alias.
+    pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
+    where
+        A: Alias,
+    {
+        op::lookup::<A>(self, alias.as_ref())
+    }
+    /// Get the alias associated with the `node`.
+    pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
+    where
+        A: Alias,
+    {
+        op::get_alias(self, node)
+    }
+    /// Add an alias to `node`.
+    pub fn add_alias<A>(&self, node: Key, alias: A) -> Result<()>
+    where
+        A: Alias,
+    {
+        op::add_alias::<A>(self, node, alias.as_ref())
+    }
+    /// Check whether `node` has an `A` defined for it.
+    pub fn has_alias<A>(&self, node: Key) -> Result<bool>
+    where
+        A: Alias,
+    {
+        op::has_alias::<A>(self, node)
+    }
 }
 
-#[derive(Display, From)]
-pub struct Username(pub String);
-
-impl Alias for Username {
-    const SPACE: (Space, Space) = (Space("username/l"), Space("username/r"));
+impl Store {
+    /// Look up the key associated with the alias.
+    pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
+    where
+        A: Alias,
+    {
+        op::lookup::<A>(self, alias.as_ref())
+    }
+    /// Get the alias associated with the `node`.
+    pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
+    where
+        A: Alias,
+    {
+        op::get_alias(self, node)
+    }
+    /// Check whether `node` has an `A` defined for it.
+    pub fn has_alias<A>(&self, node: Key) -> Result<bool>
+    where
+        A: Alias,
+    {
+        op::has_alias::<A>(self, node)
+    }
+}
+
+impl Batch {
+    /// Add an alias to `node`.
+    ///
+    /// # Warning
+    ///
+    /// This will *not* fail if the key already has an alias of this type, and in fact *it will cause fundamental inconsistency*
+    /// if the alias already exists. Don't call this function unless you know that neither `node` nor `alias` exist yet.
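+    ///
+    /// # Example
+    ///
+    /// A sketch of safe usage, where both the key and the alias are brand new. `Handle` is a
+    /// hypothetical alias type, not part of this crate:
+    ///
+    /// ```rust
+    /// # fn main() -> store::Result<()> {
+    /// use store::{Alias, Key};
+    ///
+    /// #[derive(Alias)]
+    /// struct Handle(String);
+    ///
+    /// # let schema = store::types::Schema::new().has::<Handle>();
+    /// # store::Store::test(schema, |db| {
+    /// let mut batch = db.batch();
+    /// // Both the key and the alias are freshly created, so this cannot conflict.
+    /// batch.put_alias(Key::gen(), Handle("fresh-name".to_string()));
+    /// db.apply(batch)?;
+    /// # store::OK })
+    /// # }
+    /// ```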
+    pub fn put_alias<A>(&mut self, node: Key, alias: A)
+    where
+        A: Alias,
+    {
+        // TODO: consistency *could* be checked by manually iterating over the transaction using `WriteBatch::iterate`
+        op::add_alias::<A>(self, node, alias.as_ref()).unwrap();
+    }
+}
+
+mod op {
+    use crate::{internal::*, Alias, Key, Result, OK};
+
+    pub fn lookup<A: Alias>(cx: &impl Query, alias: &str) -> Result<Option<Key>> {
+        cx.open(A::SPEC.keyspace).get(alias).map(|k| match k {
+            Some(x) => Some(Key::from_slice(x.as_ref())),
+            None => None,
+        })
+    }
+
+    pub fn has_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<bool> {
+        cx.open(A::SPEC.reversed).has(node)
+    }
+
+    pub fn add_alias<A: Alias>(cx: &impl Write, node: Key, alias: &str) -> Result<()> {
+        cx.open(A::SPEC.keyspace).set(alias, node)?;
+        cx.open(A::SPEC.reversed).set(node, alias)?;
+        OK
+    }
+
+    pub fn get_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<Option<A>> {
+        let buf = cx.open(A::SPEC.reversed).get(node)?;
+        Ok(buf.map(decode))
+    }
+
+    fn decode<T>(data: impl AsRef<[u8]>) -> T
+    where
+        T: From<String>,
+    {
+        T::from(String::from_utf8_lossy(data.as_ref()).into_owned())
+    }
+}
diff --git a/lib/store/src/arrow.rs b/lib/store/src/arrow.rs
index bd1ed99..67b0095 100644
--- a/lib/store/src/arrow.rs
+++ b/lib/store/src/arrow.rs
@@ -1,79 +1,557 @@
-//! Relations between nodes.
+//! Directed edges, both parallel and simple.
+//!
+//! This module's main exports are [`Arrow`], and the two kinds of arrows: [`Basic`] and [`Multi`].
+//!
+//! Querying information about arrows can be done using the APIs exposed by [`Store`] and [`Transaction`],
+//! and manipulating them can likewise be done from within the context of a `Transaction`.
+//!
+//! The arrow API is designed to aggressively minimize boilerplate for defining arrow types, and uses a
+//! few tricks to do with associated constants and types to make it all work nicely.
+//!
+//! # Terminology
+//!
+//! An arrow is a part of a graph. Graphs consist of *nodes* (also called *vertices*) and *edges*. Nodes
+//! can be seen as "things", and edges as connections between those things, defined by the two nodes that
+//! they connect (which are called the *endpoints* of the edge).
+//!
+//! These edges can be directed or undirected. The difference is that undirected edges are identified by
+//! an unordered pair of their endpoints, whereas directed edges (also called **arrows**), are identified
+//! by an ordered pair, where one of the endpoints is the *tail* (or *origin* in the code/docs here) and
+//! the other is the *head* (usually called *target* here).
+//!
+//! # Arrow kinds
+//!
+//! Arrows can be either [`Basic`] or [`Multi`]. The main difference is that basic arrows are defined
+//! solely by which two nodes they connect, which means that their representation and certain operations
+//! are more efficient. The trade-off is that they cannot capture more complex information than "this
+//! edge exists".
+//!
+//! For some use cases (for example, predicates) this is sufficient, but other use cases require multiple,
+//! individually identifiable and manipulatable parallel edges. Here, the trade-off is that while they
+//! are much more expressive, and can be labeled by associating [mixins] with the arrow's identity key,
+//! they incur more space overhead, and most operations on them are more expensive compared to basic
+//! edges.
+//!
+//! Most arrow operations work on either kind of edge. Some signatures reference [`Arrow::Kind`], which
+//! is either of the `Multi` or `Basic` types mentioned before. Because parallel arrows need to be
+//! discernible from each other, each of them also has an `identity` key, in addition to the two nodes
+//! they connect.
+//!
+//! [mixins]: super::Mixin
+#![allow(private_interfaces)]
 
-use bincode::{Decode, Encode};
+pub use self::kinds::{Basic, Multi};
+use super::{
+    types::{ArrowSpec, Value},
+    Batch, Store, Transaction,
+};
+use crate::{util::IterExt as _, Key, Result};
 
-use crate::Space;
+/// A directed edge.
+///
+/// See the [module docs][self] for an introduction.
+pub trait Arrow: Value + From<Self::Kind> + Into<Self::Kind> {
+    /// The representation of this arrow, which also determines whether parallel edges are allowed.
+    type Kind: ArrowKind = Basic;
+}
 
-pub mod multi {
-    //! Managing multiedges.
-    //!
-    //! Unlike regular [`Arrow`]s, which don't have an identity (they are identified by the two nodes that
-    //! they connect), multiarrows can have their own [`Key`]. This allows one to have multiple arrows in
-    //! the same direction connecting the same two vertices, which isn't possible with normal arrows.
-    //!
-    //! Multiarrows can also be treated as if they were vertices, if their identity (`Key`) is registered as
-    //! one.
-    //!
-    //! This comes with a trade-off, though, specifically in both space and complexity. A multi-arrow also
-    //! can't have a label, like a typical arrow.
+/// Parameterizing arrows so we can distinguish between kinds of arrows.
+///
+/// This lets us present a common API for certain arrow-related operations while also leveraging some
+/// specialization. Essentially, from a type parameter which implements [`Arrow`], we can tell both at
+/// the type level and at the value level whether that arrow is a multi-arrow or not.
+pub trait ArrowKind {
+    /// Whether this kind of arrow should be represented using the specialized representation for edges
+    /// that are allowed to be parallel.
+    const IS_MULTI: bool;
+    /// Construct an arrow from a buffer containing a correctly-oriented arrow.
+    ///
+    /// Each arrow is stored twice, once "correctly", and once "reversed". This allows us to efficiently
+    /// list both the outgoing and incoming edges for any particular vertex by using a prefix scan on the
+    /// [`BY_ORIGIN`][ArrowSpec::by_origin] and [`BY_TARGET`][ArrowSpec::by_target] keyspaces respectively.
+    ///
+    /// The buffer passed to this function will start with 16 bytes of origin, followed by 16 bytes of
+    /// target. For basic arrows, that's it, but for multiarrows there is an additional 16 bytes of
+    /// "identity", which is needed to discriminate between multiple parallel edges.
+    ///
+    /// # Failure
+    ///
+    /// This method must panic if `buf` is not the expected size (32 bytes for basic arrows, 48 bytes for
+    /// multi arrows). The caller is responsible for ensuring that `buf` is correctly oriented; if it
+    /// isn't, the result will be incorrect, but since that is not a memory safety issue, this function
+    /// is still safe.
+    fn dec(buf: &[u8]) -> Self;
+    /// Encode an arrow's key origin-first and target-first.
+    fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>);
+    #[doc(hidden)]
+    /// Turn him into a raw edge.
+    fn raw(&self) -> Raw;
+}
 
-    use crate::{Key, Result, Tag, Transaction};
+union Raw {
+    multi: Multi,
+    basic: Basic,
+}
 
-    pub fn insert<A>(tx: &Transaction<'_>, origin: Key, target: Key) -> Result<Key>
+impl Store {
+    /// Check whether there exists any arrow of type `A` that points from `origin` to `target`.
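+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, mirroring the [`Transaction::create`] example below; `Follows` here is a
+    /// stand-in for any basic arrow registered in the schema:
+    ///
+    /// ```rust
+    /// # fn main() -> store::Result<()> {
+    /// use store::{Arrow, Key};
+    ///
+    /// #[derive(Arrow)]
+    /// struct Follows { origin: Key, target: Key }
+    ///
+    /// # let schema = store::types::Schema::new().has::<Follows>();
+    /// # store::Store::test(schema, |db| {
+    /// let (alice, bob) = (Key::gen(), Key::gen());
+    /// db.run(|tx| tx.create(Follows { origin: alice, target: bob }))?;
+    ///
+    /// // Arrows are directed: the reverse arrow was never created.
+    /// assert!(db.exists::<Follows>(alice, bob)?);
+    /// assert!(!db.exists::<Follows>(bob, alice)?);
+    /// # store::OK })
+    /// # }
+    /// ```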
+ pub fn exists(&self, origin: Key, target: Key) -> Result where - A: MultiArrow, + A: Arrow, { - let key = Key::gen(); - tx.quiver(A::TYPE).insert(origin, target, key)?; - Ok(key) + op::exists::(self, origin, target) + } + /// Get all arrows of type `A` that point at `target`. + pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator> + 'a + where + A: Arrow + 'a, + { + op::incoming::(self, target).map_ok(A::from) + } + /// Get all arrows of type `A` that point away from `origin`. + pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator> + 'a + where + A: Arrow + 'a, + { + op::outgoing::(self, origin).map_ok(A::from) + } +} + +impl Transaction<'_> { + /// Check whether there exists any arrow of type `A` that points from `origin` to `target`. + /// + /// This only tells you whether there is *any* such arrow, not how many (in the case of parallel edges). + pub fn exists(&self, origin: Key, target: Key) -> Result + where + A: Arrow, + { + op::exists::(self, origin, target) + } + /// Get all arrows of type `A` that point at `target`. + pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator> + 'a + where + A: Arrow + 'a, + { + op::incoming::(self, target).map_ok(A::from) + } + /// Get all arrows of type `A` that point away from `origin`. + pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator> + 'a + where + A: Arrow + 'a, + { + op::outgoing::(self, origin).map_ok(A::from) + } + /// Create a new arrow of type `A`. + /// + /// This operation supports both [`Multi`] and [`Basic`] arrows. + /// + /// # Example + /// + /// The following snippet creates an arrow between `origin` and `target`. + /// + /// ```rust + /// # fn main () -> store::Result<()> { + /// use store::{Arrow, Key}; + /// + /// #[derive(Arrow)] + /// struct MyArrow { origin: Key, target: Key } + /// + /// # let schema = store::types::Schema::new().has::(); + /// # store::Store::test(schema, |db| { + /// let origin = Key::gen(); + /// let target = Key::gen(); + /// + /// db.run(|tx| { + /// tx.create(MyArrow { origin, target }) + /// })?; + /// + /// assert!(db.exists::(origin, target)?); + /// # store::OK }) + /// # } + /// ``` + pub fn create(&self, arrow: A) -> Result<()> + where + A: Arrow, + { + op::create::(self, arrow.into()) + } + /// Delete all edges of type `A` from `origin` to `target`. + /// + /// It is not an error for this function not to delete anything. + pub fn delete_all(&self, origin: Key, target: Key) -> Result<()> + where + A: Arrow, + { + op::delete_all::(self, origin, target) + } + /// Delete a specific arrow. + pub fn delete_one(&self, arrow: A) -> Result<()> + where + A: Arrow, + { + op::delete_one::(self, arrow.into()) + } +} + +impl Batch { + /// Create an arrow. See [`Transaction::create`]. + pub fn create(&mut self, arrow: A) + where + A: Arrow, + { + op::create::(self, arrow.into()) + .expect("no errors expected to occur during batch operation") + } + /// Delete a specific arrow. + pub fn delete_one(&mut self, arrow: A) + where + A: Arrow, + { + op::delete_one::(self, arrow.into()) + .expect("no errors expected to occur during batch operation") + } +} + +mod op { + //! Implementations of arrow operations. + + use super::*; + use crate::{internal::*, types::MULTIEDGE_HEADERS, Key, Result, OK}; + + /// Check whether there exists at least one arrow of type `A` from `origin` to `target`. 
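+    ///
+    /// For a [`Basic`] arrow the fused `origin ++ target` key is the *entire* `by_origin` key, so a
+    /// point lookup suffices. For a [`Multi`] arrow that fused key is only a *prefix* (the identity
+    /// follows it), so we instead ask the prefix scan for at least one hit. The key layouts, as
+    /// documented on [`Multi::decode`]:
+    ///
+    /// ```text
+    /// basic:  oooooooooooooooo tttttttttttttttt
+    /// multi:  oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii
+    /// ```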
+ pub fn exists(cx: &impl Query, origin: Key, target: Key) -> Result + where + A: Arrow, + { + if A::Kind::IS_MULTI { + // In the case of a multi-edge, at least one result from the prefix scan + // indicates that there is at least one edge. + cx.open(A::SPEC.by_origin) + .scan(origin.fuse(target)) + .next() + .transpose() + .map(|o| o.is_some()) + } else { + cx.open(A::SPEC.by_origin).has(origin.fuse(target)) + } } - pub fn list_incoming<'db, A>( - tx: &'db Transaction<'db>, + /// List incoming arrows relative to `target`. + pub fn incoming<'db, A>( + cx: &'db impl Query, target: Key, - ) -> impl Iterator> + 'db + ) -> impl Iterator> + 'db where - A: MultiArrow, + A: Arrow, + A::Kind: 'db, { - tx.quiver(A::TYPE).list_incoming(target) + // In the `by_target` keyspace, for either kind of arrow the layout is such that the target is + // the prefix, so we pick that keyspace to more efficiently list all arrows that target the key. + cx.open(A::SPEC.by_target) + .scan(target) + .map_ok(|(mut k, _)| { + // Arrows from `by_target` are oriented target-first, while the decoder function requires + // that the buffer is oriented origin-first. Regardless of whether `..32` covers the prefix + // or the whole slice, swapping the two keys always gives us the ordering expected by the + // decoding function. + let (t, o) = k[..32].split_at_mut(16); + t.swap_with_slice(o); + A::Kind::dec(&k) + }) } - pub trait MultiArrow { - const TYPE: Tag; + /// List outgoing arrows relative to `origin`. + pub fn outgoing<'db, A>( + cx: &'db impl Query, + origin: Key, + ) -> impl Iterator> + 'db + where + A: Arrow, + A::Kind: 'db, + { + cx.open(A::SPEC.by_origin) + .scan(origin) + .map_ok(|(ref k, _)| A::Kind::dec(k)) + } + + /// Create a new arrow. + pub fn create(cx: &impl Write, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + if A::Kind::IS_MULTI { + let Multi { + identity, + origin, + target, + } = unsafe { arrow.raw().multi }; + cx.open(MULTIEDGE_HEADERS) + .set(identity, origin.fuse(target))?; + } + let (by_origin, by_target) = arrow.enc(); + cx.open(A::SPEC.by_origin).set(by_origin, b"")?; + cx.open(A::SPEC.by_target).set(by_target, b"")?; + OK + } + + /// Delete all arrows from `origin` to `target`. + /// + /// TODO: Remove the query requirement (depends on range delete being available). + pub fn delete_all(cx: &(impl Write + Query), origin: Key, target: Key) -> Result<()> + where + A: Arrow, + { + let by_origin = cx.open(A::SPEC.by_origin); + let by_target = cx.open(A::SPEC.by_target); + Ok(if A::Kind::IS_MULTI { + let headers = cx.open(MULTIEDGE_HEADERS); + // TODO: optimize this implementation using range deletes. + // Unfortunately, range deletes are not available in transactional backends. + for key in by_origin.scan(origin.fuse(target)).keys() { + let key = Multi::decode(key?.as_ref()); + by_origin.del(key.encode())?; + by_target.del(key.swap().encode())?; + headers.del(key.identity)?; + } + } else { + by_origin.del(origin.fuse(target))?; + by_target.del(target.fuse(origin))?; + }) + } + + /// Delete a specific arrow, if it exists. Doesn't error if the arrow does *not* exist. + pub fn delete_one(cx: &impl Write, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + let (by_origin, by_target) = arrow.enc(); + cx.open(A::SPEC.by_origin).del(by_origin)?; + cx.open(A::SPEC.by_target).del(by_target)?; + OK } } -/// A directed edge between two vertices. -pub trait Arrow: Encode + Decode { - const SPACE: (Space, Space); -} - -/// Which way an arrow is pointing when viewed from a particular vertex. 
-pub enum Direction {
-    Incoming,
-    Outgoing,
-}
-
-/// The node this arrow points away from is the "author" of the node the arrow points to.
-#[derive(Encode, Decode)]
-pub struct AuthorOf;
-
-impl Arrow for AuthorOf {
-    const SPACE: (Space, Space) = (Space("created-by/l"), Space("created-by/r"));
-}
-
-/// The origin of this arrow has follow requested the target.
-#[derive(Encode, Decode)]
-pub struct FollowRequested;
-
-impl Arrow for FollowRequested {
-    const SPACE: (Space, Space) = (Space("pending-fr/l"), Space("pending-fr/r"));
-}
-
-/// The origin "follows" the target.
-#[derive(Encode, Decode)]
-pub struct Follows;
-
-impl Arrow for Follows {
-    const SPACE: (Space, Space) = (Space("follows/l"), Space("follows/r"));
+/// Types representing the different kinds of arrows.
+mod kinds {
+    use super::ArrowKind;
+    use crate::Key;
+
+    impl ArrowKind for Multi {
+        const IS_MULTI: bool = true;
+        fn dec(buf: &[u8]) -> Self {
+            Multi::decode(buf)
+        }
+        fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
+            (self.encode(), self.swap().encode())
+        }
+        fn raw(&self) -> super::Raw {
+            super::Raw { multi: *self }
+        }
+    }
+
+    impl ArrowKind for Basic {
+        const IS_MULTI: bool = false;
+        fn dec(buf: &[u8]) -> Self {
+            Basic::decode(buf)
+        }
+        fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
+            (self.encode(), self.reverse().encode())
+        }
+        fn raw(&self) -> super::Raw {
+            super::Raw { basic: *self }
+        }
+    }
+
+    /// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist
+    /// between two vertices.
+    #[derive(Clone, Copy)]
+    pub struct Multi {
+        /// The node that this edge points away from.
+        pub origin: Key,
+        /// The node that this edge points towards.
+        pub target: Key,
+        /// The discriminator of this particular edge, which distinguishes it from all other edges that
+        /// connect `origin` and `target`, and indeed from every other edge or node in the graph.
+        pub identity: Key,
+    }
+
+    impl Multi {
+        /// Decode a multiarrow key from an origin-first encoded buffer. If the buffer is not correctly
+        /// oriented, the results will be wrong; the arrow will be oriented *away* from the target and
+        /// *at* the origin, instead of the other way around.
+        ///
+        /// # Orientation
+        ///
+        /// In this context, *correctly oriented* means that it is laid out in *origin-first* order,
+        /// like this (where `o`, `t` and `i` represent bytes):
+        ///
+        /// ```text
+        /// oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii
+        /// |--------------| |--------------| |--------------|
+        ///      origin           target           identity
+        ///       ..16            16..32            32..
+        /// ```
+        ///
+        /// In a *reverse oriented* buffer, the origin and target parts are swapped, so the target is
+        /// the prefix, followed by the origin, and then the identity. This is also called *target-first*
+        /// encoding in this documentation.
+        ///
+        /// # Silent failure
+        ///
+        /// There is no way to detect whether the ordering is correct from just the buffer, so the caller
+        /// must ensure that the order is correct. If you have a target-first encoded buffer, you have to
+        /// swap the two keys before passing it into this function, or you will get an edge that does not
+        /// exist (since a multiedge can only point in one direction).
+        ///
+        /// Safety-wise, this isn't an issue, so it does not warrant marking this function as `unsafe`.
+        ///
+        /// # Panics
+        ///
+        /// This function panics if `buf` is not exactly 48 bytes long.
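+        ///
+        /// # Example
+        ///
+        /// A round-trip sketch with freshly generated keys:
+        ///
+        /// ```rust
+        /// use store::{arrow::Multi, Key};
+        ///
+        /// let arrow = Multi { origin: Key::gen(), target: Key::gen(), identity: Key::gen() };
+        /// // `encode` lays the buffer out origin-first, which is exactly the
+        /// // orientation `decode` expects.
+        /// let buf = arrow.encode();
+        /// assert_eq!(Multi::decode(&buf).encode(), buf);
+        /// ```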
+ pub fn decode(buf: &[u8]) -> Multi { + Multi { + origin: Key::from_slice(&buf[..16]), + target: Key::from_slice(&buf[16..32]), + identity: Key::from_slice(&buf[32..]), + } + } + /// Encode an arrow in *origin-first order*. See the docs of [`Multi::decode`] for an explanation + /// of the difference between origin-first encoding and target-first encoding. + pub fn encode(self) -> [u8; 48] { + let mut key = [0; 48]; + key[..16].copy_from_slice(&self.origin.0); + key[16..32].copy_from_slice(&self.target.0); + key[32..].copy_from_slice(&self.identity.0); + key + } + /// Swap the origin and target of this arrow, while leaving the identity the same. + pub(super) fn swap(self) -> Multi { + Multi { + origin: self.target, + target: self.origin, + ..self + } + } + } + + /// A normal directed edge. Duplicates are not allowed. + /// + /// This kind of arrow is useful for modeling predicates and simple relationships. + #[derive(Clone, Copy)] + pub struct Basic { + pub origin: Key, + pub target: Key, + } + + impl Basic { + /// Get the inverse of this arrow (an arrow that connects the same two nodes, but pointing in the + /// other direction). + pub fn reverse(self) -> Basic { + Basic { + origin: self.target, + target: self.origin, + } + } + /// Encode `self` in origin-first order. See [`Multi::decode`] for docs on ordering. + pub fn encode(self) -> [u8; 32] { + self.origin.fuse(self.target) + } + /// Decode a basic edge from a buffer laid out origin-first. See [`Multi::decode`] for more information + /// about key encoding. + /// + /// # Panics + /// + /// Panics if `buf` is not exactly 32 bytes long. + pub fn decode(buf: &[u8]) -> Basic { + let (origin, target) = Key::split(buf); + Basic { origin, target } + } + } } +/// Derive [`Arrow`] for a struct. +/// +/// This will generate the required [`Into`] and [`From`] impls, as well as an [`Arrow`](trait@Arrow) impl and +/// a [`Value`] impl with the namespaces derived from the name of the struct. The macro works on structs with +/// specific fields, or newtypes of any arrow kind. +/// +/// # Attributes +/// +/// The `origin`, `target` and `identity` attributes are used on fields of type [`Key`], and they are used +/// to map the arrow's type to an [`ArrowKind`]. The `#[origin]` annotation isn't needed if the struct contains +/// a field named `origin`. Ditto with `target` and `identity`. +/// +/// If there is no `identity` defined, the `ArrowKind` will be [`Basic`]. If an `identity` is defined, the kind +/// will be [`Multi`]. +/// +/// # Examples +/// +/// Generates a [`Basic`] arrow called `my-arrow`. +/// +/// ``` +/// use store::{Key, Arrow, types::Schema}; +/// +/// #[derive(Arrow)] +/// struct MyArrow { origin: Key, target: Key } +/// +/// // This will fail to compile if the type doesn't implement `Arrow` correctly +/// Schema::new().has::(); +/// ``` +/// +/// Newtypes of either arrow kind are supported. +/// +/// ``` +/// use store::{Key, arrow::{Basic, Multi, Arrow}}; +/// +/// /// The origin has requested to follow the target. +/// /// +/// /// Note: there may be more than one follow request between any two actors. +/// #[derive(Arrow)] +/// struct FollowRequest(Multi); +/// +/// /// A relation between two actors meaning that the origin follows the target. +/// #[derive(Arrow)] +/// struct Follows(Basic); +/// +/// /// Users can follow each other. +/// struct User(Key); +/// +/// impl User { +/// /// Make `self` follow `other`. 
+/// pub fn follows(self, other: User) -> Follows { +/// Follows(Basic { origin: self.0, target: other.0 }) +/// } +/// } +/// ``` +/// +/// Generates a [`Multi`] arrow called `my-multi-arrow`, mapping the multiarrow's discriminator to the struct's +/// `unique` field. +/// +/// ``` +/// use store::{Key, Arrow}; +/// +/// #[derive(Arrow)] +/// struct MyMultiArrow { +/// pub origin: Key, +/// pub target: Key, +/// #[identity] +/// pub unique: Key, +/// } +/// ``` +/// +/// The macro automatically adds `From` and `Into` implementations: +/// +/// ``` +/// use store::{Key, Arrow, arrow::Basic}; +/// +/// #[derive(Arrow)] +/// struct MyArrow { origin: Key, target: Key } +/// +/// let origin = Key::gen(); +/// let target = Key::gen(); +/// +/// let edge: Basic = MyArrow { origin, target }.into(); +/// +/// assert_eq!(origin, edge.origin); +/// assert_eq!(target, edge.target); +/// ``` +pub use r#macro::Arrow; diff --git a/lib/store/src/internal.rs b/lib/store/src/internal.rs new file mode 100644 index 0000000..7680cef --- /dev/null +++ b/lib/store/src/internal.rs @@ -0,0 +1,297 @@ +//! Provides a nice hashmap-esque interface for manipulating entries in the store's backend. + +use std::sync::Arc; + +use rocksdb::{BoundColumnFamily, IteratorMode}; + +pub use self::cx::{Context, Query, Write}; +use crate::{util::IterExt as _, Error, Result}; + +/// An internal interface to a specific keyspace that exposes basic hashmap-esque operations +/// on that keyspace, generic over whether the source of the data is a [`Transaction`] or a +/// [`Store`]. +pub struct Keyspace<'db, C> { + pub(super) context: &'db C, + pub(super) cf: Arc>, +} + +impl<'db, C> Keyspace<'db, C> +where + C: Query, +{ + /// Fetch a value from the keyspace. + pub fn get(&self, key: impl AsRef<[u8]>) -> Result + 'db>> { + self.context.get_pinned(&self.cf, key) + } + /// Test whether a key exists. + pub fn has(&self, key: impl AsRef<[u8]>) -> Result { + self.get(key).map(|r| r.is_some()) + } + /// Execute a prefix scan. + pub fn scan( + &self, + prefix: impl AsRef<[u8]> + 'db, + ) -> impl Iterator, Box<[u8]>)>> + 'db { + let t = prefix.as_ref().to_vec(); + self.context + .prefix_iterator(&self.cf, prefix.as_ref()) + // The prefix iterator may "overshoot". This makes it stop when it reaches + // the end of the range that has the prefix. + .take_while(move |r| match r { + Ok((ref k, _)) => k.starts_with(&t), + _ => true, + }) + .map_err(Error::Internal) + } + /// List all pairs in the keyspace. + pub fn list(&self) -> impl Iterator, Box<[u8]>)>> + 'db { + self.context + .full_iterator(&self.cf, IteratorMode::Start) + .map_err(Error::Internal) + } + /// Execute a range scan + pub fn range( + &self, + lower: [u8; N], + upper: [u8; N], + ) -> impl Iterator, Box<[u8]>)>> + 'db { + // TODO: use a seek op to make this more efficient + self.context + .full_iterator(&self.cf, IteratorMode::Start) + .skip_while(move |r| match r { + Ok((ref k, _)) => k.as_ref() < &lower, + _ => false, + }) + // The prefix iterator may "overshoot". This makes it stop when it reaches + // the end of the range that has the prefix. + .take_while(move |r| match r { + Ok((ref k, _)) => k.as_ref() < &upper, + _ => true, + }) + .map_err(Error::Internal) + } + /// Join all the keys to their values in this keyspace. + /// + /// This may be optimized compared to many random point lookups. 
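+    /// (In the current implementation, the whole probe set is handed to the backend's `multi_get`
+    /// in one call, rather than issuing a separate random read per key.)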
+ pub fn join( + &self, + keys: impl IntoIterator>, + ) -> Vec>>> { + self.context + .multi_get(keys.into_iter().map(|x| (&self.cf, x))) + } +} + +impl Keyspace<'_, C> +where + C: Write, +{ + /// Set the given `key` to the `value`, overwriting it if there was already a value there. + pub fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> { + self.context.put(&self.cf, key, val) + } + /// Drop the value if it exists. + pub fn del(&self, key: impl AsRef<[u8]>) -> Result<()> { + self.context.delete(&self.cf, key) + } +} + +mod cx { + //! Contexts for doing reads, writes or both to the database. + //! + //! The traits in this module map abstract calls to their methods on the [rocksdb] objects. + + use rocksdb::{ + AsColumnFamilyRef, DBAccess, DBIteratorWithThreadMode, DBPinnableSlice, IteratorMode, + }; + + use super::Keyspace; + use crate::{util::IterExt as _, Backend, Batch, Error, Result, Store, Transaction, OK}; + + /// A context for executing database operations. + pub trait Context { + /// Open the keyspace identified by `cf`. + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> + where + Self: Sized; + } + + /// A context in which one can read from the data store. + /// + /// Specifically, this maps calls to either the transaction or the store's internals without us having + /// to implement methods for *both* transactions and the store. + pub trait Query: Context { + type Backend: DBAccess; + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>>; + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Self::Backend>; + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend>; + /// Optimized multi-point lookup. + fn multi_get<'a, C: AsColumnFamilyRef + 'a>( + &'a self, + keys: impl IntoIterator)>, + ) -> Vec>>>; + } + + /// A context in which one can read from and modify the data store. 
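+    ///
+    /// [`Transaction`] and [`Batch`] implement this; a bare [`Store`] only implements [`Query`].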
+ pub trait Write: Context { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()>; + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()>; + } + + impl Context for Store { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { + Keyspace { + cf: self.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl Query for Store { + type Backend = Backend; + + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>> { + self.inner.get_pinned_cf(cf, key).map_err(Error::Internal) + } + + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Backend> { + self.inner.prefix_iterator_cf(cf, prefix) + } + + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.full_iterator_cf(cf, mode) + } + + fn multi_get<'a, C: AsColumnFamilyRef + 'a>( + &'a self, + keys: impl IntoIterator)>, + ) -> Vec>>> { + self.inner + .multi_get_cf(keys) + .into_iter() + .map_err(Error::Internal) + .collect() + } + } + + impl Context for Transaction<'_> { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { + Keyspace { + cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl<'db> Query for Transaction<'db> { + type Backend = rocksdb::Transaction<'db, Backend>; + + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>> { + self.inner.get_pinned_cf(cf, key).map_err(Error::Internal) + } + + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.prefix_iterator_cf(cf, prefix) + } + + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.full_iterator_cf(cf, mode) + } + + fn multi_get<'a, C: AsColumnFamilyRef + 'a>( + &'a self, + keys: impl IntoIterator)>, + ) -> Vec>>> { + self.inner + .multi_get_cf(keys) + .into_iter() + .map_err(Error::Internal) + .collect() + } + } + + impl Write for Transaction<'_> { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { + self.inner.delete_cf(cf, key).map_err(Error::Internal) + } + + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()> { + self.inner.put_cf(cf, key, val).map_err(Error::Internal) + } + } + + impl Context for Batch { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> + where + Self: Sized, + { + Keyspace { + cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl Write for Batch { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { + self.inner.borrow_mut().delete_cf(cf, key); + OK + } + + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()> { + self.inner.borrow_mut().put_cf(cf, key, val); + OK + } + } +} diff --git a/lib/store/src/key.rs b/lib/store/src/key.rs index 7036ee7..bf91394 100644 --- a/lib/store/src/key.rs +++ b/lib/store/src/key.rs @@ -3,7 +3,7 @@ use std::fmt::{Debug, Display}; use chrono::{DateTime, Utc}; use ulid::Ulid; -use crate::{Alias, Error, Result, Transaction}; +use 
crate::arrow::{ArrowKind, Basic, Multi}; /// A unique identifier for vertices in the database. #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] @@ -31,7 +31,7 @@ impl Key { Key(key) } pub fn timestamp(self) -> DateTime { - let ms = Ulid::from_bytes(self.0).timestamp_ms(); + let ms = self.to_ulid().timestamp_ms(); DateTime::from_timestamp_millis(ms as i64).unwrap() } /// Join two keys together. @@ -46,6 +46,14 @@ impl Key { let head = Key::from_slice(&buf[16..]); (tail, head) } + pub(crate) fn range(ts: DateTime) -> ([u8; 16], [u8; 16]) { + let min = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MIN).to_bytes(); + let max = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MAX).to_bytes(); + (min, max) + } + fn to_ulid(self) -> Ulid { + Ulid::from_bytes(self.0) + } } impl AsRef<[u8]> for Key { @@ -53,46 +61,3 @@ impl AsRef<[u8]> for Key { &self.0 } } - -/// Anything that can be used to reference a vertex, both "normal" [keys](Key) -/// and [aliases](Alias). -/// -/// In general, using a key directly is going to be more efficient than using -/// an alias, because it incurs less lookups. -pub trait Keylike: Sized { - /// Translate the thing to a [`Key`]. - /// - /// This function should return [`Error::Missing`] if the key cannot be located. - fn translate(self, tx: &Transaction<'_>) -> Result; - /// Translate, and check whether the key is actually registered. - /// - /// This function should return [`Error::Undefined`] if the key does not *really* - /// exist. It should return [`Error::Missing`] if the key can't be found. - fn checked_translate(self, tx: &Transaction<'_>) -> Result { - let key = self.translate(tx)?; - if !tx.is_registered(key)? { - Err(Error::Undefined { key }) - } else { - Ok(key) - } - } -} - -impl Keylike for Key { - fn translate(self, _: &Transaction<'_>) -> Result { - Ok(self) - } -} - -impl Keylike for A -where - A: Alias, -{ - fn translate(self, tx: &Transaction<'_>) -> Result { - tx.lookup_alias(self) - } -} - -/// A type tag identifying a vertex. -#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)] -pub struct Tag(pub u8); diff --git a/lib/store/src/lib.rs b/lib/store/src/lib.rs index 32c4a39..f8af4cf 100644 --- a/lib/store/src/lib.rs +++ b/lib/store/src/lib.rs @@ -1,78 +1,113 @@ -#![feature(iterator_try_collect)] -//! The data store abstractions used by the ActivityPuppy project. +#![feature(iterator_try_collect, associated_type_defaults)] +//! Data persistence for the ActivityPuppy social media server built on top of [rocksdb]. //! -//! Persistence in a puppy server is handled by this component, which implements a directed graph -//! inspired datastore on top of the [rocksdb] key-value store. +//! # Overview //! -//! The workflow for manipulating stuff in the store is to open a [`Store`], and then to call -//! its [`transaction`](Store::transaction) method. This method takes a function that, given -//! a [`Transaction`], returns a result with some value. The `Transaction` object contains some -//! useful CRUD methods. Returning an `Ok` commits the transaction and returning `Err` rolls it -//! back. +//! The design of the data store's abstractions is heavily inspired by graph theory. The idea is to encourage +//! composition and loose coupling by segmenting all data associated with a node into [mixins][Mixin], and +//! modeling relations and predicates between nodes as [arrows][Arrow]. In additions, the key identifying a +//! node can be [aliased][Alias] by a string newtype, which must be unique within the namespace of that alias. //! -//! 
This component is specialized to puppy's storage needs, and probably won't be much use unless -//! you're writing something that interfaces with puppy. +//! The API is optimized for reducing boilerplate and legibility at the call site. +//! +//! There are three interfaces to the store: the read-only [`Store`], the write-only [`Batch`] and the [`Transaction`], +//! which allows both reads and writes. -use std::{path::Path, sync::Arc}; +use std::{cell::RefCell, path::Path, sync::Arc}; use derive_more::From; -use rocksdb::{MultiThreaded, Options, TransactionDBOptions}; - -type Backend = rocksdb::TransactionDB; +use rocksdb::{Options, TransactionDBOptions, WriteBatchWithTransaction}; +use types::Schema; +mod alias; +mod internal; mod key; -mod transaction; - -pub use key::{Key, Keylike, Tag}; -pub use transaction::Transaction; -pub use {alias::Alias, arrow::Arrow, mixin::Mixin}; - -pub mod alias; -pub mod arrow; -pub mod mixin; -pub mod util; - -/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly). -pub const OK: Result<()> = Ok(()); - -/// Master list of all column family names in use. -const SPACES: &[&'static str] = &[ - "registry", - "username/l", - "username/r", - "follows/l", - "follows/r", - "profile", - "content", - "created-by/l", - "created-by/r", - "pending-fr/l", - "pending-fr/r", - "multi:id-map", - "multi:index/l", - "multi:index/r", - #[cfg(test)] - "test-arrow/l", - #[cfg(test)] - "test-arrow/r", -]; - -/// The handle to the data store. -/// -/// This type can be cloned freely. -#[derive(Clone)] -pub struct Store { - inner: Arc, -} +mod mixin; /// The name of the puppy data store inside the state directory. const STORE_NAME: &str = "main-store"; +/// Open a [`Store`]. Creates one if it doesn't exist yet at the state directory path. +pub fn open(state_dir: impl AsRef, schema: Schema) -> Result { + Store::open(state_dir, schema) +} + +pub use {alias::Alias, arrow::Arrow, key::Key, mixin::Mixin}; + +pub mod arrow; +pub mod types; +pub mod util; + +/// The main interface to the data persistence engine. +/// +/// This type provides reading capabilities, but does not expose APIs for manipulating data in the store. For +/// that, you must [run][Store::run] a [`Transaction`] or [apply][Store::apply] a [`Batch`]. +#[derive(Clone)] +pub struct Store { + // TODO: maybe switch to `OptimisticTransactionDB` because it has `batched_multi_get_cf`, which may be useful + // if we end up doing lots of point lookups. alternatively, maybe we don't need *transactions* altogether, and + // we can get away with write batches and snapshots. the main problem with transactions is that it doesn't let + // us do range deletes, which affects the efficiency of multiarrow deletion. + // + // a switch to write batches is feasible if we end up not doing reads and writes in the same transaction. + inner: Arc, +} + +/// Hosts APIs for manipulating the data store. +/// +/// You can access these APIs from the body of the closure passed to [`Store::run`]. +pub struct Transaction<'db> { + inner: rocksdb::Transaction<'db, Backend>, + store: &'db Store, +} + +/// A set of writes that are to be executed atomically. +pub struct Batch { + inner: RefCell>, + store: Store, +} + impl Store { - /// Open a data store in the given `state_dir`. + /// Run a [transaction][Transaction]. /// - /// If the data store does not exist yet, it will be created. 
-    pub fn open(state_dir: impl AsRef<Path>) -> Result<Store> {
+    /// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not
+    /// recorded. Changes made inside a transaction can be read from within that transaction before they are
+    /// committed.
+    ///
+    /// If the closure passed to `run` returns an error, the transaction is rolled back; otherwise, the
+    /// changes are committed.
+    pub fn run<T, E>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T, E>) -> Result<T, E>
+    where
+        E: From<Error>,
+    {
+        let tx = Transaction {
+            inner: self.inner.transaction(),
+            store: self,
+        };
+        let r = f(&tx);
+        if let Err(e) = if r.is_err() {
+            tx.inner.rollback()
+        } else {
+            tx.inner.commit()
+        } {
+            return Err(E::from(Error::Internal(e)));
+        }
+        r
+    }
+    /// Apply a batch of changes atomically.
+    pub fn apply(&self, batch: Batch) -> Result<()> {
+        self.inner.write(batch.inner.into_inner())?;
+        OK
+    }
+    /// Construct a [`Batch`].
+    pub fn batch(&self) -> Batch {
+        Batch {
+            inner: RefCell::new(WriteBatchWithTransaction::default()),
+            store: self.clone(),
+        }
+    }
+    /// Open the data store in `state_dir`, and create one if it doesn't exist yet.
+    pub fn open(state_dir: impl AsRef<Path>, schema: Schema) -> Result<Store> {
         let mut db_opts = Options::default();
         db_opts.create_if_missing(true);
         db_opts.create_missing_column_families(true);
@@ -81,57 +116,24 @@ impl Store {
             &db_opts,
             &tx_opts,
             state_dir.as_ref().join(STORE_NAME),
-            SPACES,
+            schema.0,
         )?);
         Ok(Store { inner })
     }
-    /// Construct a temporary store, for testing. This store gets erased after `f` is done.
-    pub fn with_tmp<T, E>(f: impl FnOnce(Store) -> Result<T, E>) -> Result<T, E>
-    where
-        E: From<Error>,
-    {
-        let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir");
-        f(Store::open(tmp_dir)?)
-    }
-    /// Delete the whole store.
-    ///
-    /// **This deletes all data in the store**. Do not run this unless you want to delete all the state of the instance.
+    /// Delete the main data store in `state_dir` if it exists.
     pub fn nuke(state_dir: impl AsRef<Path>) -> Result<()> {
         Backend::destroy(&Options::default(), state_dir.as_ref().join(STORE_NAME))
-            .map_err(Error::from)
+            .map_err(Error::Internal)
     }
-    /// Get the value of mixin `M` for `key`.
-    pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)>
-    where
-        M: Mixin,
-    {
-        self.transaction(|tx| tx.lookup(key))
-    }
-    /// Get the key associated with a given [alias][Alias].
-    pub fn translate<A>(&self, s: impl ToString) -> Result<Key>
-    where
-        A: Alias,
-    {
-        self.transaction(|tx| tx.lookup_alias(A::from(s.to_string())))
-    }
-    /// Quickly test whether a particular [arrow][Arrow] exists.
-    pub fn exists<A>(&self, arrow: (Key, Key)) -> Result<bool>
-    where
-        A: Arrow,
-    {
-        self.transaction(|tx| tx.exists::<A>(arrow))
+    /// Open a store that lives until `f` returns, for testing.
+    pub fn test<T>(schema: Schema, f: impl FnOnce(Store) -> T) -> T {
+        let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir");
+        f(Store::open(tmp_dir, schema).expect("failed to open temporary data store"))
     }
 }
 
-/// An isolated keyspace.
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
-pub struct Space(&'static str);
-
-impl AsRef<str> for Space {
-    fn as_ref(&self) -> &str {
-        &self.0
-    }
-}
+/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
+pub const OK: Result<()> = Ok(());
 
 /// Results from this component.
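+///
+/// The error type parameter defaults to this crate's [`Error`], so most signatures
+/// can simply be written as e.g. `Result<Key>` or `Result<()>`.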
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -154,3 +156,5 @@ pub enum Error {
     Encoding(bincode::error::EncodeError),
     Decoding(bincode::error::DecodeError),
 }
+
+type Backend = rocksdb::TransactionDB;
diff --git a/lib/store/src/mixin.rs b/lib/store/src/mixin.rs
index 237808f..6ef8fa8 100644
--- a/lib/store/src/mixin.rs
+++ b/lib/store/src/mixin.rs
@@ -1,35 +1,250 @@
-//! Modules of information.
+use std::ops::RangeBounds;
 
 use bincode::{Decode, Encode};
+use chrono::{DateTime, Utc};
 
-use crate::Space;
+use super::{
+    types::{MixinSpec, Value},
+    Batch, Store, Transaction,
+};
+use crate::{Error, Key, Result};
 
-/// A simple piece of data associated with a vertex.
-pub trait Mixin: Encode + Decode {
-    const SPACE: Space;
+/// Mixins are the simplest pieces of data in the store.
+pub trait Mixin: Value<Type = MixinSpec> + Encode + Decode {}
+
+/// Derive a [`Mixin`] implementation.
+///
+/// In addition to deriving `Mixin`, you will need to derive or implement [`Encode`]
+/// and [`Decode`].
+pub use r#macro::Mixin;
+
+impl Store {
+    /// Get the value of the mixin `M` associated with `node`, if it exists.
+    pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
+    where
+        M: Mixin,
+    {
+        op::get_mixin(self, node)
+    }
+    /// Check if `node` has a mixin `M`.
+    pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
+    where
+        M: Mixin,
+    {
+        op::has_mixin::<M>(self, node)
+    }
+    /// Get all `M`s where the key's timestamp is within the `range`.
+    pub fn range<M>(
+        &self,
+        range: impl RangeBounds<DateTime<Utc>>,
+    ) -> impl Iterator<Item = Result<(Key, M)>> + '_
+    where
+        M: Mixin,
+    {
+        op::get_range(self, range)
+    }
+    /// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
+    pub fn join_on<M>(
+        &self,
+        iter: impl IntoIterator<Item = Result<Key>>,
+    ) -> Result<Vec<(Key, Option<M>)>>
+    where
+        M: Mixin,
+    {
+        op::join_on(self, iter)
+    }
 }
-/// Information needed to render a social media profile.
-#[derive(Encode, Decode)]
-pub struct Profile {
-    pub post_count: usize,
-    pub account_name: String,
-    pub display_name: Option<String>,
-    pub about_string: Option<String>,
-    pub about_fields: Vec<(String, String)>,
+impl Transaction<'_> {
+    /// Apply an update function to the mixin `M` of `node`.
+    ///
+    /// # Errors
+    ///
+    /// - [`Error::Missing`]: if `node` does not have a mixin of this type.
+    ///
+    /// [`Error::Missing`]: crate::Error::Missing
+    pub fn update<M>(&self, node: Key, update: impl FnOnce(M) -> M) -> Result<()>
+    where
+        M: Mixin,
+    {
+        op::update(self, node, update)
+    }
+    /// Get the mixin of the specified type associated with `node`.
+    pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
+    where
+        M: Mixin,
+    {
+        op::get_mixin(self, node)
+    }
+    /// Add a mixin to `node`.
+    ///
+    /// # Errors
+    ///
+    /// - [`Error::Conflict`]: if `node` already has a mixin of type `M`.
+    ///
+    /// [`Error::Conflict`]: crate::Error::Conflict
+    pub fn add_mixin<M>(&self, node: Key, mixin: M) -> Result<()>
+    where
+        M: Mixin,
+    {
+        if op::has_mixin::<M>(self, node)? {
+            Err(Error::Conflict)
+        } else {
+            op::add_mixin::<M>(self, node, mixin)
+        }
+    }
+    /// Check whether `node` has an `M` defined for it.
+    pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
+    where
+        M: Mixin,
+    {
+        op::has_mixin::<M>(self, node)
+    }
+    /// Get all `M`s where the key's timestamp is within the `range`.
+    pub fn range<M>(
+        &self,
+        range: impl RangeBounds<DateTime<Utc>>,
+    ) -> impl Iterator<Item = Result<(Key, M)>> + '_
+    where
+        M: Mixin,
+    {
+        op::get_range(self, range)
+    }
+    /// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
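+    ///
+    /// # Example
+    ///
+    /// A sketch of the intended call site (`Profile` stands in for any mixin type, and the
+    /// keys are wrapped in `Ok` to match the iterator's item type):
+    ///
+    /// ```ignore
+    /// let keys = vec![Ok(riley), Ok(linen)];
+    /// let profiles: Vec<(Key, Option<Profile>)> = tx.join_on(keys)?;
+    /// ```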
+    pub fn join_on<M>(
+        &self,
+        iter: impl IntoIterator<Item = Result<Key>>,
+    ) -> Result<Vec<(Key, Option<M>)>>
+    where
+        M: Mixin,
+    {
+        op::join_on(self, iter)
+    }
 }
 
-impl Mixin for Profile {
-    const SPACE: Space = Space("profile");
+impl Batch {
+    /// Add a mixin to the `node`.
+    ///
+    /// **Note**: unlike [`Transaction::add_mixin`], this will *not* return an error if the key already has a mixin
+    /// of this type. This *should* not cause inconsistency.
+    pub fn put_mixin<M>(&mut self, node: Key, mixin: M)
+    where
+        M: Mixin,
+    {
+        op::add_mixin(self, node, mixin).unwrap()
+    }
 }
 
-/// Contents of a post.
-#[derive(Encode, Decode)]
-pub struct Content {
-    pub content: Option<String>,
-    pub summary: Option<String>,
-}
+mod op {
+    use std::ops::{Bound, RangeBounds};
 
-impl Mixin for Content {
-    const SPACE: Space = Space("content");
+    use chrono::{DateTime, TimeDelta, Utc};
+    use either::Either;
+
+    use super::Mixin;
+    use crate::{internal::*, util::IterExt as _, Error, Key, Result};
+
+    pub fn update<M>(
+        cx: &(impl Query + Write),
+        node: Key,
+        update: impl FnOnce(M) -> M,
+    ) -> Result<()>
+    where
+        M: Mixin,
+    {
+        // TODO: implement in terms of a merge operator instead of separate query and write ops.
+        // this would let us remove the `Query` bound, which would in turn let us update from within
+        // a batch.
+        //
+        // See https://github.com/facebook/rocksdb/wiki/Merge-Operator
+        //
+        // It looks like rocksdb allows you to specify a merge operator per column family.[^1]
+        // This means we can construct our column families with a merge operator that knows how to encode and decode mixins.
+        //
+        // [^1]: https://github.com/facebook/rocksdb/blob/9d37408f9af15c7a1ae42f9b94d06b27d98a011a/include/rocksdb/options.h#L128
+        let tree = cx.open(M::SPEC.keyspace);
+        match tree.get(node.as_ref())? {
+            None => Err(Error::Missing),
+            Some(buf) => {
+                let new = decode(buf).map(update).and_then(encode)?;
+                tree.set(node, new)
+            }
+        }
+    }
+
+    pub fn get_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<Option<M>> {
+        cx.open(M::SPEC.keyspace).get(node)?.map(decode).transpose()
+    }
+
+    pub fn add_mixin<M: Mixin>(cx: &impl Write, node: Key, mixin: M) -> Result<()> {
+        cx.open(M::SPEC.keyspace).set(node, encode(mixin)?)
+    }
+
+    pub fn has_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<bool> {
+        cx.open(M::SPEC.keyspace).has(node)
+    }
+
+    pub fn get_range<M: Mixin>(
+        cx: &impl Query,
+        range: impl RangeBounds<DateTime<Utc>>,
+    ) -> impl Iterator<Item = Result<(Key, M)>> + '_ {
+        // TODO: Test this thoroughly
+        const MS: TimeDelta = TimeDelta::milliseconds(1);
+        let iter = match (range.start_bound(), range.end_bound()) {
+            (Bound::Unbounded, Bound::Unbounded) => Either::Left(cx.open(M::SPEC.keyspace).list()),
+            (min, max) => {
+                let lower = match min {
+                    Bound::Unbounded => [u8::MIN; 16],
+                    Bound::Included(inc) => Key::range(*inc).0,
+                    Bound::Excluded(exc) => Key::range(*exc + MS).0,
+                };
+                let upper = match max {
+                    Bound::Unbounded => [u8::MAX; 16],
+                    Bound::Included(inc) => Key::range(*inc).1,
+                    Bound::Excluded(exc) => Key::range(*exc - MS).1,
+                };
+                Either::Right(cx.open(M::SPEC.keyspace).range(lower, upper))
+            }
+        };
+        iter.bind_results(|(k, v)| {
+            let key = Key::from_slice(k.as_ref());
+            let val = decode(v)?;
+            Ok((key, val))
+        })
+    }
+
+    pub fn join_on<M>(
+        cx: &impl Query,
+        iter: impl IntoIterator<Item = Result<Key>>,
+    ) -> Result<Vec<(Key, Option<M>)>>
+    where
+        M: Mixin,
+    {
+        let keys: Vec<Key> = iter.into_iter().try_collect()?;
+        cx.open(M::SPEC.keyspace)
+            .join(keys.iter())
+            .into_iter()
+            .zip(keys)
+            .map(|(opt, key)| {
+                let Some(buf) = opt?
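+                // `opt?` only propagates storage errors; a key that simply has no
+                // row for this mixin takes the `else` branch and joins to `None`.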
+                else {
+                    return Ok((key, None));
+                };
+                let val = decode(buf)?;
+                Ok((key, Some(val)))
+            })
+            .try_collect()
+    }
+
+    pub(super) fn encode(data: impl bincode::Encode) -> Result<Vec<u8>> {
+        bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
+    }
+
+    pub(super) fn decode<T>(data: impl AsRef<[u8]>) -> Result<T>
+    where
+        T: bincode::Decode,
+    {
+        bincode::decode_from_slice(data.as_ref(), bincode::config::standard())
+            .map_err(Error::Decoding)
+            .map(|(v, _)| v)
+    }
+}
diff --git a/lib/store/src/transaction.rs b/lib/store/src/transaction.rs
deleted file mode 100644
index d8e479a..0000000
--- a/lib/store/src/transaction.rs
+++ /dev/null
@@ -1,432 +0,0 @@
-use std::{collections::HashMap, sync::Arc};
-
-use bincode::{Decode, Encode};
-use rocksdb::{BoundColumnFamily, IteratorMode};
-
-use crate::{
-    arrow::Direction, key::Tag, util::IterExt as _, Alias, Arrow, Backend, Error, Key, Keylike,
-    Mixin, Result, Store, OK, SPACES,
-};
-
-impl Store {
-    /// Initiate a transaction.
-    ///
-    /// If the result is an error, the transaction is rolled back, and otherwise the transaction
-    /// is committed.
-    pub fn transaction<T>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T>) -> Result<T> {
-        // Load all the column family handles, because they can't be accessed through the
-        // `rocksdb::Transaction` struct, only the `TransactionDB`.
-        let spaces = SPACES
-            .into_iter()
-            .map(|name| (*name, self.inner.cf_handle(name).unwrap()))
-            .collect();
-        let tx = Transaction {
-            inner: self.inner.transaction(),
-            spaces,
-        };
-        let result = f(&tx);
-        if result.is_err() {
-            tx.inner.rollback()?;
-        } else {
-            tx.inner.commit()?;
-        }
-        result
-    }
-    /// Check whether a key exists in the registry,
-    pub fn is_registered(&self, key: Key) -> Result<bool> {
-        let cf = self
-            .inner
-            .cf_handle("registry")
-            .expect("failed to open registry");
-        self.inner
-            .get_pinned_cf(&cf, key)
-            .map(|opt| opt.is_some())
-            .map_err(Error::Internal)
-    }
-}
-
-/// A database transaction, in which either each action succeeds, or everything fails
-/// together.
-///
-/// The transaction struct is the interface for quering and manipulating persisted content.
-pub struct Transaction<'db> {
-    inner: rocksdb::Transaction<'db, Backend>,
-    spaces: HashMap<&'static str, Arc<BoundColumnFamily<'db>>>,
-}
-
-/// Methods for manipulating the registry.
-///
-/// Before you can manipulate a vertex, its needs to be registered.
-impl Transaction<'_> {
-    /// Register a new vertex.
-    pub fn create_vertex(&self, key: Key, tag: Tag) -> Result<()> {
-        self.with("registry").set(key, [tag.0])
-    }
-    /// Delete a vertex from the registry.
-    pub fn delete_vertex(&self, key: Key) -> Result<()> {
-        // TODO: also make this delete all related data?
-        self.with("registry").del(key)
-    }
-    /// Check whether a vertex is registered in the database.
-    pub fn is_registered(&self, key: Key) -> Result<bool> {
-        self.with("registry").has(key)
-    }
-}
-
-/// Methods for manipulating mixins.
-///
-/// For each implementor of [`Mixin`], a vertex can have at most one record of that type
-/// associated with it.
-impl Transaction<'_> {
-    /// Query the store for a value associated with the vertex `key` identifies.
-    ///
-    /// Using a [`Key`] is more efficient than using an alias.
-    pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)>
-    where
-        M: Mixin,
-    {
-        // Checked translate isn't needed, we'll complain if we can't find the data.
-        let canonicalized_key = key.translate(&self)?;
-        let raw = self.with(M::SPACE).get(canonicalized_key)?;
-        let value = decode(raw.as_ref())?;
-        Ok((canonicalized_key, value))
-    }
-    /// Associate a new mixin value with the key.
-    ///
-    /// # Errors
-    ///
-    /// - `Error::Conflict` if a mixin of this type is already associated with the vertex
-    /// - `Error::Undefined` if `key` is not in the registry.
-    pub fn insert<M>(&self, key: impl Keylike, data: M) -> Result<()>
-    where
-        M: Mixin,
-    {
-        let key = key.checked_translate(&self)?;
-        let data = encode(data)?;
-        let ns = self.with(M::SPACE);
-        // Check for conflicts. Fail if the key already exists, otherwise set the key
-        // to the given value.
-        if ns.has(key)? {
-            Err(Error::Conflict)
-        } else {
-            ns.set(key, data)
-        }
-    }
-    /// Apply an update function to the mixin identified by the key.
-    ///
-    /// # Errors
-    ///
-    /// - `Error::Undefined` if the `key` is not registered
-    /// - `Error::Missing` if `key` does not exist in the keyspace associated with `M`
-    pub fn update<M>(&self, key: impl Keylike, f: impl FnOnce(Key, M) -> Result<M>) -> Result<()>
-    where
-        M: Mixin,
-    {
-        let key = key.checked_translate(self)?;
-        let (key, old) = self.lookup::<M>(key)?;
-        let new = f(key, old).and_then(encode)?;
-        self.with(M::SPACE).set(key, new)
-    }
-    /// Remove the mixin from the vertex `key` refers to.
-    ///
-    /// Doesn't complain if the value does not exist in the expected keyspace.
-    pub fn remove<M>(&self, key: impl Keylike) -> Result<Option<M>>
-    where
-        M: Mixin,
-    {
-        // Checked translate isn't needed because we don't care if the key is bogus.
-        let canonical_key = key.translate(self)?;
-        let ns = self.with(M::SPACE);
-        match ns.pop(canonical_key) {
-            Ok(Some(val)) => decode(&val).map(Some),
-            Ok(None) => Ok(None),
-            Err(err) => Err(err),
-        }
-    }
-    /// List all key-value pairs for mixins of type `M`.
-    pub fn list<M>(&self) -> impl Iterator<Item = Result<(Key, M)>> + '_
-    where
-        M: Mixin,
-    {
-        self.with(M::SPACE).list().bind_results(|(k, v)| {
-            let v = decode(v.as_ref())?;
-            let k = Key::from_slice(k.as_ref());
-            Ok((k, v))
-        })
-    }
-}
-
-/// Methods for interacting with [aliases][Alias], which are unique alternate keys.
-impl Transaction<'_> {
-    /// Look up the key that the given alias maps to.
-    ///
-    /// If the key was deleted, but the alias wasn't properly cleaned up,
-    pub fn lookup_alias<A>(&self, alias: A) -> Result<Key>
-    where
-        A: Alias,
-    {
-        let (l, _) = A::SPACE;
-        let raw = self.with(l).get(alias.to_string())?;
-        Ok(Key::from_slice(raw.as_ref()))
-    }
-    /// Create a new alias of type `A` for the given [`Key`].
-    ///
-    /// If the alias already exists, this function returns `Conflict`.
-    pub fn insert_alias<A>(&self, key: Key, alias: A) -> Result<()>
-    where
-        A: Alias,
-    {
-        let (l, r) = A::SPACE;
-        let alias = alias.to_string();
-        if self.with(l).has(&alias)? {
-            return Err(Error::Conflict);
-        }
-        self.with(l).set(&alias, key)?;
-        self.with(r).set(key, &alias)?;
-        OK
-    }
-    /// Delete the alias of type `A` that points to `key`.
-    pub fn remove_alias<A>(&self, key: Key) -> Result<()>
-    where
-        A: Alias,
-    {
-        let (l, r) = A::SPACE;
-        // First, pop the reverse mapping, which will give us the encoded
-        // key for the normal mapping. If it doesn't exist, don't delete
-        // the normal mapping.
-        if let Some(alias) = self.with(r).pop(key)? {
-            self.with(l).pop(alias)?;
-        }
-        OK
-    }
-}
-
-impl Transaction<'_> {
-    /// Find an arrow of type `A` with the given `tail` and `head`.
-    pub fn lookup_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<Option<A>>
-    where
-        A: Arrow,
-    {
-        let (l, _) = A::SPACE;
-        match self.with(l).get(tail.fuse(head)) {
-            Ok(raw) => decode(raw.as_ref()).map(Some),
-            Err(Error::Missing) => Ok(None),
-            Err(err) => Err(err),
-        }
-    }
-    /// Create a new arrow of type `A` and associate the label with it.
-    ///
-    /// # Errors
-    ///
-    /// - `Error::Undefined` if either key is not registered
-    pub fn insert_arrow<A>(&self, (tail, head): (Key, Key), label: A) -> Result<()>
-    where
-        A: Arrow,
-    {
-        if !self.is_registered(tail)? {
-            return Err(Error::Undefined { key: tail });
-        }
-        if !self.is_registered(head)? {
-            return Err(Error::Undefined { key: head });
-        }
-        let (l, r) = A::SPACE;
-        let label = encode(label)?;
-        self.with(l).set(tail.fuse(head), &label)?;
-        self.with(r).set(head.fuse(tail), &label)?;
-        OK
-    }
-    /// Delete an arrow from the data store.
-    pub fn remove_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<()>
-    where
-        A: Arrow,
-    {
-        self.with(A::SPACE.0).del(tail.fuse(head))?;
-        self.with(A::SPACE.1).del(head.fuse(tail))?;
-        OK
-    }
-    /// Check whether an arrow exists.
-    pub fn exists<A>(&self, (tail, head): (Key, Key)) -> Result<bool>
-    where
-        A: Arrow,
-    {
-        self.with(A::SPACE.0).has(tail.fuse(head))
-    }
-    /// Get all arrows of type `A` "pointing at" `key`.
-    pub fn list_incoming<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
-    where
-        A: Arrow,
-    {
-        self.list_arrows_where(Direction::Incoming, key)
-    }
-    /// Get all arrows of type `A` "pointing away from" `key`.
-    pub fn list_outgoing<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
-    where
-        A: Arrow,
-    {
-        self.list_arrows_where(Direction::Outgoing, key)
-    }
-    /// Get all arrows of type `A`.
-    pub fn list_arrows<A>(&self) -> impl Iterator<Item = Result<(Key, A, Key)>> + '_
-    where
-        A: Arrow,
-    {
-        self.with(A::SPACE.0).list().bind_results(|(k, v)| {
-            let (tail, head) = Key::split(k.as_ref());
-            decode(v.as_ref()).map(|label| (tail, label, head))
-        })
-    }
-    /// Select arrows with the given direction relative to the given key.
-    fn list_arrows_where<A>(
-        &self,
-        direction: Direction,
-        key: impl Keylike,
-    ) -> impl Iterator<Item = Result<(Key, A)>> + '_
-    where
-        A: Arrow,
-    {
-        // Keys in space 0 are arranged with the tail at the start, and the ones in space 1
-        // are arranged with the head at the start. This allows us to efficiently prefix scan
-        // regardless of the direction, at the cost of increased space usage.
-        let space = match direction {
-            Direction::Outgoing => A::SPACE.0,
-            Direction::Incoming => A::SPACE.1,
-        };
-        let key = key.translate(&self).unwrap();
-        #[cfg(test)]
-        eprintln!("scanning {} using prefix {key}", space.0);
-        self.with(space).scan(key).bind_results(|(k, v)| {
-            // Because we're prefix scanning on the first half of the key, we only want to
-            // get the second here.
-            let (_this, other) = Key::split(k.as_ref());
-            #[cfg(test)]
-            eprintln!("  found {_this}:{other}");
-            decode(v.as_ref()).map(|label| (other, label))
-        })
-    }
-    pub(crate) fn quiver(&self, tag: Tag) -> Quiver<'_> {
-        Quiver { tag, tx: &self }
-    }
-}
-
-impl Transaction<'_> {
-    /// Use a keyspace.
-    fn with(&self, name: impl AsRef<str>) -> Keyspace<'_> {
-        Keyspace {
-            cf: self.spaces[name.as_ref()].clone(),
-            tx: &self,
-        }
-    }
-}
-
-/// Provides the basic API for a keyspace/column family.
-struct Keyspace<'db> {
-    tx: &'db Transaction<'db>,
-    cf: Arc<BoundColumnFamily<'db>>,
-}
-
-impl<'db> Keyspace<'db> {
-    /// Retrieve a value from the database. Returns `Missing` if the key does not exist.
-    fn get(&self, key: impl AsRef<[u8]>) -> Result<impl AsRef<[u8]> + 'db> {
-        self.tx
-            .inner
-            .get_pinned_cf(&self.cf, key)
-            .map_err(Error::Internal)
-            .and_then(|opt| opt.ok_or(Error::Missing))
-    }
-    /// Set the value at `key` to `val`.
-    fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
-        self.tx
-            .inner
-            .put_cf(&self.cf, key, val)
-            .map_err(Error::Internal)
-    }
-    /// Delete the key-value pair identified by `key`.
-    fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
-        self.tx.inner.delete_cf(&self.cf, &key)?;
-        OK
-    }
-    /// Remove the key and associated value from the keyspace, and return its previous value.
-    fn pop(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
-        let old = self.tx.inner.get_for_update_cf(&self.cf, &key, true)?;
-        self.del(key)?;
-        Ok(old)
-    }
-    /// Check whether the key exists in the keyspace.
-    fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
-        self.tx
-            .inner
-            .get_pinned_cf(&self.cf, key)
-            .map_err(Error::Internal)
-            .map(|opt| opt.is_some())
-    }
-    /// Execute a prefix scan.
-    fn scan(
-        &self,
-        prefix: impl AsRef<[u8]> + 'db,
-    ) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
-    {
-        let t = prefix.as_ref().to_vec();
-        self.tx
-            .inner
-            .prefix_iterator_cf(&self.cf, prefix.as_ref())
-            // The prefix iterator may "overshoot". This makes it stop when it reaches
-            // the end of the range that has the prefix.
-            .take_while(move |r| match r {
-                Ok((ref k, _)) => k.starts_with(&t),
-                _ => true,
-            })
-            .map_err(Error::Internal)
-    }
-    /// Show all items in the entire keyspace.
-    fn list(
-        &self,
-    ) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
-    {
-        self.tx
-            .inner
-            .full_iterator_cf(&self.cf, IteratorMode::Start)
-            .map_err(Error::Internal)
-    }
-}
-
-/// The quiver allows one to manipulate all parallel edges tagged with a particular type.
-pub struct Quiver<'db> {
-    tx: &'db Transaction<'db>,
-    tag: Tag,
-}
-
-impl<'db> Quiver<'db> {
-    pub fn insert(&self, origin: Key, target: Key, identity: Key) -> Result<()> {
-        let fused = origin.fuse(target);
-        self.tx.with("multi:id-map").set(identity, fused)?;
-        let mut triple = [0; 48];
-        triple[..32].copy_from_slice(&fused);
-        triple[32..].copy_from_slice(identity.as_ref());
-        self.tx.with("multi:index/l").set(triple, b"")?;
-        triple[..32].rotate_left(16);
-        self.tx.with("multi:index/r").set(triple, b"")?;
-        OK
-    }
-    pub fn list_incoming(&self, target: Key) -> impl Iterator<Item = Result<(Key, Key)>> + 'db {
-        self.tx
-            .with("multi:index/r")
-            .scan(target)
-            .map_ok(|(k, _)| Key::split(&k.as_ref()[16..]))
-    }
-}
-
-fn encode(data: impl Encode) -> Result<Vec<u8>> {
-    bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
-}
-
-fn decode<T>(data: &[u8]) -> Result<T>
-where
-    T: Decode,
-{
-    bincode::decode_from_slice(data, bincode::config::standard())
-        .map_err(Error::Decoding)
-        .map(|(v, _)| v)
-}
-
-#[cfg(test)]
-mod tests;
diff --git a/lib/store/src/transaction/tests.rs b/lib/store/src/transaction/tests.rs
deleted file mode 100644
index ee0670a..0000000
--- a/lib/store/src/transaction/tests.rs
+++ /dev/null
@@ -1,256 +0,0 @@
-use super::*;
-use crate::Space;
-
-#[derive(Encode, Decode)]
-struct TestArrow;
-
-impl Arrow for TestArrow {
-    const SPACE: (Space, Space) = (Space("test-arrow/l"), Space("test-arrow/r"));
-}
-
-const TEST_TAG: Tag = Tag(69);
-
-macro_rules! keygen {
keygen { - { $($name:ident)* } => { - $( - let $name = Key::gen(); - eprintln!(concat!(stringify!($name), "={}"), $name); - )* - } -} - -fn with_test_arrow(f: impl Fn(Key, Key, &Transaction<'_>, usize) -> Result<()>) -> Result<()> { - Store::with_tmp(|db| { - // Run these tests 128 times because misuse of prefix iterator may cause weird, - // obscure bugs :3 - // - // Also, because we don't wipe the store between test runs, we have more chances - // to discover weird bugs that we wouldn't catch if there was only a single run. - Ok(for n in 0..128 { - eprintln!("--- run {n} ---"); - db.transaction(|tx| { - keygen!(target origin); - - tx.create_vertex(target, TEST_TAG)?; - tx.create_vertex(origin, TEST_TAG)?; - - tx.insert_arrow((origin, target), TestArrow)?; - - let l: Vec = tx - .with("test-arrow/l") - .list() - .map_ok(|(k, _)| Key::split(k.as_ref())) - .map_ok(|(a, b)| format!("({a}, {b})")) - .try_collect()?; - - eprintln!("test-arrow/l = {l:#?}"); - - let r: Vec = tx - .with("test-arrow/r") - .list() - .map_ok(|(k, _)| Key::split(k.as_ref())) - .map_ok(|(a, b)| format!("({a}, {b})")) - .try_collect()?; - - eprintln!("test-arrow/r = {r:#?}"); - - f(origin, target, &tx, n) - })?; - eprintln!("--- end run {n} ---"); - }) - }) -} - -#[test] -fn target_incoming() -> Result<()> { - with_test_arrow(|origin, target, tx, _| { - let ti: Vec<_> = tx.list_incoming::(target).keys().try_collect()?; - - eprintln!("target.incoming = {ti:#?}"); - - assert!(ti.contains(&origin), "origin ∈ target.incoming"); - assert!(!ti.contains(&target), "target ∉ target.incoming"); - - OK - }) -} - -#[test] -fn target_outgoing() -> Result<()> { - with_test_arrow(|origin, target, tx, _| { - let to: Vec<_> = tx.list_outgoing::(target).keys().try_collect()?; - - eprintln!("target.outgoing = {to:#?}"); - - assert!(!to.contains(&target), "target ∉ target.outgoing"); - assert!(!to.contains(&origin), "origin ∉ target.outgoing"); - - OK - }) -} - -#[test] -fn origin_incoming() -> Result<()> { - with_test_arrow(|origin, target, tx, _| { - let oi: Vec<_> = tx.list_incoming::(origin).keys().try_collect()?; - - eprintln!("origin.incoming = {oi:#?}"); - - assert!(!oi.contains(&origin), "origin ∉ origin.incoming"); - assert!(!oi.contains(&target), "target ∉ origin.incoming"); - - OK - }) -} - -#[test] -fn origin_outgoing() -> Result<()> { - with_test_arrow(|origin, target, tx, _| { - let oo: Vec<_> = tx.list_outgoing::(origin).keys().try_collect()?; - - eprintln!("origin.outgoing = {oo:#?}"); - - assert!(oo.contains(&target), "target ∈ origin.outgoing"); - assert!(!oo.contains(&origin), "origin ∉ origin.outgoing"); - - OK - }) -} - -#[test] -fn fanout() -> Result<()> { - let targets: [Key; 128] = std::array::from_fn(|_| Key::gen()); - let origin = Key::gen(); - Store::with_tmp(|db| { - db.transaction(|tx| { - tx.create_vertex(origin, TEST_TAG)?; - for t in targets { - tx.create_vertex(t, TEST_TAG)?; - tx.insert_arrow((origin, t), TestArrow)?; - } - - let oo: Vec<_> = tx.list_outgoing::(origin).keys().try_collect()?; - - for t in targets { - assert!(oo.contains(&t), "∀ t ∈ targets: t ∈ origin.outgoing"); - let ti: Vec<_> = tx.list_incoming::(t).keys().try_collect()?; - assert!( - ti == vec! 
-                    "∀ t ∈ targets: t.incoming = {{origin}}"
-                );
-            }
-
-            OK
-        })
-    })
-}
-
-#[test]
-fn fanin() -> Result<()> {
-    let origins: [Key; 128] = std::array::from_fn(|_| Key::gen());
-    let target = Key::gen();
-    Store::with_tmp(|db| {
-        db.transaction(|tx| {
-            tx.create_vertex(target, TEST_TAG)?;
-            for o in origins {
-                tx.create_vertex(o, TEST_TAG)?;
-                tx.insert_arrow((o, target), TestArrow)?;
-            }
-
-            let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?;
-
-            for o in origins {
-                let oo: Vec<_> = tx.list_outgoing::<TestArrow>(o).keys().try_collect()?;
-                assert!(ti.contains(&o), "∀ o ∈ origins: o ∈ target.incoming");
-                assert!(
-                    oo == vec![target],
-                    "∀ o ∈ origins: o.outgoing = {{target}}"
-                );
-            }
-
-            OK
-        })
-    })
-}
-
-#[test]
-fn distinct_many_to_many() -> Result<()> {
-    let origins: [Key; 32] = std::array::from_fn(|_| Key::gen());
-    let targets: [Key; 32] = std::array::from_fn(|_| Key::gen());
-    Store::with_tmp(|db| {
-        db.transaction(|tx| {
-            for t in targets {
-                tx.create_vertex(t, TEST_TAG)?;
-            }
-            for o in origins {
-                tx.create_vertex(o, TEST_TAG)?;
-                for t in targets {
-                    tx.insert_arrow((o, t), TestArrow)?;
-                }
-            }
-
-            let ti: HashMap<Key, Vec<Key>> = targets
-                .into_iter()
-                .map(|t| {
-                    tx.list_incoming::<TestArrow>(t)
-                        .keys()
-                        .try_collect()
-                        .map(|v: Vec<_>| (t, v))
-                })
-                .collect::<Result<_>>()?;
-
-            // For each origin point, there must be a target that has it as "incoming".
-            assert!(
-                origins
-                    .into_iter()
-                    .all(|o| { targets.into_iter().any(|t| { ti[&t].contains(&o) }) }),
-                "∀ o ∈ origins: ∃ t ∈ targets: o ∈ t.incoming"
-            );
-
-            // Each target has each origin as incoming.
-            assert!(
-                origins
-                    .into_iter()
-                    .all(|o| { targets.into_iter().all(|t| { ti[&t].contains(&o) }) }),
-                "∀ o ∈ origins: ∀ t ∈ targets: o ∈ t.incoming"
-            );
-
-            let to: HashMap<Key, Vec<Key>> = targets
-                .into_iter()
-                .map(|t| {
-                    tx.list_outgoing::<TestArrow>(t)
-                        .keys()
-                        .try_collect()
-                        .map(|v: Vec<_>| (t, v))
-                })
-                .collect::<Result<_>>()?;
-
-            // Our arrows point only from origins to targets, and there's a bug if there
-            // exists a target such that its outgoing set is non-empty.
-            assert!(
-                !targets.into_iter().any(|t| !to[&t].is_empty()),
-                "∄ t ∈ targets: t.outgoing ≠ ∅"
-            );
-
-            let oo: HashMap<Key, Vec<Key>> = origins
-                .into_iter()
-                .map(|o| {
-                    tx.list_outgoing::<TestArrow>(o)
-                        .keys()
-                        .try_collect()
-                        .map(|v: Vec<_>| (o, v))
-                })
-                .collect::<Result<_>>()?;
-
-            // Each origin has each target as outgoing.
-            assert!(
-                origins
-                    .into_iter()
-                    .all(|o| targets.into_iter().all(|t| oo[&o].contains(&t))),
-                "∀ o ∈ origins: ∀ t ∈ targets: t ∈ o.outgoing"
-            );
-
-            OK
-        })
-    })
-}
diff --git a/lib/store/src/types.rs b/lib/store/src/types.rs
new file mode 100644
index 0000000..9f482ca
--- /dev/null
+++ b/lib/store/src/types.rs
@@ -0,0 +1,178 @@
+//! Defining a [`Schema`].
+//!
+//! There is a lot of complicated machinery here to make it so that you have to write very little code to
+//! define new types. Basically, if you want to define a thing to store, you need to implement the trait
+//! for it (e.g. [`Arrow`]), and also implement [`Value`], where you create a specification describing which
+//! namespaces store records of that type.
+//!
+//! Then, when you construct a new `Store`, you need to pass in a [`Schema`], or the database won't be able
+//! to operate on the types.
+//!
+//! [`Arrow`]: super::Arrow
+
+use std::collections::HashSet;
+
+use derive_more::Display;
+
+/// The namespace where all vertices must be registered.
+pub(crate) const NODE_HEADERS: Keyspace = Keyspace("header:node");
+
+/// The namespace where multiedge identities are mapped to endpoints.
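+///
+/// Judging from the `Quiver` code this patch replaces: each entry maps a multiedge's `identity`
+/// key to its fused `(origin, target)` key pair, so parallel edges remain individually addressable.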
+pub(crate) const MULTIEDGE_HEADERS: Keyspace = Keyspace("header:multiedge");
+
+/// A specification of all user-defined namespaces.
+///
+/// # Example
+///
+/// The example below correctly defines a [basic arrow] and demonstrates its use by inserting one and then
+/// testing whether it exists. If the appropriate keyspaces are not known to the store, this will panic.
+///
+/// ```rust
+/// use store::{ arrow::Arrow, types::Schema, Store, Key, OK };
+///
+/// // Each kind of value has a derive macro.
+/// #[derive(Arrow)]
+/// struct MyArrow { origin: Key, target: Key }
+///
+/// fn main() -> store::Result<()> {
+///     // Here, we make sure that the namespaces used for `MyArrow` are known.
+///     let schema = Schema::new()
+///         .has::<MyArrow>();
+///
+///     let result = Store::test(schema, |db| {
+///         let origin = Key::gen();
+///         let target = Key::gen();
+///
+///         let mut changes = db.batch();
+///         changes.create(MyArrow { origin, target });
+///         db.apply(changes)?;
+///
+///         db.exists::<MyArrow>(origin, target)
+///     })?;
+///
+///     assert!(result);
+///     OK
+/// }
+/// ```
+///
+/// [basic arrow]: crate::arrow::Basic
+pub struct Schema(pub(crate) HashSet<Keyspace>);
+
+impl Schema {
+    /// Construct a new schema, pre-populated with the built-in header namespaces.
+    pub fn new() -> Schema {
+        Schema(HashSet::from_iter([NODE_HEADERS, MULTIEDGE_HEADERS]))
+    }
+    /// Add the component to the schema.
+    pub fn has<C>(mut self) -> Schema
+    where
+        C: Value,
+    {
+        self.add(C::SPEC);
+        self
+    }
+    /// Add a spec to the schema by mutable reference.
+    pub fn add(&mut self, spec: impl TypeSpec) -> &mut Schema {
+        spec.register(&mut self.0);
+        self
+    }
+}
+
+/// The name of a keyspace.
+///
+/// Specifically, this is the name of a RocksDB column family.
+#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
+pub struct Keyspace(pub &'static str);
+
+impl AsRef<str> for Keyspace {
+    fn as_ref(&self) -> &str {
+        self.0
+    }
+}
+
+/// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a
+/// [mixin](MixinSpec).
+///
+/// All namespaces must be unique, and added to the [`Schema`].
+pub trait Value {
+    type Type: TypeSpec;
+    const SPEC: Self::Type;
+}
+
+/// The specification for an [`Arrow`](crate::Arrow).
+///
+/// The listed namespaces must be unique among all other namespaces.
+#[derive(Clone, Copy)]
+pub struct ArrowSpec {
+    /// The keyspace where edge keys are ordered `(origin, target)`.
+    pub by_origin: Keyspace,
+    /// The keyspace where edge keys are ordered `(target, origin)`.
+    pub by_target: Keyspace,
+}
+
+/// A specification for the namespaces needed to store an [`Alias`][crate::Alias].
+#[derive(Clone, Copy)]
+pub struct AliasSpec {
+    /// The alias -> key mapping table.
+    pub keyspace: Keyspace,
+    /// The key -> alias mapping table.
+    pub reversed: Keyspace,
+}
+
+/// Where do we store a mixin?
+#[derive(Clone, Copy)]
+pub struct MixinSpec {
+    /// The key -> mixin mapping table.
+    pub keyspace: Keyspace,
+}
+
+/// Describes how to add a [`Value`] to a [`Schema`].
+pub trait TypeSpec {
+    /// Register the namespaces.
+    fn register(&self, set: &mut HashSet<Keyspace>);
+}
+
+// TODO: better error messages.
+
+impl TypeSpec for ArrowSpec {
+    fn register(&self, set: &mut HashSet<Keyspace>) {
+        if !set.insert(self.by_origin) {
+            panic! {
+                "Duplicate found while inserting Arrow::BY_ORIGIN: {}",
+                self.by_origin
+            }
+        }
+        if !set.insert(self.by_target) {
+            panic! {
+                "Duplicate found while inserting Arrow::BY_TARGET: {}",
+                self.by_target
+            }
+        }
+    }
+}
+impl TypeSpec for AliasSpec {
+    fn register(&self, set: &mut HashSet<Keyspace>) {
+        if !set.insert(self.keyspace) {
+            panic! {
+                "Duplicate found while inserting Alias::KEYSPACE: {}",
+                self.keyspace
+            }
+        }
+        if !set.insert(self.reversed) {
+            panic! {
+                "Duplicate found while inserting Alias::REVERSED: {}",
+                self.reversed
+            }
+        }
+    }
+}
+impl TypeSpec for MixinSpec {
+    fn register(&self, set: &mut HashSet<Keyspace>) {
+        if !set.insert(self.keyspace) {
+            panic! {
+                "Duplicate found while inserting Mixin::KEYSPACE: {}",
+                self.keyspace
+            }
+        }
+    }
+}
diff --git a/lib/store/src/util.rs b/lib/store/src/util.rs
index e082a37..878c56d 100644
--- a/lib/store/src/util.rs
+++ b/lib/store/src/util.rs
@@ -46,6 +46,28 @@ pub trait IterExt: Iterator + Sized {
     {
         self.next().ok_or(e)?
     }
+    /// `filter_map` meets `and_then`.
+    fn filter_bind_results<'a, I, O, E>(
+        self,
+        mut f: impl FnMut(I) -> Result<Option<O>, E> + 'a,
+    ) -> impl Iterator<Item = Result<O, E>> + 'a
+    where
+        Self: Iterator<Item = Result<I, E>> + 'a,
+    {
+        self.filter_map(move |r| r.and_then(|x| f(x)).transpose())
+    }
 }
 
 impl<I> IterExt for I where I: Iterator {}
+
+/// Curried function for creating a tuple, where the first argument becomes the first tuple
+/// element, and the returned closure's argument becomes the second element.
+pub fn key<K, V>(key: K) -> impl FnOnce(V) -> (K, V) {
+    move |val| (key, val)
+}
+
+/// Curried function for creating a tuple, where the first argument becomes the *second* tuple
+/// element, and the returned closure's argument becomes the first element.
+pub fn val<K, V>(val: V) -> impl FnOnce(K) -> (K, V) {
+    move |key| (key, val)
+}
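+
+#[cfg(test)]
+mod tests {
+    use super::{key, val};
+
+    // A quick sanity check of how the curried helpers compose with `Option::map`;
+    // hypothetical usage, not exercised elsewhere in this patch.
+    #[test]
+    fn curried_tuples() {
+        assert_eq!(Some(1).map(key("a")), Some(("a", 1)));
+        assert_eq!(Some("b").map(val(2)), Some(("b", 2)));
+    }
+}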