From c5bd6a127ed3d5fa2cae842d666be08d86b813ff Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Tue, 23 Apr 2024 23:11:11 +0200 Subject: [PATCH] [wip] migrate to new store api --- Cargo.lock | 5 + Cargo.toml | 1 + lib/macro/Cargo.toml | 7 + lib/macro/src/lib.rs | 16 + lib/puppy/src/lib.rs | 177 ++-- lib/store/Cargo.toml | 1 + lib/store/src/alias.rs | 120 ++- lib/store/src/arrow.rs | 574 +++++++++++-- lib/store/src/internal.rs | 239 ++++++ lib/store/src/key.rs | 50 +- lib/store/src/lib.rs | 1251 +++------------------------- lib/store/src/mixin.rs | 160 +++- lib/store/src/transaction.rs | 437 ---------- lib/store/src/transaction/tests.rs | 256 ------ lib/store/src/types.rs | 171 ++++ 15 files changed, 1418 insertions(+), 2047 deletions(-) create mode 100644 lib/macro/Cargo.toml create mode 100644 lib/macro/src/lib.rs create mode 100644 lib/store/src/internal.rs delete mode 100644 lib/store/src/transaction.rs delete mode 100644 lib/store/src/transaction/tests.rs create mode 100644 lib/store/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index e145567..4004297 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,6 +885,10 @@ dependencies = [ "libc", ] +[[package]] +name = "macro" +version = "0.0.0" + [[package]] name = "matchit" version = "0.7.3" @@ -1490,6 +1494,7 @@ dependencies = [ "bincode", "chrono", "derive_more", + "macro", "rocksdb", "tempfile", "ulid", diff --git a/Cargo.toml b/Cargo.toml index eb8deed..3e1c446 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "lib/puppy", "lib/store", "lib/fetch", + "lib/macro", "bin/server", "bin/pupctl", ] diff --git a/lib/macro/Cargo.toml b/lib/macro/Cargo.toml new file mode 100644 index 0000000..c562cd5 --- /dev/null +++ b/lib/macro/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "macro" +edition = "2021" + +[lib] +path = "src/lib.rs" +proc-macro = true diff --git a/lib/macro/src/lib.rs b/lib/macro/src/lib.rs new file mode 100644 index 0000000..4d26183 --- /dev/null +++ b/lib/macro/src/lib.rs @@ -0,0 
+1,16 @@ +use proc_macro::TokenStream; + +#[proc_macro_derive(Arrow, attributes(origin, target, identity))] +pub fn arrow(item: TokenStream) -> TokenStream { + TokenStream::new() +} + +#[proc_macro_derive(Alias)] +pub fn alias(item: TokenStream) -> TokenStream { + TokenStream::new() +} + +#[proc_macro_derive(Mixin)] +pub fn mixin(item: TokenStream) -> TokenStream { + TokenStream::new() +} diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index 75c3c5e..d565063 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -1,46 +1,101 @@ #![feature(iterator_try_collect)] pub use store::{self, Key, Store}; -use store::{ - alias::Username, - arrow::{self, multi::MultiArrow, AuthorOf}, - mixin::{Content, Profile}, - util::IterExt, - Keylike, Tag, -}; -mod tags { - //! Type tags for vertices. +pub mod model { + use store::{types::Schema, Key}; - use store::Tag; + #[derive(store::Mixin)] + pub struct Profile { + pub post_count: usize, + pub account_name: String, + pub display_name: Option, + pub about_string: Option, + pub about_fields: Vec<(String, String)>, + } - pub const ACTOR: Tag = Tag(0); - pub const POST: Tag = Tag(1); - pub const BITE: Tag = Tag(2); + #[derive(store::Mixin)] + pub struct Content { + pub content: Option, + pub summary: Option, + } + + #[derive(store::Arrow, Clone, Copy)] + pub struct AuthorOf { + #[origin] + pub author: Key, + #[target] + pub object: Key, + } + + #[derive(store::Arrow, Clone, Copy)] + pub struct Follows { + #[origin] + pub follower: Key, + #[target] + pub followed: Key, + } + + #[derive(store::Arrow, Clone, Copy)] + pub struct Bite { + #[identity] + pub id: Key, + #[origin] + pub biter: Key, + #[target] + pub victim: Key, + } + + #[derive(store::Arrow, Clone, Copy)] + pub struct FollowRequest { + #[identity] + pub id: Key, + pub origin: Key, + pub target: Key, + } + + #[derive(store::Alias)] + pub struct Username(pub String); + + /// Construct the schema. 
+ pub fn schema() -> Schema { + Schema::new() + // Mixins + .has::() + .has::() + // Aliases + .has::() + // Arrows + .has::() + .has::() + .has::() + .has::() + } } pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Result { let key = Key::gen(); - db.transaction(|tx| { - tx.create_vertex(key, tags::POST)?; - tx.update::(author, |_, mut profile| { + db.run(|tx| { + tx.update::(author, |mut profile| { profile.post_count += 1; - Ok(profile) + profile })?; - tx.insert(key, Content { + tx.add_mixin(key, Content { content: Some(content.to_string()), summary: None, })?; - tx.insert_arrow::((author, key))?; + tx.create(AuthorOf { + author, + object: key, + })?; Ok(key) }) } pub fn create_actor(db: &Store, username: impl ToString) -> store::Result { let key = Key::gen(); - db.transaction(|tx| { - tx.create_vertex(key, tags::ACTOR)?; - tx.insert_alias(key, Username(username.to_string()))?; - tx.insert(key, Profile { + db.run(|tx| { + tx.add_alias(key, Username(username.to_string()))?; + tx.add_mixin(key, Profile { post_count: 0, account_name: username.to_string(), display_name: None, @@ -51,43 +106,24 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result { }) } -pub fn list_posts_by_author( - db: &Store, - author: impl Keylike, -) -> store::Result> { - db.transaction(|tx| { - tx.list_outgoing::(author) - .bind_results(|(post_key, _)| tx.lookup::(post_key)) +pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result> { + db.run(|tx| { + tx.outgoing::(author) + .bind_results(|arr| tx.get_mixin::(arr.object)) .collect() }) } -pub struct Bite { - pub id: Key, - pub biter: Key, - pub victim: Key, -} - -impl MultiArrow for Bite { - const TYPE: Tag = tags::BITE; -} - pub fn bite_actor(db: &Store, biter: Key, victim: Key) -> store::Result { - db.transaction(|tx| { - // Bites are represented as multiedges. 
- let key = arrow::multi::insert::(&tx, biter, victim)?; - // We can treat particular arrows in a quiver as a vertex by registering it. - tx.create_vertex(key, tags::BITE)?; - Ok(key) + db.run(|tx| { + let id = Key::gen(); + tx.create(Bite { id, biter, victim })?; + Ok(id) }) } pub fn bites_on(db: &Store, victim: Key) -> store::Result> { - db.transaction(|tx| { - arrow::multi::list_incoming::(&tx, victim) - .map_ok(|(biter, id)| Bite { id, biter, victim }) - .try_collect() - }) + db.incoming::(victim).try_collect() } pub mod tl { @@ -102,13 +138,10 @@ pub mod tl { } pub fn fetch_all(db: &Store) -> Result> { - db.transaction(|tx| { + db.run(|tx| { let iter = tx.list::(); iter.bind_results(|(id, content)| { - let author = tx - .list_incoming::(id) - .keys() - .next_or(Error::Missing)?; + let author = tx.incoming::(id).next_or(Error::Missing)?; Ok(Post { id, author, @@ -123,29 +156,35 @@ pub mod tl { pub mod fr { //! Follow requests - use store::{ - arrow::{FollowRequested, Follows}, - util::IterExt as _, - Key, Store, OK, - }; + use store::{util::IterExt as _, Key, Store, OK}; - pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<()> { - db.transaction(|tx| { - tx.insert_arrow::((requester, target))?; - OK + use crate::model::{FollowRequest, Follows}; + + pub fn create(db: &Store, requester: Key, target: Key) -> store::Result { + db.run(|tx| { + let req = FollowRequest { + id: Key::gen(), + origin: requester, + target, + }; + tx.create(req)?; + Ok(req) }) } pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> { - db.transaction(|tx| { - tx.remove_arrow::((requester, target))?; - tx.insert_arrow::((requester, target))?; + db.run(|tx| { + tx.delete_all::(requester, target)?; + tx.create(Follows { + follower: requester, + followed: target, + })?; OK }) } pub fn reject(db: &Store, requester: Key, target: Key) -> store::Result<()> { - db.transaction(|tx| { + db.run(|tx| { tx.remove_arrow::((requester, target))?; OK }) diff --git 
a/lib/store/Cargo.toml b/lib/store/Cargo.toml index bf382b5..08cb2f6 100644 --- a/lib/store/Cargo.toml +++ b/lib/store/Cargo.toml @@ -12,3 +12,4 @@ derive_more = "*" bincode = "2.0.0-rc.3" chrono = "*" tempfile = "*" +macro = { path = "../macro" } diff --git a/lib/store/src/alias.rs b/lib/store/src/alias.rs index 44d9b98..c495821 100644 --- a/lib/store/src/alias.rs +++ b/lib/store/src/alias.rs @@ -1,17 +1,115 @@ -//! Alternative keys. +/// Derive an implementation of [`Alias`]. +pub use r#macro::Alias; -use derive_more::{Display, From}; +use super::{ + types::{AliasSpec, Value}, + Batch, Store, Transaction, +}; +use crate::{Key, Result}; -use crate::Space; +/// An alternative unique identifier for a node. +pub trait Alias: Value + From + AsRef {} -/// An alternative unique key for a vertex. -pub trait Alias: ToString + From { - const SPACE: (Space, Space); +impl Transaction<'_> { + /// Look up the key associated with the alias. + pub fn lookup(&self, alias: A) -> Result> + where + A: Alias, + { + op::lookup::(self, alias.as_ref()) + } + /// Get the alias associated with the `node`. + pub fn get_alias(&self, node: Key) -> Result> + where + A: Alias, + { + op::get_alias(self, node) + } + /// Add an alias to `node`. + pub fn add_alias(&self, node: Key, alias: A) -> Result<()> + where + A: Alias, + { + op::add_alias::(self, node, alias.as_ref()) + } + /// Check whether `node` has an `M` defined for it. + pub fn has_alias(&self, node: Key) -> Result + where + A: Alias, + { + op::has_alias::(self, node) + } } -#[derive(Display, From)] -pub struct Username(pub String); - -impl Alias for Username { - const SPACE: (Space, Space) = (Space("username/l"), Space("username/r")); +impl Store { + /// Look up the key associated with the alias. + pub fn lookup(&self, alias: A) -> Result> + where + A: Alias, + { + op::lookup::(self, alias.as_ref()) + } + /// Get the alias associated with the `node`. 
+ pub fn get_alias(&self, node: Key) -> Result> + where + A: Alias, + { + op::get_alias(self, node) + } + /// Check whether `node` has an `M` defined for it. + pub fn has_alias(&self, node: Key) -> Result + where + A: Alias, + { + op::has_alias::(self, node) + } +} + +impl Batch { + /// Add an alias to `node`. + /// + /// # Warning + /// + /// This will *not* fail if the key already has a alias of this type, and in fact *it will cause fundamental inconsistency* + /// if the alias already exists. Don't call this function unless you know that neither `node` nor `alias` exist yet. + pub fn put_alias(&mut self, node: Key, alias: A) + where + A: Alias, + { + // TODO: consistency *could* be checked by manually iterating over the transaction using `WriteBatch::iterate` + op::add_alias::(self, node, alias.as_ref()).unwrap(); + } +} + +mod op { + use crate::{internal::*, Alias, Key, Result, OK}; + + pub fn lookup(cx: &impl Query, alias: &str) -> Result> { + cx.open(A::SPEC.keyspace).get(alias).map(|k| match k { + Some(x) => Some(Key::from_slice(x.as_ref())), + None => None, + }) + } + + pub fn has_alias(cx: &impl Query, node: Key) -> Result { + cx.open(A::SPEC.reversed).has(node) + } + + pub fn add_alias(cx: &impl Write, node: Key, alias: &str) -> Result<()> { + cx.open(A::SPEC.keyspace).set(alias, node)?; + cx.open(A::SPEC.reversed).set(node, alias)?; + OK + } + + pub fn get_alias(cx: &impl Query, node: Key) -> Result> { + let buf = cx.open(A::SPEC.reversed).get(node)?; + Ok(buf.map(decode)) + } + + fn decode(data: impl AsRef<[u8]>) -> T + where + T: From, + { + T::from(String::from_utf8_lossy(data.as_ref()).into_owned()) + } } diff --git a/lib/store/src/arrow.rs b/lib/store/src/arrow.rs index 2dfff3d..42d5bc0 100644 --- a/lib/store/src/arrow.rs +++ b/lib/store/src/arrow.rs @@ -1,77 +1,531 @@ -//! Relations between nodes. +//! Directed edges, both parallel and simple. +//! +//! This module's main exports are [`Arrow`], and the two kinds of arrows: [`Basic`] and [`Multi`]. 
+//! +//! Querying information about arrows can be done using the APIs exposed by [`Store`] and [`Transaction`], +//! and manipulating them can likewise be done from within the context of a `Transaction`. +//! +//! The arrow API is designed to aggressively minimize boilerplate for defining arrow types, and uses a +//! few tricks to do with associated constants and types to make it all work nicely. +//! +//! # Terminology +//! +//! An arrow is a part of a graph. Graphs consist of *nodes* (also called *vertices*) and *edges*. Nodes +//! can be seen as "things", and edges as connections between those things, defined by the two nodes that +//! they connect (which are called the *endpoints* of the edge). +//! +//! These edges can be directed or undirected. The difference is that undirected edges are identified by +//! an unordered pair of their endpoints, whereas directed edges (also called **arrows**), are identified +//! by an ordered pair, where one of the endpoints is the *tail* (or *origin* in the code/docs here) and +//! the other is the *head* (usually called *target* here). +//! +//! # Arrow kinds +//! +//! Arrows can be either [`Basic`] or [`Multi`]. The main difference is that basic arrows are defined +//! solely by which two nodes they connect, which means that their representation and certain operations +//! are more efficient. The trade-off is that they cannot capture more complex information than "this +//! edge exists". +//! +//! For some use cases (for example, predicates) this is sufficient, but other use cases require multiple, +//! individually identifiable and manipulatable parallel edges. Here, the trade-off is that while they +//! are much more expressive, and can be labeled by associating [mixins] with the arrow's identity key, +//! they incur more space overhead, and most operations on them are more expensive compared to basic +//! edges. +//! +//! Most arrow operations work on either kind of edge. 
Some signatures reference [`Arrow::Kind`], which +//! is either of the `Multi` or `Basic` types mentioned before. Because parallel arrows need to be +//! discernable from each other, each of them also has an `identity` key, in addition to listing the two +//! edges they connect. +//! +//! [mixins]: super::Mixin -use bincode::{Decode, Encode}; +pub use self::kinds::{Basic, Multi}; +use super::{ + types::{ArrowSpec, Value}, + Batch, Store, Transaction, +}; +use crate::{util::IterExt as _, Key, Result}; -use crate::Space; +/// A directed edge. +/// +/// See the [module docs][self] for an introduction. +pub trait Arrow: Value + From + Into { + /// The representation of this arrow, which also determines whether parallel edges are allowed. + type Kind: ArrowKind = Basic; +} -pub mod multi { - //! Managing multiedges. - //! - //! Unlike regular [`Arrow`]s, which don't have an identity (they are identified by the two nodes that - //! they connect), multiarrows can have their own [`Key`]. This allows one to have multiple arrows in - //! the same direction connecting the same two vertices, which isn't possible with normal arrows. - //! - //! Multiarrows can also be treated as if they were vertices, if their identity (`Key`) is registered as - //! one. - //! - //! This comes with a trade-off, though, specifically in both space and complexity. A multi-arrow also - //! can't have a label, like a typical arrow. +/// Parameterizing arrows so we can distinguish between kinds of arrows. +/// +/// This lets us present a common API for certain arrow-related operations while also leveraging some +/// specialization. Essentially, from a type parameter which implements [`Arrow`], we can tell both at +/// the type level and at the value level whether that arrow is a multi-arrow or not. +pub trait ArrowKind { + /// Whether this kind of arrow should be represented using the specialized representation for edges + /// that are allowed to be parallel. 
+ const IS_MULTI: bool; + /// Construct an arrow from a buffer containing a correctly-oriented arrow. + /// + /// Each arrow is stored twice, once "correctly", and once "reversed". This allows us to efficiently + /// list both the outgoing and incoming edges for any particular vertex by using a prefix scan on the + /// [`BY_ORIGIN`][ArrowSpec::by_origin] and [`BY_TARGET`][ArrowSpec::by_target] keyspaces respectively. + /// + /// The buffer passed to this function will start with 16 bytes origin, followed by 16 bytes target. + /// For basic arrows, that's it, but for multiarrows there is an additional 16 bytes of "identity", + /// which is needed to discriminate between multiple parallel edges. + /// + /// # Failure + /// + /// This method must panic if `buf` is not the expected size (32 bytes for basic arrows, 48 bytes for + /// multi arrows). The responsibility for ensuring that `buf` is correctly oriented lies with the + /// caller lest the result is incorrect, but passing an incorrectly oriented arrow is not a memory + /// safety issue, so this function is safe. + fn dec(buf: &[u8]) -> Self; + /// Encode an arrow's key origin-first and target-first. + fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>); +} - use crate::{Key, Result, Tag, Transaction}; - - pub fn insert(tx: &Transaction<'_>, origin: Key, target: Key) -> Result +impl Store { + /// Check whether there exists any arrow of type `A` that points from `origin` to `target`. + pub fn exists(&self, origin: Key, target: Key) -> Result where - A: MultiArrow, + A: Arrow, { - let key = Key::gen(); - tx.quiver(A::TYPE).insert(origin, target, key)?; - Ok(key) + op::exists::(self, origin, target) + } + /// Get all arrows of type `A` that point at `target`. + pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator> + 'a + where + A::Kind: 'a, + A: Arrow, + { + op::incoming::(self, target) + } + /// Get all arrows of type `A` that point away from `origin`. 
+ pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator> + 'a + where + A::Kind: 'a, + A: Arrow, + { + op::outgoing::(self, origin) + } +} + +impl Transaction<'_> { + /// Check whether there exists any arrow of type `A` that points from `origin` to `target`. + /// + /// This only tells you whether there is *any* such arrow, not how many (in the case of parallel edges). + pub fn exists(&self, origin: Key, target: Key) -> Result + where + A: Arrow, + { + op::exists::(self, origin, target) + } + /// Get all arrows of type `A` that point at `target`. + pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator> + 'a + where + A: Arrow + 'a, + { + op::incoming::(self, target).map_ok(A::from) + } + /// Get all arrows of type `A` that point away from `origin`. + pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator> + 'a + where + A: Arrow + 'a, + { + op::outgoing::(self, origin).map_ok(A::from) + } + /// Create a new arrow of type `A`. + /// + /// This operation supports both [`Multi`] and [`Basic`] arrows. + /// + /// # Example + /// + /// The following snippet creates an arrow between `origin` and `target`. + /// + /// ```rust + /// # fn main () -> store::Result<()> { + /// use store::{Arrow, Key}; + /// + /// #[derive(Arrow)] + /// struct MyArrow { origin: Key, target: Key } + /// + /// # store::new_interface::Store::test(|db| { + /// let origin = Key::gen(); + /// let target = Key::gen(); + /// + /// db.run(|tx| { + /// tx.create(MyArrow { origin, target }) + /// })?; + /// + /// assert!(db.exists::(origin, target)?); + /// # store::OK }) + /// # } + /// ``` + pub fn create(&self, arrow: A) -> Result<()> + where + A: Arrow, + { + op::create::(self, arrow.into()) + } + /// Delete all edges of type `A` from `origin` to `target`. + /// + /// It is not an error for this function not to delete anything. 
+ pub fn delete_all(&self, origin: Key, target: Key) -> Result<()> + where + A: Arrow, + { + op::delete_all::(self, origin, target) + } + /// Delete a specific arrow. + pub fn delete_one(&self, arrow: A) -> Result<()> + where + A: Arrow, + { + op::delete_one::(self, arrow.into()) + } +} + +impl Batch { + /// Create an arrow. See [`Transaction::create`]. + pub fn create(&mut self, arrow: A) + where + A: Arrow, + { + op::create::(self, arrow.into()) + .expect("no errors expected to occur during batch operation") + } + /// Delete a specific arrow. + pub fn delete_one(&mut self, arrow: A) + where + A: Arrow, + { + op::delete_one::(self, arrow.into()) + .expect("no errors expected to occur during batch operation") + } +} + +mod op { + //! Implementations of arrow operations. + + use super::*; + use crate::{internal::*, Key, Result, OK}; + + /// Check whether there exists at least one arrow of type `A` from `origin` to `target`. + pub fn exists(cx: &impl Query, origin: Key, target: Key) -> Result + where + A: Arrow, + { + if A::Kind::IS_MULTI { + // In the case of a multi-edge, at least one result from the prefix scan + // indicates that there is at least one edge. + cx.open(A::SPEC.by_origin) + .scan(origin.fuse(target)) + .next() + .transpose() + .map(|o| o.is_some()) + } else { + cx.open(A::SPEC.by_origin).has(origin.fuse(target)) + } } - pub fn list_incoming<'db, A>( - tx: &'db Transaction<'db>, + /// List incoming arrows relative to `target`. + pub fn incoming<'db, A>( + cx: &'db impl Query, target: Key, - ) -> impl Iterator> + 'db + ) -> impl Iterator> + 'db where - A: MultiArrow, + A: Arrow, + A::Kind: 'db, { - tx.quiver(A::TYPE).list_incoming(target) + // In the `by_target` keyspace, for either kind of arrow the layout is such that the target is + // the prefix, so we pick that keyspace to more efficiently list all arrows that target the key. 
+ cx.open(A::SPEC.by_target) + .scan(target) + .map_ok(|(mut k, _)| { + // Arrows from `by_target` are oriented target-first, while the decoder function requires + // that the buffer is oriented origin-first. Regardless of whether `..32` covers the prefix + // or the whole slice, swapping the two keys always gives us the ordering expected by the + // decoding function. + let (t, o) = k[..32].split_at_mut(16); + t.swap_with_slice(o); + A::Kind::dec(&k) + }) } - pub trait MultiArrow { - const TYPE: Tag; + /// List outgoing arrows relative to `origin`. + pub fn outgoing<'db, A>( + cx: &'db impl Query, + origin: Key, + ) -> impl Iterator> + 'db + where + A: Arrow, + A::Kind: 'db, + { + cx.open(A::SPEC.by_origin) + .scan(origin) + .map_ok(|(ref k, _)| A::Kind::dec(k)) + } + + /// Create a new arrow. + pub fn create(cx: &impl Write, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + let (by_origin, by_target) = arrow.enc(); + cx.open(A::SPEC.by_origin).set(by_origin, b"")?; + cx.open(A::SPEC.by_target).set(by_target, b"")?; + OK + } + + /// Delete all arrows from `origin` to `target`. + /// + /// TODO: Remove the query requirement (depends on range delete being available). + pub fn delete_all(cx: &(impl Write + Query), origin: Key, target: Key) -> Result<()> + where + A: Arrow, + { + let by_origin = cx.open(A::SPEC.by_origin); + let by_target = cx.open(A::SPEC.by_target); + Ok(if A::Kind::IS_MULTI { + // TODO: optimize this implementation using range deletes. + for key in by_origin.scan(origin.fuse(target)).keys() { + let key = Multi::decode(key?.as_ref()); + by_origin.del(key.encode())?; + by_target.del(key.swap().encode())?; + } + } else { + by_origin.del(origin.fuse(target))?; + by_target.del(target.fuse(origin))?; + }) + } + + /// Delete a specific arrow, if it exists. Doesn't error if the arrow does *not* exist. 
+ pub fn delete_one(cx: &impl Write, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + let (by_origin, by_target) = arrow.enc(); + cx.open(A::SPEC.by_origin).del(by_origin)?; + cx.open(A::SPEC.by_target).del(by_target)?; + OK } } -/// A directed edge between two vertices. -pub trait Arrow { - type Label: Encode + Decode = (); - const SPACE: (Space, Space); -} - -/// Which way an arrow is pointing when viewed from a particular vertex. -pub enum Direction { - Incoming, - Outgoing, -} - -/// The node this arrow points away from is the "author" of the node the arrow points to. -pub struct AuthorOf; - -impl Arrow for AuthorOf { - const SPACE: (Space, Space) = (Space("created-by/l"), Space("created-by/r")); -} - -/// The origin of this arrow has follow requested the target. -pub struct FollowRequested; - -impl Arrow for FollowRequested { - const SPACE: (Space, Space) = (Space("pending-fr/l"), Space("pending-fr/r")); -} - -/// The origin "follows" the target. -pub struct Follows; - -impl Arrow for Follows { - const SPACE: (Space, Space) = (Space("follows/l"), Space("follows/r")); +/// Types representing the different kinds of arrows. +mod kinds { + use super::ArrowKind; + use crate::Key; + + impl ArrowKind for Multi { + const IS_MULTI: bool = true; + fn dec(buf: &[u8]) -> Self { + Multi::decode(buf) + } + fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { + (self.encode(), self.swap().encode()) + } + } + + impl ArrowKind for Basic { + const IS_MULTI: bool = false; + fn dec(buf: &[u8]) -> Self { + Basic::decode(buf) + } + fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { + (self.encode(), self.reverse().encode()) + } + } + + /// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist + /// between two vertices. + #[derive(Clone, Copy)] + pub struct Multi { + /// The node that this edge points away from. + pub origin: Key, + /// The node that this edge points towards. 
+ pub target: Key, + /// The discriminator of this particular edge, which distinguishes it from all other edges that + /// connect `origin` and `target`, and indeed from every other edge or node in the graph. + pub identity: Key, + } + + impl Multi { + /// Decode a multiarrow key from an origin-first encoded buffer. If the buffer is not correctly + /// oriented, the results will be wrong; the arrow will be oriented *away* from the target and + /// *at* the origin, instead of the other way around. + /// + /// # Orientation + /// + /// In this context, *correctly oriented* means that it is laid out in *origin-first* order, + /// like this (where `o`, `t` and `i` represent bytes): + /// + /// ```text + /// oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii + /// |--------------| |--------------| |--------------| + /// origin target identity + /// ..16 16..32 32.. + /// ``` + /// + /// In a *reverse oriented* buffer, the origin and target parts are swapped, so the target is + /// the prefix, followed by the origin, and then the identity. This is also called *target-first* + /// encoding in this documentation. + /// + /// # Silent failure + /// + /// There is no way to detect whether the ordering is correct from just the buffer, so the caller + /// must ensure that the order is correct. If you have a target-first encoded buffer, you can have + /// to swap the two keys before passing it into this function, or this function will give you an + /// edge that does not exist (since a multiedge can only point in one direction). + /// + /// Safety-wise, this isn't an issue, so it does not warrant marking this function as `unsafe`. + /// + /// # Panics + /// + /// This function panics if `buf` is not exactly 48 bytes long. + pub fn decode(buf: &[u8]) -> Multi { + Multi { + origin: Key::from_slice(&buf[..16]), + target: Key::from_slice(&buf[16..32]), + identity: Key::from_slice(&buf[32..]), + } + } + /// Encode an arrow in *origin-first order*. 
See the docs of [`Multi::decode`] for an explanation + /// of the difference between origin-first encoding and target-first encoding. + pub fn encode(self) -> [u8; 48] { + let mut key = [0; 48]; + key[..16].copy_from_slice(&self.origin.0); + key[16..32].copy_from_slice(&self.target.0); + key[32..].copy_from_slice(&self.identity.0); + key + } + /// Swap the origin and target of this arrow, while leaving the identity the same. + pub(super) fn swap(self) -> Multi { + Multi { + origin: self.target, + target: self.origin, + ..self + } + } + } + + /// A normal directed edge. Duplicates are not allowed. + /// + /// This kind of arrow is useful for modeling predicates and simple relationships. + #[derive(Clone, Copy)] + pub struct Basic { + pub origin: Key, + pub target: Key, + } + + impl Basic { + /// Get the inverse of this arrow (an arrow that connects the same two nodes, but pointing in the + /// other direction). + pub fn reverse(self) -> Basic { + Basic { + origin: self.target, + target: self.origin, + } + } + /// Encode `self` in origin-first order. See [`Multi::decode`] for docs on ordering. + pub fn encode(self) -> [u8; 32] { + self.origin.fuse(self.target) + } + /// Decode a basic edge from a buffer laid out origin-first. See [`Multi::decode`] for more information + /// about key encoding. + /// + /// # Panics + /// + /// Panics if `buf` is not exactly 32 bytes long. + pub fn decode(buf: &[u8]) -> Basic { + let (origin, target) = Key::split(buf); + Basic { origin, target } + } + } } +/// Derive [`Arrow`] for a struct. +/// +/// This will generate the required [`Into`] and [`From`] impls, as well as an [`Arrow`](trait@Arrow) impl and +/// a [`Value`] impl with the namespaces derived from the name of the struct. The macro works on structs with +/// specific fields, or newtypes of any arrow kind. 
+/// +/// # Attributes +/// +/// The `origin`, `target` and `identity` attributes are used on fields of type [`Key`], and they are used +/// to map the arrow's type to an [`ArrowKind`]. The `#[origin]` annotation isn't needed if the struct contains +/// a field named `origin`. Ditto with `target` and `identity`. +/// +/// If there is no `identity` defined, the `ArrowKind` will be [`Basic`]. If an `identity` is defined, the kind +/// will be [`Multi`]. +/// +/// # Examples +/// +/// Generates a [`Basic`] arrow called `my-arrow`. +/// +/// ``` +/// use store::{Key, Arrow, types::Schema}; +/// +/// #[derive(Arrow)] +/// struct MyArrow { origin: Key, target: Key } +/// +/// // This will fail to compile if the type doesn't implement `Arrow` correctly +/// Schema::new().has::(); +/// ``` +/// +/// Newtypes of either arrow kind are supported. +/// +/// ``` +/// use store::arrow::{Basic, Multi, Arrow}; +/// +/// /// The origin has requested to follow the target. +/// /// +/// /// Note: there may be more than one follow request between any two actors. +/// #[derive(Arrow)] +/// struct FollowRequest(Multi); +/// +/// /// A relation between two actors meaning that the origin follows the target. +/// #[derive(Arrow)] +/// struct Follows(Basic); +/// +/// /// Users can follow each other. +/// struct User(Key); +/// +/// impl User { +/// /// Make `self` follow `other`. +/// pub fn follows(self, other: User) -> Follows { +/// Follows(Basic { origin: self.0, target: other.0 }) +/// } +/// } +/// ``` +/// +/// Generates a [`Multi`] arrow called `my-multi-arrow`, mapping the multiarrow's discriminator to the struct's +/// `unique` field. 
+/// +/// ``` +/// use store::{Key, Arrow}; +/// +/// #[derive(Arrow)] +/// struct MyMultiArrow { +/// pub origin: Key, +/// pub target: Key, +/// #[identity] +/// pub unique: Key, +/// } +/// ``` +/// +/// The macro automatically adds `From` and `Into` implementations: +/// +/// ``` +/// use store::{Key, Arrow, arrow::Basic}; +/// +/// #[derive(Arrow)] +/// struct MyArrow { origin: Key, target: Key } +/// +/// let origin = Key::gen(); +/// let target = Key::gen(); +/// +/// let edge: Basic = MyArrow { origin, target }.into(); +/// +/// assert_eq!(origin, edge.origin); +/// assert_eq!(target, edge.target); +/// ``` +pub use r#macro::Arrow; diff --git a/lib/store/src/internal.rs b/lib/store/src/internal.rs new file mode 100644 index 0000000..8d562e2 --- /dev/null +++ b/lib/store/src/internal.rs @@ -0,0 +1,239 @@ +//! Provides a nice hashmap-esque interface for manipulating entries in the store's backend. + +use std::sync::Arc; + +use rocksdb::{BoundColumnFamily, IteratorMode}; + +pub use self::cx::{Context, Query, Write}; +use crate::{util::IterExt as _, Error, Result}; + +/// An internal interface to a specific keyspace that exposes basic hashmap-esque operations +/// on that keyspace, generic over whether the source of the data is a [`Transaction`] or a +/// [`Store`]. +pub struct Keyspace<'db, C> { + pub(super) context: &'db C, + pub(super) cf: Arc>, +} + +impl<'db, C> Keyspace<'db, C> +where + C: Query, +{ + /// Fetch a value from the keyspace. + pub fn get(&self, key: impl AsRef<[u8]>) -> Result + 'db>> { + self.context.get_pinned(&self.cf, key) + } + /// Test whether a key exists. + pub fn has(&self, key: impl AsRef<[u8]>) -> Result { + self.get(key).map(|r| r.is_some()) + } + /// Execute a prefix scan. + pub fn scan( + &self, + prefix: impl AsRef<[u8]> + 'db, + ) -> impl Iterator, Box<[u8]>)>> + 'db { + let t = prefix.as_ref().to_vec(); + self.context + .prefix_iterator(&self.cf, prefix.as_ref()) + // The prefix iterator may "overshoot". 
This makes it stop when it reaches + // the end of the range that has the prefix. + .take_while(move |r| match r { + Ok((ref k, _)) => k.starts_with(&t), + _ => true, + }) + .map_err(Error::Internal) + } + /// List all pairs in the keyspace. + pub fn list(&self) -> impl Iterator, Box<[u8]>)>> + 'db { + self.context + .full_iterator(&self.cf, IteratorMode::Start) + .map_err(Error::Internal) + } +} + +impl Keyspace<'_, C> +where + C: Write, +{ + /// Set the given `key` to the `value`, overwriting it if there was already a value there. + pub fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> { + self.context.put(&self.cf, key, val) + } + /// Drop the value if it exists. + pub fn del(&self, key: impl AsRef<[u8]>) -> Result<()> { + self.context.delete(&self.cf, key) + } +} + +mod cx { + //! Contexts for doing reads, writes or both to the database. + //! + //! The traits in this module map abstract calls to their methods on the [rocksdb] objects. + + use rocksdb::{ + AsColumnFamilyRef, DBAccess, DBIteratorWithThreadMode, DBPinnableSlice, IteratorMode, + }; + + use super::Keyspace; + use crate::{Backend, Batch, Error, Result, Store, Transaction, OK}; + + /// A context for executing database operations. + pub trait Context { + /// Open the keyspace identified by `cf`. + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> + where + Self: Sized; + } + + /// A context in which one can read from the data store. + /// + /// Specifically, this maps calls to either the transaction or the store's internals without us having + /// to implement methods for *both* transactions and the store. 
+ pub trait Query: Context { + type Backend: DBAccess; + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>>; + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Self::Backend>; + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend>; + } + + /// A context in which one can read from and modify the data store. + pub trait Write: Context { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()>; + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()>; + } + + impl Context for Store { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { + Keyspace { + cf: self.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl Query for Store { + type Backend = Backend; + + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>> { + self.inner.get_pinned_cf(cf, key).map_err(Error::Internal) + } + + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Backend> { + self.inner.prefix_iterator_cf(cf, prefix) + } + + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.full_iterator_cf(cf, mode) + } + } + + impl Context for Transaction<'_> { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { + Keyspace { + cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl<'db> Query for Transaction<'db> { + type Backend = rocksdb::Transaction<'db, Backend>; + + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>> { + self.inner.get_pinned_cf(cf, 
key).map_err(Error::Internal) + } + + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.prefix_iterator_cf(cf, prefix) + } + + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.full_iterator_cf(cf, mode) + } + } + + impl Write for Transaction<'_> { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { + self.inner.delete_cf(cf, key).map_err(Error::Internal) + } + + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()> { + self.inner.put_cf(cf, key, val).map_err(Error::Internal) + } + } + + impl Context for Batch { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> + where + Self: Sized, + { + Keyspace { + cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl Write for Batch { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { + self.inner.borrow_mut().delete_cf(cf, key); + OK + } + + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()> { + self.inner.borrow_mut().put_cf(cf, key, val); + OK + } + } +} diff --git a/lib/store/src/key.rs b/lib/store/src/key.rs index 7036ee7..7cccb51 100644 --- a/lib/store/src/key.rs +++ b/lib/store/src/key.rs @@ -3,8 +3,6 @@ use std::fmt::{Debug, Display}; use chrono::{DateTime, Utc}; use ulid::Ulid; -use crate::{Alias, Error, Result, Transaction}; - /// A unique identifier for vertices in the database. 
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct Key(pub(crate) [u8; 16]); @@ -31,7 +29,7 @@ impl Key { Key(key) } pub fn timestamp(self) -> DateTime { - let ms = Ulid::from_bytes(self.0).timestamp_ms(); + let ms = self.to_ulid().timestamp_ms(); DateTime::from_timestamp_millis(ms as i64).unwrap() } /// Join two keys together. @@ -46,6 +44,9 @@ impl Key { let head = Key::from_slice(&buf[16..]); (tail, head) } + fn to_ulid(self) -> Ulid { + Ulid::from_bytes(self.0) + } } impl AsRef<[u8]> for Key { @@ -53,46 +54,3 @@ impl AsRef<[u8]> for Key { &self.0 } } - -/// Anything that can be used to reference a vertex, both "normal" [keys](Key) -/// and [aliases](Alias). -/// -/// In general, using a key directly is going to be more efficient than using -/// an alias, because it incurs less lookups. -pub trait Keylike: Sized { - /// Translate the thing to a [`Key`]. - /// - /// This function should return [`Error::Missing`] if the key cannot be located. - fn translate(self, tx: &Transaction<'_>) -> Result; - /// Translate, and check whether the key is actually registered. - /// - /// This function should return [`Error::Undefined`] if the key does not *really* - /// exist. It should return [`Error::Missing`] if the key can't be found. - fn checked_translate(self, tx: &Transaction<'_>) -> Result { - let key = self.translate(tx)?; - if !tx.is_registered(key)? { - Err(Error::Undefined { key }) - } else { - Ok(key) - } - } -} - -impl Keylike for Key { - fn translate(self, _: &Transaction<'_>) -> Result { - Ok(self) - } -} - -impl Keylike for A -where - A: Alias, -{ - fn translate(self, tx: &Transaction<'_>) -> Result { - tx.lookup_alias(self) - } -} - -/// A type tag identifying a vertex. 
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)] -pub struct Tag(pub u8); diff --git a/lib/store/src/lib.rs b/lib/store/src/lib.rs index b68ee12..004dd56 100644 --- a/lib/store/src/lib.rs +++ b/lib/store/src/lib.rs @@ -1,89 +1,114 @@ -#![feature( - iterator_try_collect, - associated_type_defaults, - // All needed to make the compile time string concatenation bullshit work - generic_const_exprs, - const_intrinsic_copy, - const_mut_refs, - generic_arg_infer, - str_from_raw_parts, - core_intrinsics, // "Using it is strongly discouraged" ok but it works tho - const_heap -)] -//! The data store abstractions used by the ActivityPuppy project. +#![feature(iterator_try_collect, associated_type_defaults)] +#![feature(marker_trait_attr)] +//! Data persistence for the ActivityPuppy social media server built on top of [rocksdb]. //! -//! Persistence in a puppy server is handled by this component, which implements a directed graph -//! inspired datastore on top of the [rocksdb] key-value store. +//! # Overview //! -//! The workflow for manipulating stuff in the store is to open a [`Store`], and then to call -//! its [`transaction`](Store::transaction) method. This method takes a function that, given -//! a [`Transaction`], returns a result with some value. The `Transaction` object contains some -//! useful CRUD methods. Returning an `Ok` commits the transaction and returning `Err` rolls it -//! back. +//! The design of the data store's abstractions is heavily inspired by graph theory. The idea is to encourage +//! composition and loose coupling by segmenting all data associated with a node into [mixins][Mixin], and +//! modeling relations and predicates between nodes as [arrows][Arrow]. In addition, the key identifying a +//! node can be [aliased][Alias] by a string newtype, which must be unique within the namespace of that alias. //! -//! This component is specialized to puppy's storage needs, and probably won't be much use unless -//! 
you're writing something that interfaces with puppy. +//! The API is optimized for reducing boilerplate and legibility at the call site. +//! +//! There are three interfaces to the store: the read-only [`Store`], the write-only [`Batch`] and the [`Transaction`], +//! which allows both reads and writes. -use std::{path::Path, sync::Arc}; +use std::{cell::RefCell, path::Path, sync::Arc}; use derive_more::From; -use rocksdb::{MultiThreaded, Options, TransactionDBOptions}; - -type Backend = rocksdb::TransactionDB; +use rocksdb::{Options, TransactionDBOptions, WriteBatchWithTransaction}; +use types::Schema; +mod alias; +mod internal; mod key; -mod transaction; - -pub use key::{Key, Keylike, Tag}; -pub use transaction::Transaction; -pub use {alias::Alias, arrow::Arrow, mixin::Mixin}; - -pub mod alias; -pub mod arrow; -pub mod mixin; -pub mod util; - -/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly). -pub const OK: Result<()> = Ok(()); - -/// Master list of all column family names in use. -const SPACES: &[&'static str] = &[ - "registry", - "username/l", - "username/r", - "follows/l", - "follows/r", - "profile", - "content", - "created-by/l", - "created-by/r", - "pending-fr/l", - "pending-fr/r", - "multi:id-map", - "multi:index/l", - "multi:index/r", - #[cfg(test)] - "test-arrow/l", - #[cfg(test)] - "test-arrow/r", -]; - -/// The handle to the data store. -/// -/// This type can be cloned freely. -#[derive(Clone)] -pub struct Store { - inner: Arc, -} +mod mixin; /// The name of the puppy data store inside the state directory. const STORE_NAME: &str = "main-store"; +/// Open a [`Store`]. Creates one if it doesn't exist yet at the state directory path. +pub fn open(state_dir: impl AsRef, schema: Schema) -> Result { + Store::open(state_dir, schema) +} + +pub use {alias::Alias, arrow::Arrow, key::Key, mixin::Mixin}; + +pub mod arrow; +pub mod types; +pub mod util; + +/// The main interface to the data persistence engine. 
+/// +/// This type provides reading capabilities, but does not expose APIs for manipulating data in the store. For +/// that, you must [run][Store::run] a [`Transaction`] or [apply][Store::apply] a [`Batch`]. +#[derive(Clone)] +pub struct Store { + // TODO: maybe switch to `OptimisticTransactionDB` because it has `batched_multi_get_cf`, which may be useful + // if we end up doing lots of point lookups. alternatively, maybe we don't need *transactions* altogether, and + // we can get away with write batches and snapshots. the main problem with transactions is that it doesn't let + // us do range deletes, which affects the efficiency of multiarrow deletion. + // + // a switch to write batches is feasible if we end up not doing reads and writes in the same transaction. + inner: Arc, +} + +/// Hosts APIs for manipulating the data store. +/// +/// You can access these APIs from the body of the closure passed to [`Store::run`]. +pub struct Transaction<'db> { + inner: rocksdb::Transaction<'db, Backend>, + store: &'db Store, +} + +/// A set of writes that are to be executed atomically. +pub struct Batch { + inner: RefCell>, + store: Store, +} + impl Store { - /// Open a data store in the given `state_dir`. + /// Run a [transaction][Transaction]. /// - /// If the data store does not exist yet, it will be created. - pub fn open(state_dir: impl AsRef) -> Result { + /// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not + /// recorded. Changes made inside a transaction can be read from within that transaction before they are + /// committed. + /// + /// If the closure passed to `run` returns an error, the transaction is rolled back, and otherwise the + /// changes are committed. 
+ pub fn run(&self, f: impl FnOnce(&Transaction<'_>) -> Result) -> Result + where + E: From, + { + let tx = Transaction { + inner: self.inner.transaction(), + store: &self, + }; + let r = f(&tx); + if let Err(e) = if r.is_err() { + tx.inner.rollback() + } else { + tx.inner.commit() + } { + return Err(E::from(Error::Internal(e))); + } + r + } + /// Apply a batch of changes atomically. + pub fn apply(&self, batch: Batch) -> Result<()> { + self.inner.write(batch.inner.into_inner())?; + OK + } + /// Construct a [`Batch`]. + pub fn batch(&self) -> Batch { + Batch { + inner: RefCell::new(WriteBatchWithTransaction::default()), + store: self.clone(), + } + } + /// Open the data store in `state_dir`, and create one if it doesn't exist yet. + pub fn open(state_dir: impl AsRef, schema: Schema) -> Result { let mut db_opts = Options::default(); db_opts.create_if_missing(true); db_opts.create_missing_column_families(true); @@ -92,57 +117,24 @@ impl Store { &db_opts, &tx_opts, state_dir.as_ref().join(STORE_NAME), - SPACES, + schema.0, )?); Ok(Store { inner }) } - /// Construct a temporary store, for testing. This store gets erased after `f` is done. - pub fn with_tmp(f: impl FnOnce(Store) -> Result) -> Result - where - E: From, - { - let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir"); - f(Store::open(tmp_dir)?) - } - /// Delete the whole store. - /// - /// **This deletes all data in the store**. Do not run this unless you want to delete all the state of the instance. + /// Delete the main data store in `state_dir` if it exists. pub fn nuke(state_dir: impl AsRef) -> Result<()> { Backend::destroy(&Options::default(), state_dir.as_ref().join(STORE_NAME)) - .map_err(Error::from) + .map_err(Error::Internal) } - /// Get the value of mixin `M` for `key`. - pub fn lookup(&self, key: impl Keylike) -> Result<(Key, M)> - where - M: Mixin, - { - self.transaction(|tx| tx.lookup(key)) - } - /// Get the key associated with a given [alias][Alias]. 
- pub fn translate(&self, s: impl ToString) -> Result - where - A: Alias, - { - self.transaction(|tx| tx.lookup_alias(A::from(s.to_string()))) - } - /// Quickly test whether a particular [arrow][Arrow] exists. - pub fn exists(&self, arrow: (Key, Key)) -> Result - where - A: Arrow, - { - self.transaction(|tx| tx.exists::(arrow)) + /// Open a store that lives until `f` returns, for testing. + pub fn test(schema: Schema, f: impl FnOnce(Store) -> T) -> T { + let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir"); + f(Store::open(tmp_dir, schema).expect("failed to open temporary data store in {tmp_dir}")) } } -/// An isolated keyspace. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub struct Space(&'static str); - -impl AsRef for Space { - fn as_ref(&self) -> &str { - &self.0 - } -} +/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly). +pub const OK: Result<()> = Ok(()); /// Results from this component. pub type Result = std::result::Result; @@ -166,1033 +158,4 @@ pub enum Error { Decoding(bincode::error::DecodeError), } -pub mod new_interface { - //! Data persistence for the ActivityPuppy social media server. - //! - //! # Overview - //! - //! The design of the data store's abstractions is heavily inspired by graph theory. The idea is to encourage - //! composition and loose coupling by segmenting all data associated with a node into [mixins][Mixin], and - //! modeling relations and predicates between nodes as [arrows][Arrow]. In additions, the key identifying a - //! node can be [aliased][Alias] by a string newtype, which must be unique within the namespace of that alias. - //! - //! The read-only operations on the `Store` type will have abridged documentation compared to the docs on their - //! counterparts defined on the `Transaction` type. 
- - use std::{cell::RefCell, path::Path, sync::Arc}; - - use rocksdb::{ - BoundColumnFamily, IteratorMode, Options, TransactionDBOptions, WriteBatchWithTransaction, - }; - - use crate::{util::IterExt as _, Backend, Error, Result, STORE_NAME}; - - /// The main interface to the data persistence engine. - /// - /// This type provides reading capabilities, but does not expose APIs for manipulating data in the store. For - /// that, you must [run][Store::run] a [`Transaction`] or [apply][Store::apply] a [`Batch`]. - #[derive(Clone)] - pub struct Store { - // TODO: maybe switch to `OptimisticTransactionDB` because it has `batched_multi_get_cf`, which may be useful - // if we end up doing lots of point lookups. alternatively, maybe we don't need *transactions* altogether, and - // we can get away with write batches and snapshots. the main problem with transactions is that it doesn't let - // us do range deletes, which affects the efficiency of multiarrow deletion. - // - // a switch to write batches is feasible if we end up not doing reads and writes in the same transaction. - inner: Arc, - } - - impl Store { - /// Run a [transaction][Transaction]. - /// - /// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not - /// recorded. Changes made inside a transaction can be read from within that transaction before they are - /// committed. - /// - /// If the closure passed to `run` returns an error, the transaction is rolled back, and otherwise the - /// changes are committed. - pub fn run(&self, f: impl FnOnce(&Transaction<'_>) -> Result) -> Result - where - E: From, - { - let tx = Transaction { - inner: self.inner.transaction(), - store: &self, - }; - let r = f(&tx); - if let Err(e) = if r.is_err() { - tx.inner.rollback() - } else { - tx.inner.commit() - } { - return Err(E::from(Error::Internal(e))); - } - r - } - /// Construct and apply a batch of changes atomically. 
- pub fn apply(&self, f: impl FnOnce(&mut Batch)) -> Result<()> { - let mut batch = Batch { - inner: RefCell::new(WriteBatchWithTransaction::default()), - store: self.clone(), - }; - f(&mut batch); - self.inner.write(batch.inner.into_inner())?; - crate::OK - } - /// Open the data store in `state_dir`, and create one if it doesn't exist yet. - pub fn open(state_dir: impl AsRef, schema: Schema) -> Result { - let mut db_opts = Options::default(); - db_opts.create_if_missing(true); - db_opts.create_missing_column_families(true); - let tx_opts = TransactionDBOptions::default(); - let inner = Arc::new(Backend::open_cf( - &db_opts, - &tx_opts, - state_dir.as_ref().join(STORE_NAME), - schema.0, - )?); - Ok(Store { inner }) - } - /// Delete the main data store in `state_dir` if it exists. - pub fn nuke(state_dir: impl AsRef) -> Result<()> { - Backend::destroy(&Options::default(), state_dir.as_ref().join(STORE_NAME)) - .map_err(Error::Internal) - } - /// Open a store that lives until `f` returns, for testing. - pub fn test(schema: Schema, f: impl FnOnce(Store) -> T) -> T { - let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir"); - f(Store::open(tmp_dir, schema) - .expect("failed to open temporary data store in {tmp_dir}")) - } - } - - /// Open a [`Store`]. Creates one if it doesn't exist yet at the state directory path. - pub fn open(state_dir: impl AsRef, schema: Schema) -> Result { - Store::open(state_dir, schema) - } - - pub use {alias::Alias, arrow::Arrow, mixin::Mixin}; - - use self::{ - cx::{Query, Write}, - types::Schema, - }; - - /// Hosts APIs for manipulating the data store. - /// - /// You can access these APIs from the body of the closure passed to [`Store::run`]. - pub struct Transaction<'db> { - inner: rocksdb::Transaction<'db, Backend>, - store: &'db Store, - } - - /// A set of writes that are to be executed atomically. - pub struct Batch { - inner: RefCell>, - store: Store, - } - - pub mod types { - //! Defining a [`Schema`]. - //! - //! 
There is a lot of complicated machinery here to make it so that you have to write very little code to - //! define new types. Basically, if you want to define a thing to store, you need to implement the trait - //! for it (e.g. [`Arrow`]), and also implement [`RecordType`], where you create a specification describing - //! which namespaces store records of that type. - //! - //! Then, when you construct a new `Store`, you need to pass in a [`Schema`], or the database won't be able - //! to operate on the types. - //! - //! [`Arrow`]: super::Arrow - - // IDEA: Maybe this could be type-level :3 so that each store just has a type parameter for this. - - use std::{collections::HashSet, mem::ManuallyDrop}; - - use derive_more::Display; - - /// The namespace where all vertices must be registered. - pub(crate) const NODE_HEADERS: Namespace = Namespace("header:node"); - - /// The namespace where multiedge identities are mapped to endpoints. - pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge"); - - /// A specification of all user-defined namespaces. - pub struct Schema(pub(crate) HashSet); - - impl Schema { - /// Construct a new empty schema. - pub fn new() -> Schema { - Schema(HashSet::from_iter([NODE_HEADERS, MULTIEDGE_HEADERS])) - } - /// Add the component to the schema. - pub fn add(mut self) -> Schema - where - C: RecordType, - { - self.add_mut(C::SPEC); - self - } - /// Add a spec to the schema by mutable reference. - pub fn add_mut(&mut self, spec: impl TypeSpec) -> &mut Schema { - spec.register(&mut self.0); - self - } - } - - /// The name of a keyspace. - /// - /// Specifically, this is the name of a RocksDB column family. - #[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)] - pub struct Namespace(&'static str); - - impl AsRef for Namespace { - fn as_ref(&self) -> &str { - self.0 - } - } - - /// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a - /// [mixin](MixinSpec). 
- pub trait RecordType { - type Spec: TypeSpec; - const SPEC: Self::Spec; - const NAME: &'static str; - } - - /// The specification for an [`Arrow`](super::Arrow). - /// - /// The listed namespaces must be unique among all other namespaces. - #[derive(Clone, Copy)] - pub struct ArrowSpec { - /// The keyspace where edge keys are ordered `(origin, target)`. - pub(crate) by_origin: Namespace, - /// The keyspace where edge keys are ordered `(target, origin)`. - pub(crate) by_target: Namespace, - } - - impl ArrowSpec { - /// Generate a spec based on the NAME. - pub const fn make() -> ArrowSpec - where - T: RecordType, - [(); T::NAME.len() + 12]: Sized, - { - use std::intrinsics::const_allocate; - // TODO: clean this up - #[allow(type_alias_bounds)] - type Arr = [u8; X::NAME.len() + 12]; - let tails = unsafe { - let start = const_allocate( - std::mem::size_of::>(), - std::mem::align_of::>(), - ); - - std::ptr::copy(b"arrow:".as_ptr(), start, 6); - - let ptr = start.add(6); - std::ptr::copy(T::NAME.as_ptr(), ptr, T::NAME.len()); - - let ptr = ptr.add(T::NAME.len()); - std::ptr::copy(b"/tails".as_ptr(), ptr, 6); - - std::str::from_raw_parts(start, T::NAME.len() + 12) - }; - let heads = unsafe { - let start = const_allocate( - std::mem::size_of::>(), - std::mem::align_of::>(), - ); - - std::ptr::copy(b"arrow:".as_ptr(), start, 6); - - let ptr = start.add(6); - std::ptr::copy(T::NAME.as_ptr(), ptr, T::NAME.len()); - - let ptr = ptr.add(T::NAME.len()); - std::ptr::copy(b"/heads".as_ptr(), ptr, 6); - - std::str::from_raw_parts(start, T::NAME.len() + 12) - }; - ArrowSpec { - by_origin: Namespace(tails), - by_target: Namespace(heads), - } - } - } - - #[derive(Clone, Copy)] - pub struct AliasSpec { - pub keyspace: Namespace, - pub reversed: Namespace, - } - - #[derive(Clone, Copy)] - pub struct MixinSpec { - pub keyspace: Namespace, - } - - /// Describes how to add a [`RecordType`] to a [`Schema`]. - pub trait TypeSpec { - /// Register the namespaces. 
- fn register(&self, set: &mut HashSet); - } - - impl TypeSpec for ArrowSpec { - fn register(&self, set: &mut HashSet) { - if !set.insert(self.by_origin) { - panic! { - "Duplicate found while inserting Arrow::BY_ORIGIN: {}", - self.by_origin - } - } - if !set.insert(self.by_target) { - panic! { - "Duplicate found while inserting Arrow::BY_TARGET: {}", - self.by_target - } - } - } - } - impl TypeSpec for AliasSpec { - fn register(&self, set: &mut HashSet) { - if !set.insert(self.keyspace) { - panic! { - "Duplicate found while inserting Alias::KEYSPACE: {}", - self.keyspace - } - } - if !set.insert(self.reversed) { - panic! { - "Duplicate found while inserting Alias::REVERSED: {}", - self.reversed - } - } - } - } - impl TypeSpec for MixinSpec { - fn register(&self, set: &mut HashSet) { - if !set.insert(self.keyspace) { - panic! { - "Duplicate found while inserting Mixin::KEYSPACE: {}", - self.keyspace - } - } - } - } - - impl Schema {} - } - - pub mod arrow { - //! Directed edges, both parallel and simple. - //! - //! This module's main exports are [`Arrow`], and the two kinds of arrows: [`Basic`] and [`Multi`]. - //! - //! Querying information about arrows can be done using the APIs exposed by [`Store`] and [`Transaction`], - //! and manipulating them can likewise be done from within the context of a `Transaction`. - //! - //! The arrow API is designed to aggressively minimize boilerplate for defining arrow types, and uses a - //! few tricks to do with associated constants and types to make it all work nicely. - //! - //! # Terminology - //! - //! An arrow is a part of a graph. Graphs consist of *nodes* (also called *vertices*) and *edges*. Nodes - //! can be seen as "things", and edges as connections between those things, defined by the two nodes that - //! they connect (which are called the *endpoints* of the edge). - //! - //! These edges can be directed or undirected. The difference is that undirected edges are identified by - //! 
an unordered pair of their endpoints, whereas directed edges (also called **arrows**), are identified - //! by an ordered pair, where one of the endpoints is the *tail* (or *origin* in the code/docs here) and - //! the other is the *head* (usually called *target* here). - //! - //! # Arrow kinds - //! - //! Arrows can be either [`Basic`] or [`Multi`]. The main difference is that basic arrows are defined - //! solely by which two nodes they connect, which means that their representation and certain operations - //! are more efficient. The trade-off is that they cannot capture more complex information than "this - //! edge exists". - //! - //! For some use cases (for example, predicates) this is sufficient, but other use cases require multiple, - //! individually identifiable and manipulatable parallel edges. Here, the trade-off is that while they - //! are much more expressive, and can be labeled by associating [mixins] with the arrow's identity key, - //! they incur more space overhead, and most operations on them are more expensive compared to basic - //! edges. - //! - //! Most arrow operations work on either kind of edge. Some signatures reference [`Arrow::Kind`], which - //! is either of the `Multi` or `Basic` types mentioned before. Because parallel arrows need to be - //! discernable from each other, each of them also has an `identity` key, in addition to listing the two - //! edges they connect. - //! - //! [mixins]: super::Mixin - - pub use self::kinds::{Basic, Multi}; - use super::{ - types::{ArrowSpec, RecordType}, - Batch, Store, Transaction, - }; - use crate::{Key, Result}; - - /// A directed edge. - /// - /// See the [module docs][self] for an introduction. - pub trait Arrow: RecordType { - /// The representation of this arrow, which also determines whether parallel edges are allowed. - type Kind: ArrowKind = Basic; - } - - /// Parameterizing arrows so we can distinguish between kinds of arrows. 
- /// - /// This lets us present a common API for certain arrow-related operations while also leveraging some - /// specialization. Essentially, from a type parameter which implements [`Arrow`], we can tell both at - /// the type level and at the value level whether that arrow is a multi-arrow or not. - pub trait ArrowKind { - /// Whether this kind of arrow should be represented using the specialized representation for edges - /// that are allowed to be parallel. - const IS_MULTI: bool; - /// Construct an arrow from a buffer containing a correctly-oriented arrow. - /// - /// Each arrow is stored twice, once "correctly", and once "reversed". This allows us to efficiently - /// list both the outgoing and incoming edges for any particular vertex by using a prefix scan on the - /// [`BY_ORIGIN`][ArrowSpec::by_origin] and [`BY_TARGET`][ArrowSpec::by_target] keyspaces respectively. - /// - /// The buffer passed to this function will start with 16 bytes origin, followed by 16 bytes target. - /// For basic arrows, that's it, but for multiarrows there is an additional 16 bytes of "identity", - /// which is needed to discriminate between multiple parallel edges. - /// - /// # Failure - /// - /// This method must panic if `buf` is not the expected size (32 bytes for basic arrows, 48 bytes for - /// multi arrows). The responsibility for ensuring that `buf` is correctly oriented lies with the - /// caller lest the result is incorrect, but passing an incorrectly oriented arrow is not a memory - /// safety issue, so this function is safe. - fn dec(buf: &[u8]) -> Self; - /// Encode an arrow's key origin-first and target-first. - fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>); - } - - impl Store { - /// Check whether there exists any arrow of type `A` that points from `origin` to `target`. - pub fn exists(&self, origin: Key, target: Key) -> Result - where - A: Arrow, - { - op::exists::(self, origin, target) - } - /// Get all arrows of type `A` that point at `target`. 
- pub fn incoming<'a, A>( - &'a self, - target: Key, - ) -> impl Iterator> + 'a - where - A::Kind: 'a, - A: Arrow, - { - op::incoming::(self, target) - } - /// Get all arrows of type `A` that point away from `origin`. - pub fn outgoing<'a, A>( - &'a self, - origin: Key, - ) -> impl Iterator> + 'a - where - A::Kind: 'a, - A: Arrow, - { - op::outgoing::(self, origin) - } - } - - impl Transaction<'_> { - /// Check whether there exists any arrow of type `A` that points from `origin` to `target`. - /// - /// This only tells you whether there is *any* such arrow, not how many (in the case of parallel edges). - pub fn exists(&self, origin: Key, target: Key) -> Result - where - A: Arrow, - { - op::exists::(self, origin, target) - } - /// Get all arrows of type `A` that point at `target`. - pub fn incoming<'a, A>( - &'a self, - target: Key, - ) -> impl Iterator> + 'a - where - A::Kind: 'a, - A: Arrow, - { - op::incoming::(self, target) - } - /// Get all arrows of type `A` that point away from `origin`. - pub fn outgoing<'a, A>( - &'a self, - origin: Key, - ) -> impl Iterator> + 'a - where - A::Kind: 'a, - A: Arrow, - { - op::outgoing::(self, origin) - } - /// Create a new arrow of type `A`. - /// - /// This operation supports both [`Multi`] and [`Basic`] arrows. - /// - /// ```rust - /// # fn main () -> store::Result<()> { - /// use store::{new_interface::arrow::{Arrow, Multi}, Key}; - /// - /// enum MyMultiArrow {} - /// - /// impl Arrow for MyMultiArrow { - /// type Kind = Multi; - /// // ... 
- /// } - /// - /// # store::new_interface::Store::test(|db| { - /// let origin = Key::gen(); - /// let target = Key::gen(); - /// - /// db.run(|tx| { - /// tx.create::(Multi { - /// identity: Key::gen(), - /// origin, - /// target, - /// }) - /// })?; - /// - /// assert!(db.exists::(origin, target)?); - /// # store::OK }) - /// # } - /// ``` - pub fn create(&self, arrow: A::Kind) -> Result<()> - where - A: Arrow, - { - op::create::(self, arrow) - } - /// Delete all edges of type `A` from `origin` to `target`. - /// - /// It is not an error for this function not to delete anything. - pub fn delete_all(&self, origin: Key, target: Key) -> Result<()> - where - A: Arrow, - { - op::delete_all::(self, origin, target) - } - /// Delete a specific arrow. - pub fn delete_one(&self, arrow: A::Kind) -> Result<()> - where - A: Arrow, - { - op::delete_one::(self, arrow) - } - } - - impl Batch { - /// Create an arrow. See [`Transaction::create`]. - pub fn create(&mut self, arrow: A::Kind) - where - A: Arrow, - { - op::create::(self, arrow) - .expect("no errors expected to occur during batch operation") - } - /// Delete a specific arrow. - pub fn delete_one(&mut self, arrow: A::Kind) - where - A: Arrow, - { - op::delete_one::(self, arrow) - .expect("no errors expected to occur during batch operation") - } - } - - mod op { - //! Implementations of arrow operations. - - use super::{ - super::{Query, Write}, - *, - }; - use crate::{util::IterExt as _, Key, Result, OK}; - - /// Check whether there exists at least one arrow of type `A` from `origin` to `target`. - pub fn exists(cx: &impl Query, origin: Key, target: Key) -> Result - where - A: Arrow, - { - if A::Kind::IS_MULTI { - // In the case of a multi-edge, at least one result from the prefix scan - // indicates that there is at least one edge. 
- cx.open(A::SPEC.by_origin) - .scan(origin.fuse(target)) - .next() - .transpose() - .map(|o| o.is_some()) - } else { - cx.open(A::SPEC.by_origin).has(origin.fuse(target)) - } - } - - /// List incoming arrows relative to `target`. - pub fn incoming<'db, A>( - cx: &'db impl Query, - target: Key, - ) -> impl Iterator> + 'db - where - A: Arrow, - A::Kind: 'db, - { - // In the `by_target` keyspace, for either kind of arrow the layout is such that the target is - // the prefix, so we pick that keyspace to more efficiently list all arrows that target the key. - cx.open(A::SPEC.by_target) - .scan(target) - .map_ok(|(mut k, _)| { - // Arrows from `by_target` are oriented target-first, while the decoder function requires - // that the buffer is oriented origin-first. Regardless of whether `..32` covers the prefix - // or the whole slice, swapping the two keys always gives us the ordering expected by the - // decoding function. - let (t, o) = k[..32].split_at_mut(16); - t.swap_with_slice(o); - A::Kind::dec(&k) - }) - } - - /// List outgoing arrows relative to `origin`. - pub fn outgoing<'db, A>( - cx: &'db impl Query, - origin: Key, - ) -> impl Iterator> + 'db - where - A: Arrow, - A::Kind: 'db, - { - cx.open(A::SPEC.by_origin) - .scan(origin) - .map_ok(|(ref k, _)| A::Kind::dec(k)) - } - - /// Create a new arrow. - pub fn create(cx: &impl Write, arrow: A::Kind) -> Result<()> - where - A: Arrow, - { - let (by_origin, by_target) = arrow.enc(); - cx.open(A::SPEC.by_origin).set(by_origin, b"")?; - cx.open(A::SPEC.by_target).set(by_target, b"")?; - OK - } - - /// Delete all arrows from `origin` to `target`. - /// - /// TODO: Remove the query requirement (depends on range delete being available). 
- pub fn delete_all(cx: &(impl Write + Query), origin: Key, target: Key) -> Result<()> - where - A: Arrow, - { - let by_origin = cx.open(A::SPEC.by_origin); - let by_target = cx.open(A::SPEC.by_target); - Ok(if A::Kind::IS_MULTI { - // TODO: optimize this implementation using range deletes. - for key in by_origin.scan(origin.fuse(target)).keys() { - let key = Multi::decode(key?.as_ref()); - by_origin.del(key.encode())?; - by_target.del(key.swap().encode())?; - } - } else { - by_origin.del(origin.fuse(target))?; - by_target.del(target.fuse(origin))?; - }) - } - /// Delete a specific arrow, if it exists. Doesn't error if the arrow does *not* exist. - pub fn delete_one(cx: &impl Write, arrow: A::Kind) -> Result<()> - where - A: Arrow, - { - let (by_origin, by_target) = arrow.enc(); - cx.open(A::SPEC.by_origin).del(by_origin)?; - cx.open(A::SPEC.by_target).del(by_target)?; - OK - } - } - - /// Types representing the different kinds of arrows. - mod kinds { - use super::ArrowKind; - use crate::Key; - - impl ArrowKind for Multi { - const IS_MULTI: bool = true; - fn dec(buf: &[u8]) -> Self { - Multi::decode(buf) - } - fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { - (self.encode(), self.swap().encode()) - } - } - - impl ArrowKind for Basic { - const IS_MULTI: bool = false; - fn dec(buf: &[u8]) -> Self { - Basic::decode(buf) - } - fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { - (self.encode(), self.reverse().encode()) - } - } - - /// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist - /// between two vertices. - #[derive(Clone, Copy)] - pub struct Multi { - /// The node that this edge points away from. - pub origin: Key, - /// The node that this edge points towards. - pub target: Key, - /// The discriminator of this particular edge, which distinguishes it from all other edges that - /// connect `origin` and `target`, and indeed from every other edge or node in the graph. 
- pub identity: Key, - } - - impl Multi { - /// Decode a multiarrow key from an origin-first encoded buffer. If the buffer is not correctly - /// oriented, the results will be wrong; the arrow will be oriented *away* from the target and - /// *at* the origin, instead of the other way around. - /// - /// # Orientation - /// - /// In this context, *correctly oriented* means that it is laid out in *origin-first* order, - /// like this (where `o`, `t` and `i` represent bytes): - /// - /// ```text - /// oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii - /// |--------------| |--------------| |--------------| - /// origin target identity - /// ..16 16..32 32.. - /// ``` - /// - /// In a *reverse oriented* buffer, the origin and target parts are swapped, so the target is - /// the prefix, followed by the origin, and then the identity. This is also called *target-first* - /// encoding in this documentation. - /// - /// # Silent failure - /// - /// There is no way to detect whether the ordering is correct from just the buffer, so the caller - /// must ensure that the order is correct. If you have a target-first encoded buffer, you can have - /// to swap the two keys before passing it into this function, or this function will give you an - /// edge that does not exist (since a multiedge can only point in one direction). - /// - /// Safety-wise, this isn't an issue, so it does not warrant marking this function as `unsafe`. - /// - /// # Panics - /// - /// This function panics if `buf` is not exactly 48 bytes long. - pub fn decode(buf: &[u8]) -> Multi { - Multi { - origin: Key::from_slice(&buf[..16]), - target: Key::from_slice(&buf[16..32]), - identity: Key::from_slice(&buf[32..]), - } - } - /// Encode an arrow in *origin-first order*. See the docs of [`Multi::decode`] for an explanation - /// of the difference between origin-first encoding and target-first encoding. 
- pub fn encode(self) -> [u8; 48] { - let mut key = [0; 48]; - key[..16].copy_from_slice(&self.origin.0); - key[16..32].copy_from_slice(&self.target.0); - key[32..].copy_from_slice(&self.identity.0); - key - } - /// Swap the origin and target of this arrow, while leaving the identity the same. - pub(super) fn swap(self) -> Multi { - Multi { - origin: self.target, - target: self.origin, - ..self - } - } - } - - /// A normal directed edge. Duplicates are not allowed. - /// - /// This kind of arrow is useful for modeling predicates and simple relationships. - #[derive(Clone, Copy)] - pub struct Basic { - pub origin: Key, - pub target: Key, - } - - impl Basic { - /// Get the inverse of this arrow (an arrow that connects the same two nodes, but pointing in the - /// other direction). - pub fn reverse(self) -> Basic { - Basic { - origin: self.target, - target: self.origin, - } - } - /// Encode `self` in origin-first order. See [`Multi::decode`] for docs on ordering. - pub fn encode(self) -> [u8; 32] { - self.origin.fuse(self.target) - } - /// Decode a basic edge from a buffer laid out origin-first. See [`Multi::decode`] for more information - /// about key encoding. - /// - /// # Panics - /// - /// Panics if `buf` is not exactly 32 bytes long. - pub fn decode(buf: &[u8]) -> Basic { - let (origin, target) = Key::split(buf); - Basic { origin, target } - } - } - } - } - - mod mixin { - use bincode::{Decode, Encode}; - - use super::types::{MixinSpec, RecordType}; - - /// Mixins are the simplest pieces of data in the store. - pub trait Mixin: RecordType + Encode + Decode {} - } - - mod alias { - use super::types::{AliasSpec, RecordType}; - - /// An alternative unique identifier for a node. - pub trait Alias: RecordType {} - } - - /// An internal interface to a specific keyspace that exposes basic hashmap-esque operations - /// on that keyspace, generic over whether the source of the data is a [`Transaction`] or a - /// [`Store`]. 
- struct Keyspace<'db, C> { - context: &'db C, - cf: Arc>, - } - - impl<'db, C> Keyspace<'db, C> - where - C: Query, - { - /// Fetch a value from the keyspace. - fn get(&self, key: impl AsRef<[u8]>) -> Result + 'db>> { - self.context.get_pinned(&self.cf, key) - } - /// Test whether a key exists. - fn has(&self, key: impl AsRef<[u8]>) -> Result { - self.get(key).map(|r| r.is_some()) - } - /// Execute a prefix scan. - fn scan( - &self, - prefix: impl AsRef<[u8]> + 'db, - ) -> impl Iterator, Box<[u8]>)>> + 'db { - let t = prefix.as_ref().to_vec(); - self.context - .prefix_iterator(&self.cf, prefix.as_ref()) - // The prefix iterator may "overshoot". This makes it stop when it reaches - // the end of the range that has the prefix. - .take_while(move |r| match r { - Ok((ref k, _)) => k.starts_with(&t), - _ => true, - }) - .map_err(Error::Internal) - } - /// List all pairs in the keyspace. - fn list(&self) -> impl Iterator, Box<[u8]>)>> + 'db { - self.context - .full_iterator(&self.cf, IteratorMode::Start) - .map_err(Error::Internal) - } - } - - impl Keyspace<'_, C> - where - C: Write, - { - fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> { - self.context.put(&self.cf, key, val) - } - fn del(&self, key: impl AsRef<[u8]>) -> Result<()> { - self.context.delete(&self.cf, key) - } - } - - mod cx { - //! Contexts for doing reads, writes or both to the database. - //! - //! The traits in this module map abstract calls to their methods on the [rocksdb] objects. - - use rocksdb::{ - AsColumnFamilyRef, DBAccess, DBIteratorWithThreadMode, DBPinnableSlice, IteratorMode, - }; - - use super::{Batch, Keyspace, Store, Transaction}; - use crate::{Backend, Error, Result, OK}; - - /// A context for executing database operations. - pub trait Context { - /// Open the keyspace identified by `cf`. - fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> - where - Self: Sized; - } - - /// A context in which one can read from the data store. 
- /// - /// Specifically, this maps calls to either the transaction or the store's internals without us having - /// to implement methods for *both* transactions and the store. - pub trait Query: Context { - type Backend: DBAccess; - fn get_pinned<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - key: impl AsRef<[u8]>, - ) -> Result>>; - fn prefix_iterator<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - prefix: &[u8], - ) -> DBIteratorWithThreadMode<'a, Self::Backend>; - fn full_iterator<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - mode: IteratorMode<'a>, - ) -> DBIteratorWithThreadMode<'a, Self::Backend>; - } - - /// A context in which one can read from and modify the data store. - pub trait Write: Context { - fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()>; - fn put( - &self, - cf: &impl AsColumnFamilyRef, - key: impl AsRef<[u8]>, - val: impl AsRef<[u8]>, - ) -> Result<()>; - } - - impl Context for Store { - fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { - Keyspace { - cf: self.inner.cf_handle(cf.as_ref()).unwrap(), - context: &self, - } - } - } - - impl Query for Store { - type Backend = Backend; - - fn get_pinned<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - key: impl AsRef<[u8]>, - ) -> Result>> { - self.inner.get_pinned_cf(cf, key).map_err(Error::Internal) - } - - fn prefix_iterator<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - prefix: &[u8], - ) -> DBIteratorWithThreadMode<'a, Backend> { - self.inner.prefix_iterator_cf(cf, prefix) - } - - fn full_iterator<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - mode: IteratorMode<'a>, - ) -> DBIteratorWithThreadMode<'a, Self::Backend> { - self.inner.full_iterator_cf(cf, mode) - } - } - - impl Context for Transaction<'_> { - fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { - Keyspace { - cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), - context: &self, - } - } - } - - impl<'db> Query for Transaction<'db> { - type Backend = 
rocksdb::Transaction<'db, Backend>; - - fn get_pinned<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - key: impl AsRef<[u8]>, - ) -> Result>> { - self.inner.get_pinned_cf(cf, key).map_err(Error::Internal) - } - - fn prefix_iterator<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - prefix: &[u8], - ) -> DBIteratorWithThreadMode<'a, Self::Backend> { - self.inner.prefix_iterator_cf(cf, prefix) - } - - fn full_iterator<'a>( - &'a self, - cf: &impl AsColumnFamilyRef, - mode: IteratorMode<'a>, - ) -> DBIteratorWithThreadMode<'a, Self::Backend> { - self.inner.full_iterator_cf(cf, mode) - } - } - - impl Write for Transaction<'_> { - fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { - self.inner.delete_cf(cf, key).map_err(Error::Internal) - } - - fn put( - &self, - cf: &impl AsColumnFamilyRef, - key: impl AsRef<[u8]>, - val: impl AsRef<[u8]>, - ) -> Result<()> { - self.inner.put_cf(cf, key, val).map_err(Error::Internal) - } - } - - impl Context for Batch { - fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> - where - Self: Sized, - { - Keyspace { - cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), - context: &self, - } - } - } - - impl Write for Batch { - fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { - self.inner.borrow_mut().delete_cf(cf, key); - OK - } - - fn put( - &self, - cf: &impl AsColumnFamilyRef, - key: impl AsRef<[u8]>, - val: impl AsRef<[u8]>, - ) -> Result<()> { - self.inner.borrow_mut().put_cf(cf, key, val); - OK - } - } - } -} +pub type Backend = rocksdb::TransactionDB; diff --git a/lib/store/src/mixin.rs b/lib/store/src/mixin.rs index 237808f..2f746c8 100644 --- a/lib/store/src/mixin.rs +++ b/lib/store/src/mixin.rs @@ -1,35 +1,147 @@ -//! Modules of information. - use bincode::{Decode, Encode}; +/// Derive a [`Mixin`] implementation. 
+pub use r#macro::Mixin; -use crate::Space; +use super::{ + types::{MixinSpec, Value}, + Batch, Store, Transaction, +}; +use crate::{Error, Key, Result}; -/// A simple piece of data associated with a vertex. -pub trait Mixin: Encode + Decode { - const SPACE: Space; +/// Mixins are the simplest pieces of data in the store. +pub trait Mixin: Value + Encode + Decode {} + +impl Store { + /// Get the value! + pub fn get_mixin(&self, node: Key) -> Result> + where + M: Mixin, + { + op::get_mixin(self, node) + } + /// Check if `node` has a mixin `M`. + pub fn has_mixin(&self, node: Key) -> Result + where + M: Mixin, + { + op::has_mixin::(self, node) + } } -/// Information needed to render a social media profile. -#[derive(Encode, Decode)] -pub struct Profile { - pub post_count: usize, - pub account_name: String, - pub display_name: Option, - pub about_string: Option, - pub about_fields: Vec<(String, String)>, +impl Transaction<'_> { + /// Apply an update function to the mixin `M` of `node`. + /// + /// # Errors + /// + /// - [`Error::Missing`]: if `node` does not have a mixin of this type. + /// + /// [`Error::Missing`]: crate::Error::Missing + pub fn update(&self, node: Key, update: impl FnOnce(M) -> M) -> Result<()> + where + M: Mixin, + { + op::update(self, node, update) + } + /// Get the mixin of the specified type associated with `node`. + pub fn get_mixin(&self, node: Key) -> Result> + where + M: Mixin, + { + op::get_mixin(self, node) + } + /// Add a mixin to `node`. + /// + /// # Errors + /// + /// - [`Error::Conflict`]: if `node` already has a mixin of type `M`. + /// + /// [`Error::Conflict`]: crate::Error::Missing + pub fn add_mixin(&self, node: Key, mixin: M) -> Result<()> + where + M: Mixin, + { + if op::has_mixin::(self, node)? { + return Err(Error::Conflict); + } else { + op::add_mixin::(self, node, mixin) + } + } + /// Check whether `node` has an `M` defined for it. 
+ pub fn has_mixin(&self, node: Key) -> Result + where + M: Mixin, + { + op::has_mixin::(self, node) + } } -impl Mixin for Profile { - const SPACE: Space = Space("profile"); +impl Batch { + /// Add a mixin to the `node`. + /// + /// **Note**: unlike [`Transaction::add_mixin`], this will *not* return an error if the key already has a mixin + /// of this type. This *should* not cause inconsistency. + pub fn put_mixin(&mut self, node: Key, mixin: M) + where + M: Mixin, + { + op::add_mixin(self, node, mixin).unwrap() + } } -/// Contents of a post. -#[derive(Encode, Decode)] -pub struct Content { - pub content: Option, - pub summary: Option, -} +mod op { + use super::Mixin; + use crate::{internal::*, Error, Key, Result}; -impl Mixin for Content { - const SPACE: Space = Space("content"); + pub fn update( + cx: &(impl Query + Write), + node: Key, + update: impl FnOnce(M) -> M, + ) -> Result<()> + where + M: Mixin, + { + // TODO: implement in terms of a merge operator instead of separate query and write ops. + // this would let us remove the `Query` bound, which would in turn let us update from within + // a batch. + // + // See https://github.com/facebook/rocksdb/wiki/Merge-Operator + // + // It looks like rocksdb allows you to specify a merge operator per column family.[^1] + // This means we can construct our column families with a merge operator that knows how to encode and decode mixins. + // + // [^1]: https://github.com/facebook/rocksdb/blob/9d37408f9af15c7a1ae42f9b94d06b27d98a011a/include/rocksdb/options.h#L128 + let tree = cx.open(M::SPEC.keyspace); + match tree.get(node.as_ref())? 
{ + None => Err(Error::Missing), + Some(buf) => { + let new = decode(buf).map(update).and_then(encode)?; + tree.set(node, new) + } + } + } + + pub fn get_mixin(cx: &impl Query, node: Key) -> Result> { + cx.open(M::SPEC.keyspace).get(node)?.map(decode).transpose() + } + + pub fn add_mixin(cx: &impl Write, node: Key, mixin: M) -> Result<()> { + cx.open(M::SPEC.keyspace).set(node, encode(mixin)?) + } + + pub fn has_mixin(cx: &impl Query, node: Key) -> Result { + cx.open(M::SPEC.keyspace).has(node) + } + + pub(super) fn encode(data: impl bincode::Encode) -> Result> { + bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding) + } + + fn decode(data: impl AsRef<[u8]>) -> Result + where + T: bincode::Decode, + { + bincode::decode_from_slice(data.as_ref(), bincode::config::standard()) + .map_err(Error::Decoding) + .map(|(v, _)| v) + } } diff --git a/lib/store/src/transaction.rs b/lib/store/src/transaction.rs deleted file mode 100644 index e7d1ca5..0000000 --- a/lib/store/src/transaction.rs +++ /dev/null @@ -1,437 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use bincode::{Decode, Encode}; -use rocksdb::{BoundColumnFamily, IteratorMode}; - -use crate::{ - arrow::Direction, key::Tag, util::IterExt as _, Alias, Arrow, Backend, Error, Key, Keylike, - Mixin, Result, Store, OK, SPACES, -}; - -impl Store { - /// Initiate a transaction. - /// - /// If the result is an error, the transaction is rolled back, and otherwise the transaction - /// is committed. - pub fn transaction(&self, f: impl FnOnce(&Transaction<'_>) -> Result) -> Result { - // Load all the column family handles, because they can't be accessed through the - // `rocksdb::Transaction` struct, only the `TransactionDB`. 
- let spaces = SPACES - .into_iter() - .map(|name| (*name, self.inner.cf_handle(name).unwrap())) - .collect(); - let tx = Transaction { - inner: self.inner.transaction(), - spaces, - }; - let result = f(&tx); - if result.is_err() { - tx.inner.rollback()?; - } else { - tx.inner.commit()?; - } - result - } - /// Check whether a key exists in the registry, - pub fn is_registered(&self, key: Key) -> Result { - let cf = self - .inner - .cf_handle("registry") - .expect("failed to open registry"); - self.inner - .get_pinned_cf(&cf, key) - .map(|opt| opt.is_some()) - .map_err(Error::Internal) - } -} - -/// A database transaction, in which either each action succeeds, or everything fails -/// together. -/// -/// The transaction struct is the interface for quering and manipulating persisted content. -pub struct Transaction<'db> { - inner: rocksdb::Transaction<'db, Backend>, - spaces: HashMap<&'static str, Arc>>, -} - -/// Methods for manipulating the registry. -/// -/// Before you can manipulate a vertex, its needs to be registered. -impl Transaction<'_> { - /// Register a new vertex. - pub fn create_vertex(&self, key: Key, tag: Tag) -> Result<()> { - self.with("registry").set(key, [tag.0]) - } - /// Delete a vertex from the registry. - pub fn delete_vertex(&self, key: Key) -> Result<()> { - // TODO: also make this delete all related data? - self.with("registry").del(key) - } - /// Check whether a vertex is registered in the database. - pub fn is_registered(&self, key: Key) -> Result { - self.with("registry").has(key) - } -} - -/// Methods for manipulating mixins. -/// -/// For each implementor of [`Mixin`], a vertex can have at most one record of that type -/// associated with it. -impl Transaction<'_> { - /// Query the store for a value associated with the vertex `key` identifies. - /// - /// Using a [`Key`] is more efficient than using an alias. 
- pub fn lookup(&self, key: impl Keylike) -> Result<(Key, M)> - where - M: Mixin, - { - // Checked translate isn't needed, we'll complain if we can't find the data. - let canonicalized_key = key.translate(&self)?; - let raw = self.with(M::SPACE).get(canonicalized_key)?; - let value = decode(raw.as_ref())?; - Ok((canonicalized_key, value)) - } - /// Associate a new mixin value with the key. - /// - /// # Errors - /// - /// - `Error::Conflict` if a mixin of this type is already associated with the vertex - /// - `Error::Undefined` if `key` is not in the registry. - pub fn insert(&self, key: impl Keylike, data: M) -> Result<()> - where - M: Mixin, - { - let key = key.checked_translate(&self)?; - let data = encode(data)?; - let ns = self.with(M::SPACE); - // Check for conflicts. Fail if the key already exists, otherwise set the key - // to the given value. - if ns.has(key)? { - Err(Error::Conflict) - } else { - ns.set(key, data) - } - } - /// Apply an update function to the mixin identified by the key. - /// - /// # Errors - /// - /// - `Error::Undefined` if the `key` is not registered - /// - `Error::Missing` if `key` does not exist in the keyspace associated with `M` - pub fn update(&self, key: impl Keylike, f: impl FnOnce(Key, M) -> Result) -> Result<()> - where - M: Mixin, - { - let key = key.checked_translate(self)?; - let (key, old) = self.lookup::(key)?; - let new = f(key, old).and_then(encode)?; - self.with(M::SPACE).set(key, new) - } - /// Remove the mixin from the vertex `key` refers to. - /// - /// Doesn't complain if the value does not exist in the expected keyspace. - pub fn remove(&self, key: impl Keylike) -> Result> - where - M: Mixin, - { - // Checked translate isn't needed because we don't care if the key is bogus. 
- let canonical_key = key.translate(self)?; - let ns = self.with(M::SPACE); - match ns.pop(canonical_key) { - Ok(Some(val)) => decode(&val).map(Some), - Ok(None) => Ok(None), - Err(err) => Err(err), - } - } - /// List all key-value pairs for mixins of type `M`. - pub fn list(&self) -> impl Iterator> + '_ - where - M: Mixin, - { - self.with(M::SPACE).list().bind_results(|(k, v)| { - let v = decode(v.as_ref())?; - let k = Key::from_slice(k.as_ref()); - Ok((k, v)) - }) - } -} - -/// Methods for interacting with [aliases][Alias], which are unique alternate keys. -impl Transaction<'_> { - /// Look up the key that the given alias maps to. - /// - /// If the key was deleted, but the alias wasn't properly cleaned up, - pub fn lookup_alias(&self, alias: A) -> Result - where - A: Alias, - { - let (l, _) = A::SPACE; - let raw = self.with(l).get(alias.to_string())?; - Ok(Key::from_slice(raw.as_ref())) - } - /// Create a new alias of type `A` for the given [`Key`]. - /// - /// If the alias already exists, this function returns `Conflict`. - pub fn insert_alias(&self, key: Key, alias: A) -> Result<()> - where - A: Alias, - { - let (l, r) = A::SPACE; - let alias = alias.to_string(); - if self.with(l).has(&alias)? { - return Err(Error::Conflict); - } - self.with(l).set(&alias, key)?; - self.with(r).set(key, &alias)?; - OK - } - /// Delete the alias of type `A` that points to `key`. - pub fn remove_alias(&self, key: Key) -> Result<()> - where - A: Alias, - { - let (l, r) = A::SPACE; - // First, pop the reverse mapping, which will give us the encoded - // key for the normal mapping. If it doesn't exist, don't delete - // the normal mapping. - if let Some(alias) = self.with(r).pop(key)? { - self.with(l).pop(alias)?; - } - OK - } -} - -impl Transaction<'_> { - /// Find an arrow of type `A` with the given `tail` and `head`. 
- pub fn lookup_arrow(&self, (tail, head): (Key, Key)) -> Result> - where - A: Arrow, - { - let (l, _) = A::SPACE; - match self.with(l).get(tail.fuse(head)) { - Ok(raw) => decode(raw.as_ref()).map(Some), - Err(Error::Missing) => Ok(None), - Err(err) => Err(err), - } - } - /// Create a simple arrow of type `A`. - /// - /// # Errors - /// - /// - `Error::Undefined` if either key is not registered - pub fn insert_arrow(&self, (tail, head): (Key, Key)) -> Result<()> - where - A: Arrow(&self, (tail, head): (Key, Key)) -> Result<()> - where - A: Arrow, - { - self.with(A::SPACE.0).del(tail.fuse(head))?; - self.with(A::SPACE.1).del(head.fuse(tail))?; - OK - } - /// Check whether an arrow exists. - pub fn exists(&self, (tail, head): (Key, Key)) -> Result - where - A: Arrow, - { - self.with(A::SPACE.0).has(tail.fuse(head)) - } - /// Get all arrows of type `A` "pointing at" `key`. - pub fn list_incoming( - &self, - key: impl Keylike, - ) -> impl Iterator> + '_ - where - A: Arrow, - { - self.list_arrows_where::(Direction::Incoming, key) - } - /// Get all arrows of type `A` "pointing away from" `key`. - pub fn list_outgoing( - &self, - key: impl Keylike, - ) -> impl Iterator> + '_ - where - A: Arrow, - { - self.list_arrows_where::(Direction::Outgoing, key) - } - /// Get all arrows of type `A`. - pub fn list_arrows(&self) -> impl Iterator> + '_ - where - A: Arrow, - { - self.with(A::SPACE.0).list().bind_results(|(k, v)| { - let (tail, head) = Key::split(k.as_ref()); - decode(v.as_ref()).map(|label| (tail, label, head)) - }) - } - /// Select arrows with the given direction relative to the given key. - fn list_arrows_where( - &self, - direction: Direction, - key: impl Keylike, - ) -> impl Iterator> + '_ - where - A: Arrow, - { - // Keys in space 0 are arranged with the tail at the start, and the ones in space 1 - // are arranged with the head at the start. This allows us to efficiently prefix scan - // regardless of the direction, at the cost of increased space usage. 
- let space = match direction { - Direction::Outgoing => A::SPACE.0, - Direction::Incoming => A::SPACE.1, - }; - let key = key.translate(&self).unwrap(); - #[cfg(test)] - eprintln!("scanning {} using prefix {key}", space.0); - self.with(space).scan(key).bind_results(|(k, v)| { - // Because we're prefix scanning on the first half of the key, we only want to - // get the second here. - let (_this, other) = Key::split(k.as_ref()); - #[cfg(test)] - eprintln!(" found {_this}:{other}"); - decode(v.as_ref()).map(|label| (other, label)) - }) - } - pub(crate) fn quiver(&self, tag: Tag) -> Quiver<'_> { - Quiver { tag, tx: &self } - } -} - -impl Transaction<'_> { - /// Use a keyspace. - fn with(&self, name: impl AsRef) -> Keyspace<'_> { - Keyspace { - cf: self.spaces[name.as_ref()].clone(), - tx: &self, - } - } -} - -/// Provides the basic API for a keyspace/column family. -struct Keyspace<'db> { - tx: &'db Transaction<'db>, - cf: Arc>, -} - -impl<'db> Keyspace<'db> { - /// Retrieve a value from the database. Returns `Missing` if the key does not exist. - fn get(&self, key: impl AsRef<[u8]>) -> Result + 'db> { - self.tx - .inner - .get_pinned_cf(&self.cf, key) - .map_err(Error::Internal) - .and_then(|opt| opt.ok_or(Error::Missing)) - } - /// Set the value at `key` to `val`. - fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> { - self.tx - .inner - .put_cf(&self.cf, key, val) - .map_err(Error::Internal) - } - /// Delete the key-value pair identified by `key`. - fn del(&self, key: impl AsRef<[u8]>) -> Result<()> { - self.tx.inner.delete_cf(&self.cf, &key)?; - OK - } - /// Remove the key and associated value from the keyspace, and return its previous value. - fn pop(&self, key: impl AsRef<[u8]>) -> Result>> { - let old = self.tx.inner.get_for_update_cf(&self.cf, &key, true)?; - self.del(key)?; - Ok(old) - } - /// Check whether the key exists in the keyspace. 
- fn has(&self, key: impl AsRef<[u8]>) -> Result { - self.tx - .inner - .get_pinned_cf(&self.cf, key) - .map_err(Error::Internal) - .map(|opt| opt.is_some()) - } - /// Execute a prefix scan. - fn scan( - &self, - prefix: impl AsRef<[u8]> + 'db, - ) -> impl Iterator + 'static, impl AsRef<[u8]> + 'static)>> + 'db - { - let t = prefix.as_ref().to_vec(); - self.tx - .inner - .prefix_iterator_cf(&self.cf, prefix.as_ref()) - // The prefix iterator may "overshoot". This makes it stop when it reaches - // the end of the range that has the prefix. - .take_while(move |r| match r { - Ok((ref k, _)) => k.starts_with(&t), - _ => true, - }) - .map_err(Error::Internal) - } - /// Show all items in the entire keyspace. - fn list( - &self, - ) -> impl Iterator + 'static, impl AsRef<[u8]> + 'static)>> + 'db - { - self.tx - .inner - .full_iterator_cf(&self.cf, IteratorMode::Start) - .map_err(Error::Internal) - } -} - -/// The quiver allows one to manipulate all parallel edges tagged with a particular type. 
-pub struct Quiver<'db> { - tx: &'db Transaction<'db>, - tag: Tag, -} - -impl<'db> Quiver<'db> { - pub fn insert(&self, origin: Key, target: Key, identity: Key) -> Result<()> { - let fused = origin.fuse(target); - self.tx.with("multi:id-map").set(identity, fused)?; - let mut triple = [0; 48]; - triple[..32].copy_from_slice(&fused); - triple[32..].copy_from_slice(identity.as_ref()); - self.tx.with("multi:index/l").set(triple, b"")?; - triple[..32].rotate_left(16); - self.tx.with("multi:index/r").set(triple, b"")?; - OK - } - pub fn list_incoming(&self, target: Key) -> impl Iterator> + 'db { - self.tx - .with("multi:index/r") - .scan(target) - .map_ok(|(k, _)| Key::split(&k.as_ref()[16..])) - } -} - -fn encode(data: impl Encode) -> Result> { - bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding) -} - -fn decode(data: &[u8]) -> Result -where - T: Decode, -{ - bincode::decode_from_slice(data, bincode::config::standard()) - .map_err(Error::Decoding) - .map(|(v, _)| v) -} - -#[cfg(test)] -mod tests; diff --git a/lib/store/src/transaction/tests.rs b/lib/store/src/transaction/tests.rs deleted file mode 100644 index 9259ef7..0000000 --- a/lib/store/src/transaction/tests.rs +++ /dev/null @@ -1,256 +0,0 @@ -use super::*; -use crate::Space; - -#[derive(Encode, Decode)] -struct TestArrow; - -impl Arrow for TestArrow { - const SPACE: (Space, Space) = (Space("test-arrow/l"), Space("test-arrow/r")); -} - -const TEST_TAG: Tag = Tag(69); - -macro_rules! 
keygen { - { $($name:ident)* } => { - $( - let $name = Key::gen(); - eprintln!(concat!(stringify!($name), "={}"), $name); - )* - } -} - -fn with_test_arrow(f: impl Fn(Key, Key, &Transaction<'_>, usize) -> Result<()>) -> Result<()> { - Store::with_tmp(|db| { - // Run these tests 128 times because misuse of prefix iterator may cause weird, - // obscure bugs :3 - // - // Also, because we don't wipe the store between test runs, we have more chances - // to discover weird bugs that we wouldn't catch if there was only a single run. - Ok(for n in 0..128 { - eprintln!("--- run {n} ---"); - db.transaction(|tx| { - keygen!(target origin); - - tx.create_vertex(target, TEST_TAG)?; - tx.create_vertex(origin, TEST_TAG)?; - - tx.insert_arrow::((origin, target))?; - - let l: Vec = tx - .with("test-arrow/l") - .list() - .map_ok(|(k, _)| Key::split(k.as_ref())) - .map_ok(|(a, b)| format!("({a}, {b})")) - .try_collect()?; - - eprintln!("test-arrow/l = {l:#?}"); - - let r: Vec = tx - .with("test-arrow/r") - .list() - .map_ok(|(k, _)| Key::split(k.as_ref())) - .map_ok(|(a, b)| format!("({a}, {b})")) - .try_collect()?; - - eprintln!("test-arrow/r = {r:#?}"); - - f(origin, target, &tx, n) - })?; - eprintln!("--- end run {n} ---"); - }) - }) -} - -#[test] -fn target_incoming() -> Result<()> { - with_test_arrow(|origin, target, tx, _| { - let ti: Vec<_> = tx.list_incoming::(target).keys().try_collect()?; - - eprintln!("target.incoming = {ti:#?}"); - - assert!(ti.contains(&origin), "origin ∈ target.incoming"); - assert!(!ti.contains(&target), "target ∉ target.incoming"); - - OK - }) -} - -#[test] -fn target_outgoing() -> Result<()> { - with_test_arrow(|origin, target, tx, _| { - let to: Vec<_> = tx.list_outgoing::(target).keys().try_collect()?; - - eprintln!("target.outgoing = {to:#?}"); - - assert!(!to.contains(&target), "target ∉ target.outgoing"); - assert!(!to.contains(&origin), "origin ∉ target.outgoing"); - - OK - }) -} - -#[test] -fn origin_incoming() -> Result<()> { - 
with_test_arrow(|origin, target, tx, _| { - let oi: Vec<_> = tx.list_incoming::<TestArrow>(origin).keys().try_collect()?; - - eprintln!("origin.incoming = {oi:#?}"); - - assert!(!oi.contains(&origin), "origin ∉ origin.incoming"); - assert!(!oi.contains(&target), "target ∉ origin.incoming"); - - OK - }) -} - -#[test] -fn origin_outgoing() -> Result<()> { - with_test_arrow(|origin, target, tx, _| { - let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?; - - eprintln!("origin.outgoing = {oo:#?}"); - - assert!(oo.contains(&target), "target ∈ origin.outgoing"); - assert!(!oo.contains(&origin), "origin ∉ origin.outgoing"); - - OK - }) -} - -#[test] -fn fanout() -> Result<()> { - let targets: [Key; 128] = std::array::from_fn(|_| Key::gen()); - let origin = Key::gen(); - Store::with_tmp(|db| { - db.transaction(|tx| { - tx.create_vertex(origin, TEST_TAG)?; - for t in targets { - tx.create_vertex(t, TEST_TAG)?; - tx.insert_arrow::<TestArrow>((origin, t))?; - } - - let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?; - - for t in targets { - assert!(oo.contains(&t), "∀ t ∈ targets: t ∈ origin.outgoing"); - let ti: Vec<_> = tx.list_incoming::<TestArrow>(t).keys().try_collect()?; - assert!( - ti == vec! {origin}, - "∀ t ∈ targets: t.incoming = {{origin}}" - ); - } - - OK - }) - }) -} - -#[test] -fn fanin() -> Result<()> { - let origins: [Key; 128] = std::array::from_fn(|_| Key::gen()); - let target = Key::gen(); - Store::with_tmp(|db| { - db.transaction(|tx| { - tx.create_vertex(target, TEST_TAG)?; - for o in origins { - tx.create_vertex(o, TEST_TAG)?; - tx.insert_arrow::<TestArrow>((o, target))?; - } - - let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?; - - for o in origins { - let oo: Vec<_> = tx.list_outgoing::<TestArrow>(o).keys().try_collect()?; - assert!(ti.contains(&o), "∀ o ∈ origins: o ∈ target.incoming"); - assert!( - oo == vec! 
{target}, - "∀ o ∈ origins: o.outgoing = {{target}}" - ); - } - - OK - }) - }) -} - -#[test] -fn distinct_many_to_many() -> Result<()> { - let origins: [Key; 32] = std::array::from_fn(|_| Key::gen()); - let targets: [Key; 32] = std::array::from_fn(|_| Key::gen()); - Store::with_tmp(|db| { - db.transaction(|tx| { - for t in targets { - tx.create_vertex(t, TEST_TAG)?; - } - for o in origins { - tx.create_vertex(o, TEST_TAG)?; - for t in targets { - tx.insert_arrow::<TestArrow>((o, t))?; - } - } - - let ti: HashMap<Key, Vec<Key>> = targets - .into_iter() - .map(|t| { - tx.list_incoming::<TestArrow>(t) - .keys() - .try_collect() - .map(|v: Vec<_>| (t, v)) - }) - .collect::<Result<_>>()?; - - // For each origin point, there must be a target that has it as "incoming". - assert!( - origins - .into_iter() - .all(|o| { targets.into_iter().any(|t| { ti[&t].contains(&o) }) }), - "∀ o ∈ origins: ∃ t ∈ targets: o ∈ t.incoming" - ); - - // Each target has each origin as incoming. - assert!( - origins - .into_iter() - .all(|o| { targets.into_iter().all(|t| { ti[&t].contains(&o) }) }), - "∀ o ∈ origins: ∀ t ∈ targets: o ∈ t.incoming" - ); - - let to: HashMap<Key, Vec<Key>> = targets - .into_iter() - .map(|t| { - tx.list_outgoing::<TestArrow>(t) - .keys() - .try_collect() - .map(|v: Vec<_>| (t, v)) - }) - .collect::<Result<_>>()?; - - // Our arrows point only from origins to targets, and there's a bug if there - // exists a target such that its outgoing set is non-empty. - assert!( - !targets.into_iter().any(|t| !to[&t].is_empty()), - "∄ t ∈ targets: t.outgoing ≠ ∅" - ); - - let oo: HashMap<Key, Vec<Key>> = origins - .into_iter() - .map(|o| { - tx.list_outgoing::<TestArrow>(o) - .keys() - .try_collect() - .map(|v: Vec<_>| (o, v)) - }) - .collect::<Result<_>>()?; - - // Each origin has each target as outgoing. 
- assert!( - origins - .into_iter() - .all(|o| targets.into_iter().all(|t| oo[&o].contains(&t))), - "∀ o ∈ origins: ∀ t ∈ targets: t ∈ o.outgoing" - ); - - OK - }) - }) -} diff --git a/lib/store/src/types.rs b/lib/store/src/types.rs new file mode 100644 index 0000000..bcde887 --- /dev/null +++ b/lib/store/src/types.rs @@ -0,0 +1,171 @@ +//! Defining a [`Schema`]. +//! +//! There is a lot of complicated machinery here to make it so that you have to write very little code to +//! define new types. Basically, if you want to define a thing to store, you need to implement the trait +//! for it (e.g. [`Arrow`]), and also implement [`Value`], where you create a specification describing which +//! namespaces store records of that type. +//! +//! Then, when you construct a new `Store`, you need to pass in a [`Schema`], or the database won't be able +//! to operate on the types. +//! +//! [`Arrow`]: super::Arrow + +use std::collections::HashSet; + +use derive_more::Display; + +/// The namespace where all vertices must be registered. +pub(crate) const NODE_HEADERS: Namespace = Namespace("header:node"); + +/// The namespace where multiedge identities are mapped to endpoints. +pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge"); + +/// A specification of all user-defined namespaces. +/// +/// # Example +/// +/// The below example correctly defines a [basic arrow] and demonstrates its use by inserting one and then +/// testing whether it exists. +/// +/// ```rust +/// use store::{ arrow::Arrow, types::Schema, Store, Key, OK }; +/// +/// // Each kind of value has a derive macro. +/// #[derive(Arrow)] +/// struct MyArrow { origin: Key, target: Key } +/// +/// fn main () -> store::Result<()> { +/// // Here, we make sure that the namespaces used for `MyArrow` are known. 
+/// let schema = Schema::new() +/// .has::<MyArrow>(); +/// +/// let result = Store::tmp(schema, |db| { +/// let origin = Key::gen(); +/// let target = Key::gen(); +/// +/// let mut changes = db.batch(); +/// changes.create(MyArrow { origin, target }); +/// db.apply(changes)?; +/// +/// db.exists::<MyArrow>() +/// })?; +/// +/// assert!(result); +/// OK +/// } +/// ``` +/// +/// [basic arrow]: crate::arrow::Basic +pub struct Schema(pub(crate) HashSet<Namespace>); + +impl Schema { + /// Construct a new empty schema. + pub fn new() -> Schema { + Schema(HashSet::from_iter([NODE_HEADERS, MULTIEDGE_HEADERS])) + } + /// Add the component to the schema. + pub fn has<C>(mut self) -> Schema + where + C: Value, + { + self.add(C::SPEC); + self + } + /// Add a spec to the schema by mutable reference. + pub fn add(&mut self, spec: impl TypeSpec) -> &mut Schema { + spec.register(&mut self.0); + self + } +} + +/// The name of a keyspace. +/// +/// Specifically, this is the name of a RocksDB column family. +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)] +pub struct Namespace(pub &'static str); + +impl AsRef<str> for Namespace { + fn as_ref(&self) -> &str { + self.0 + } +} + +/// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a +/// [mixin](MixinSpec). +/// +/// All namespaces must be unique. +pub trait Value { + type Type: TypeSpec; + const SPEC: Self::Type; +} + +/// The specification for an [`Arrow`](super::Arrow). +/// +/// The listed namespaces must be unique among all other namespaces. +#[derive(Clone, Copy)] +pub struct ArrowSpec { + /// The keyspace where edge keys are ordered `(origin, target)`. + pub by_origin: Namespace, + /// The keyspace where edge keys are ordered `(target, origin)`. 
+ pub by_target: Namespace, +} + +#[derive(Clone, Copy)] +pub struct AliasSpec { + pub keyspace: Namespace, + pub reversed: Namespace, +} + +#[derive(Clone, Copy)] +pub struct MixinSpec { + pub keyspace: Namespace, +} + +/// Describes how to add a [`Value`] to a [`Schema`]. +pub trait TypeSpec { + /// Register the namespaces. + fn register(&self, set: &mut HashSet<Namespace>); +} + +impl TypeSpec for ArrowSpec { + fn register(&self, set: &mut HashSet<Namespace>) { + if !set.insert(self.by_origin) { + panic! { + "Duplicate found while inserting Arrow::BY_ORIGIN: {}", + self.by_origin + } + } + if !set.insert(self.by_target) { + panic! { + "Duplicate found while inserting Arrow::BY_TARGET: {}", + self.by_target + } + } + } +} +impl TypeSpec for AliasSpec { + fn register(&self, set: &mut HashSet<Namespace>) { + if !set.insert(self.keyspace) { + panic! { + "Duplicate found while inserting Alias::KEYSPACE: {}", + self.keyspace + } + } + if !set.insert(self.reversed) { + panic! { + "Duplicate found while inserting Alias::REVERSED: {}", + self.reversed + } + } + } +} +impl TypeSpec for MixinSpec { + fn register(&self, set: &mut HashSet<Namespace>) { + if !set.insert(self.keyspace) { + panic! { + "Duplicate found while inserting Mixin::KEYSPACE: {}", + self.keyspace + } + } + } +}