diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index 3075cd8..75c3c5e 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -30,7 +30,7 @@ pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Re content: Some(content.to_string()), summary: None, })?; - tx.insert_arrow((author, key), AuthorOf)?; + tx.insert_arrow::((author, key))?; Ok(key) }) } @@ -131,7 +131,7 @@ pub mod fr { pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<()> { db.transaction(|tx| { - tx.insert_arrow((requester, target), FollowRequested)?; + tx.insert_arrow::((requester, target))?; OK }) } @@ -139,7 +139,7 @@ pub mod fr { pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> { db.transaction(|tx| { tx.remove_arrow::((requester, target))?; - tx.insert_arrow((requester, target), Follows)?; + tx.insert_arrow::((requester, target))?; OK }) } diff --git a/lib/store/src/arrow.rs b/lib/store/src/arrow.rs index bd1ed99..2dfff3d 100644 --- a/lib/store/src/arrow.rs +++ b/lib/store/src/arrow.rs @@ -44,7 +44,8 @@ pub mod multi { } /// A directed edge between two vertices. -pub trait Arrow: Encode + Decode { +pub trait Arrow { + type Label: Encode + Decode = (); const SPACE: (Space, Space); } @@ -55,7 +56,6 @@ pub enum Direction { } /// The node this arrow points away from is the "author" of the node the arrow points to. -#[derive(Encode, Decode)] pub struct AuthorOf; impl Arrow for AuthorOf { @@ -63,7 +63,6 @@ impl Arrow for AuthorOf { } /// The origin of this arrow has follow requested the target. -#[derive(Encode, Decode)] pub struct FollowRequested; impl Arrow for FollowRequested { @@ -71,7 +70,6 @@ impl Arrow for FollowRequested { } /// The origin "follows" the target. -#[derive(Encode, Decode)] pub struct Follows; impl Arrow for Follows { diff --git a/lib/store/src/lib.rs b/lib/store/src/lib.rs index 32c4a39..b68ee12 100644 --- a/lib/store/src/lib.rs +++ b/lib/store/src/lib.rs @@ -1,4 +1,15 @@ -#![feature(iterator_try_collect)] +#![feature( + iterator_try_collect, + associated_type_defaults, + // All needed to make the compile time string concatenation bullshit work + generic_const_exprs, + const_intrinsic_copy, + const_mut_refs, + generic_arg_infer, + str_from_raw_parts, + core_intrinsics, // "Using it is strongly discouraged" ok but it works tho + const_heap +)] //! The data store abstractions used by the ActivityPuppy project. //! //! Persistence in a puppy server is handled by this component, which implements a directed graph @@ -154,3 +165,1034 @@ pub enum Error { Encoding(bincode::error::EncodeError), Decoding(bincode::error::DecodeError), } + +pub mod new_interface { + //! Data persistence for the ActivityPuppy social media server. + //! + //! # Overview + //! + //! The design of the data store's abstractions is heavily inspired by graph theory. The idea is to encourage + //! composition and loose coupling by segmenting all data associated with a node into [mixins][Mixin], and + //! modeling relations and predicates between nodes as [arrows][Arrow]. In additions, the key identifying a + //! node can be [aliased][Alias] by a string newtype, which must be unique within the namespace of that alias. + //! + //! The read-only operations on the `Store` type will have abridged documentation compared to the docs on their + //! counterparts defined on the `Transaction` type. + + use std::{cell::RefCell, path::Path, sync::Arc}; + + use rocksdb::{ + BoundColumnFamily, IteratorMode, Options, TransactionDBOptions, WriteBatchWithTransaction, + }; + + use crate::{util::IterExt as _, Backend, Error, Result, STORE_NAME}; + + /// The main interface to the data persistence engine. + /// + /// This type provides reading capabilities, but does not expose APIs for manipulating data in the store. For + /// that, you must [run][Store::run] a [`Transaction`] or [apply][Store::apply] a [`Batch`]. + #[derive(Clone)] + pub struct Store { + // TODO: maybe switch to `OptimisticTransactionDB` because it has `batched_multi_get_cf`, which may be useful + // if we end up doing lots of point lookups. alternatively, maybe we don't need *transactions* altogether, and + // we can get away with write batches and snapshots. the main problem with transactions is that it doesn't let + // us do range deletes, which affects the efficiency of multiarrow deletion. + // + // a switch to write batches is feasible if we end up not doing reads and writes in the same transaction. + inner: Arc, + } + + impl Store { + /// Run a [transaction][Transaction]. + /// + /// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not + /// recorded. Changes made inside a transaction can be read from within that transaction before they are + /// committed. + /// + /// If the closure passed to `run` returns an error, the transaction is rolled back, and otherwise the + /// changes are committed. + pub fn run(&self, f: impl FnOnce(&Transaction<'_>) -> Result) -> Result + where + E: From, + { + let tx = Transaction { + inner: self.inner.transaction(), + store: &self, + }; + let r = f(&tx); + if let Err(e) = if r.is_err() { + tx.inner.rollback() + } else { + tx.inner.commit() + } { + return Err(E::from(Error::Internal(e))); + } + r + } + /// Construct and apply a batch of changes atomically. + pub fn apply(&self, f: impl FnOnce(&mut Batch)) -> Result<()> { + let mut batch = Batch { + inner: RefCell::new(WriteBatchWithTransaction::default()), + store: self.clone(), + }; + f(&mut batch); + self.inner.write(batch.inner.into_inner())?; + crate::OK + } + /// Open the data store in `state_dir`, and create one if it doesn't exist yet. + pub fn open(state_dir: impl AsRef, schema: Schema) -> Result { + let mut db_opts = Options::default(); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + let tx_opts = TransactionDBOptions::default(); + let inner = Arc::new(Backend::open_cf( + &db_opts, + &tx_opts, + state_dir.as_ref().join(STORE_NAME), + schema.0, + )?); + Ok(Store { inner }) + } + /// Delete the main data store in `state_dir` if it exists. + pub fn nuke(state_dir: impl AsRef) -> Result<()> { + Backend::destroy(&Options::default(), state_dir.as_ref().join(STORE_NAME)) + .map_err(Error::Internal) + } + /// Open a store that lives until `f` returns, for testing. + pub fn test(schema: Schema, f: impl FnOnce(Store) -> T) -> T { + let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir"); + f(Store::open(tmp_dir, schema) + .expect("failed to open temporary data store in {tmp_dir}")) + } + } + + /// Open a [`Store`]. Creates one if it doesn't exist yet at the state directory path. + pub fn open(state_dir: impl AsRef, schema: Schema) -> Result { + Store::open(state_dir, schema) + } + + pub use {alias::Alias, arrow::Arrow, mixin::Mixin}; + + use self::{ + cx::{Query, Write}, + types::Schema, + }; + + /// Hosts APIs for manipulating the data store. + /// + /// You can access these APIs from the body of the closure passed to [`Store::run`]. + pub struct Transaction<'db> { + inner: rocksdb::Transaction<'db, Backend>, + store: &'db Store, + } + + /// A set of writes that are to be executed atomically. + pub struct Batch { + inner: RefCell>, + store: Store, + } + + pub mod types { + //! Defining a [`Schema`]. + //! + //! There is a lot of complicated machinery here to make it so that you have to write very little code to + //! define new types. Basically, if you want to define a thing to store, you need to implement the trait + //! for it (e.g. [`Arrow`]), and also implement [`RecordType`], where you create a specification describing + //! which namespaces store records of that type. + //! + //! Then, when you construct a new `Store`, you need to pass in a [`Schema`], or the database won't be able + //! to operate on the types. + //! + //! [`Arrow`]: super::Arrow + + // IDEA: Maybe this could be type-level :3 so that each store just has a type parameter for this. + + use std::{collections::HashSet, mem::ManuallyDrop}; + + use derive_more::Display; + + /// The namespace where all vertices must be registered. + pub(crate) const NODE_HEADERS: Namespace = Namespace("header:node"); + + /// The namespace where multiedge identities are mapped to endpoints. + pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge"); + + /// A specification of all user-defined namespaces. + pub struct Schema(pub(crate) HashSet); + + impl Schema { + /// Construct a new empty schema. + pub fn new() -> Schema { + Schema(HashSet::from_iter([NODE_HEADERS, MULTIEDGE_HEADERS])) + } + /// Add the component to the schema. + pub fn add(mut self) -> Schema + where + C: RecordType, + { + self.add_mut(C::SPEC); + self + } + /// Add a spec to the schema by mutable reference. + pub fn add_mut(&mut self, spec: impl TypeSpec) -> &mut Schema { + spec.register(&mut self.0); + self + } + } + + /// The name of a keyspace. + /// + /// Specifically, this is the name of a RocksDB column family. + #[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)] + pub struct Namespace(&'static str); + + impl AsRef for Namespace { + fn as_ref(&self) -> &str { + self.0 + } + } + + /// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a + /// [mixin](MixinSpec). + pub trait RecordType { + type Spec: TypeSpec; + const SPEC: Self::Spec; + const NAME: &'static str; + } + + /// The specification for an [`Arrow`](super::Arrow). + /// + /// The listed namespaces must be unique among all other namespaces. + #[derive(Clone, Copy)] + pub struct ArrowSpec { + /// The keyspace where edge keys are ordered `(origin, target)`. + pub(crate) by_origin: Namespace, + /// The keyspace where edge keys are ordered `(target, origin)`. + pub(crate) by_target: Namespace, + } + + impl ArrowSpec { + /// Generate a spec based on the NAME. + pub const fn make() -> ArrowSpec + where + T: RecordType, + [(); T::NAME.len() + 12]: Sized, + { + use std::intrinsics::const_allocate; + // TODO: clean this up + #[allow(type_alias_bounds)] + type Arr = [u8; X::NAME.len() + 12]; + let tails = unsafe { + let start = const_allocate( + std::mem::size_of::>(), + std::mem::align_of::>(), + ); + + std::ptr::copy(b"arrow:".as_ptr(), start, 6); + + let ptr = start.add(6); + std::ptr::copy(T::NAME.as_ptr(), ptr, T::NAME.len()); + + let ptr = ptr.add(T::NAME.len()); + std::ptr::copy(b"/tails".as_ptr(), ptr, 6); + + std::str::from_raw_parts(start, T::NAME.len() + 12) + }; + let heads = unsafe { + let start = const_allocate( + std::mem::size_of::>(), + std::mem::align_of::>(), + ); + + std::ptr::copy(b"arrow:".as_ptr(), start, 6); + + let ptr = start.add(6); + std::ptr::copy(T::NAME.as_ptr(), ptr, T::NAME.len()); + + let ptr = ptr.add(T::NAME.len()); + std::ptr::copy(b"/heads".as_ptr(), ptr, 6); + + std::str::from_raw_parts(start, T::NAME.len() + 12) + }; + ArrowSpec { + by_origin: Namespace(tails), + by_target: Namespace(heads), + } + } + } + + #[derive(Clone, Copy)] + pub struct AliasSpec { + pub keyspace: Namespace, + pub reversed: Namespace, + } + + #[derive(Clone, Copy)] + pub struct MixinSpec { + pub keyspace: Namespace, + } + + /// Describes how to add a [`RecordType`] to a [`Schema`]. + pub trait TypeSpec { + /// Register the namespaces. + fn register(&self, set: &mut HashSet); + } + + impl TypeSpec for ArrowSpec { + fn register(&self, set: &mut HashSet) { + if !set.insert(self.by_origin) { + panic! { + "Duplicate found while inserting Arrow::BY_ORIGIN: {}", + self.by_origin + } + } + if !set.insert(self.by_target) { + panic! { + "Duplicate found while inserting Arrow::BY_TARGET: {}", + self.by_target + } + } + } + } + impl TypeSpec for AliasSpec { + fn register(&self, set: &mut HashSet) { + if !set.insert(self.keyspace) { + panic! { + "Duplicate found while inserting Alias::KEYSPACE: {}", + self.keyspace + } + } + if !set.insert(self.reversed) { + panic! { + "Duplicate found while inserting Alias::REVERSED: {}", + self.reversed + } + } + } + } + impl TypeSpec for MixinSpec { + fn register(&self, set: &mut HashSet) { + if !set.insert(self.keyspace) { + panic! { + "Duplicate found while inserting Mixin::KEYSPACE: {}", + self.keyspace + } + } + } + } + + impl Schema {} + } + + pub mod arrow { + //! Directed edges, both parallel and simple. + //! + //! This module's main exports are [`Arrow`], and the two kinds of arrows: [`Basic`] and [`Multi`]. + //! + //! Querying information about arrows can be done using the APIs exposed by [`Store`] and [`Transaction`], + //! and manipulating them can likewise be done from within the context of a `Transaction`. + //! + //! The arrow API is designed to aggressively minimize boilerplate for defining arrow types, and uses a + //! few tricks to do with associated constants and types to make it all work nicely. + //! + //! # Terminology + //! + //! An arrow is a part of a graph. Graphs consist of *nodes* (also called *vertices*) and *edges*. Nodes + //! can be seen as "things", and edges as connections between those things, defined by the two nodes that + //! they connect (which are called the *endpoints* of the edge). + //! + //! These edges can be directed or undirected. The difference is that undirected edges are identified by + //! an unordered pair of their endpoints, whereas directed edges (also called **arrows**), are identified + //! by an ordered pair, where one of the endpoints is the *tail* (or *origin* in the code/docs here) and + //! the other is the *head* (usually called *target* here). + //! + //! # Arrow kinds + //! + //! Arrows can be either [`Basic`] or [`Multi`]. The main difference is that basic arrows are defined + //! solely by which two nodes they connect, which means that their representation and certain operations + //! are more efficient. The trade-off is that they cannot capture more complex information than "this + //! edge exists". + //! + //! For some use cases (for example, predicates) this is sufficient, but other use cases require multiple, + //! individually identifiable and manipulatable parallel edges. Here, the trade-off is that while they + //! are much more expressive, and can be labeled by associating [mixins] with the arrow's identity key, + //! they incur more space overhead, and most operations on them are more expensive compared to basic + //! edges. + //! + //! Most arrow operations work on either kind of edge. Some signatures reference [`Arrow::Kind`], which + //! is either of the `Multi` or `Basic` types mentioned before. Because parallel arrows need to be + //! discernable from each other, each of them also has an `identity` key, in addition to listing the two + //! edges they connect. + //! + //! [mixins]: super::Mixin + + pub use self::kinds::{Basic, Multi}; + use super::{ + types::{ArrowSpec, RecordType}, + Batch, Store, Transaction, + }; + use crate::{Key, Result}; + + /// A directed edge. + /// + /// See the [module docs][self] for an introduction. + pub trait Arrow: RecordType { + /// The representation of this arrow, which also determines whether parallel edges are allowed. + type Kind: ArrowKind = Basic; + } + + /// Parameterizing arrows so we can distinguish between kinds of arrows. + /// + /// This lets us present a common API for certain arrow-related operations while also leveraging some + /// specialization. Essentially, from a type parameter which implements [`Arrow`], we can tell both at + /// the type level and at the value level whether that arrow is a multi-arrow or not. + pub trait ArrowKind { + /// Whether this kind of arrow should be represented using the specialized representation for edges + /// that are allowed to be parallel. + const IS_MULTI: bool; + /// Construct an arrow from a buffer containing a correctly-oriented arrow. + /// + /// Each arrow is stored twice, once "correctly", and once "reversed". This allows us to efficiently + /// list both the outgoing and incoming edges for any particular vertex by using a prefix scan on the + /// [`BY_ORIGIN`][ArrowSpec::by_origin] and [`BY_TARGET`][ArrowSpec::by_target] keyspaces respectively. + /// + /// The buffer passed to this function will start with 16 bytes origin, followed by 16 bytes target. + /// For basic arrows, that's it, but for multiarrows there is an additional 16 bytes of "identity", + /// which is needed to discriminate between multiple parallel edges. + /// + /// # Failure + /// + /// This method must panic if `buf` is not the expected size (32 bytes for basic arrows, 48 bytes for + /// multi arrows). The responsibility for ensuring that `buf` is correctly oriented lies with the + /// caller lest the result is incorrect, but passing an incorrectly oriented arrow is not a memory + /// safety issue, so this function is safe. + fn dec(buf: &[u8]) -> Self; + /// Encode an arrow's key origin-first and target-first. + fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>); + } + + impl Store { + /// Check whether there exists any arrow of type `A` that points from `origin` to `target`. + pub fn exists(&self, origin: Key, target: Key) -> Result + where + A: Arrow, + { + op::exists::(self, origin, target) + } + /// Get all arrows of type `A` that point at `target`. + pub fn incoming<'a, A>( + &'a self, + target: Key, + ) -> impl Iterator> + 'a + where + A::Kind: 'a, + A: Arrow, + { + op::incoming::(self, target) + } + /// Get all arrows of type `A` that point away from `origin`. + pub fn outgoing<'a, A>( + &'a self, + origin: Key, + ) -> impl Iterator> + 'a + where + A::Kind: 'a, + A: Arrow, + { + op::outgoing::(self, origin) + } + } + + impl Transaction<'_> { + /// Check whether there exists any arrow of type `A` that points from `origin` to `target`. + /// + /// This only tells you whether there is *any* such arrow, not how many (in the case of parallel edges). + pub fn exists(&self, origin: Key, target: Key) -> Result + where + A: Arrow, + { + op::exists::(self, origin, target) + } + /// Get all arrows of type `A` that point at `target`. + pub fn incoming<'a, A>( + &'a self, + target: Key, + ) -> impl Iterator> + 'a + where + A::Kind: 'a, + A: Arrow, + { + op::incoming::(self, target) + } + /// Get all arrows of type `A` that point away from `origin`. + pub fn outgoing<'a, A>( + &'a self, + origin: Key, + ) -> impl Iterator> + 'a + where + A::Kind: 'a, + A: Arrow, + { + op::outgoing::(self, origin) + } + /// Create a new arrow of type `A`. + /// + /// This operation supports both [`Multi`] and [`Basic`] arrows. + /// + /// ```rust + /// # fn main () -> store::Result<()> { + /// use store::{new_interface::arrow::{Arrow, Multi}, Key}; + /// + /// enum MyMultiArrow {} + /// + /// impl Arrow for MyMultiArrow { + /// type Kind = Multi; + /// // ... + /// } + /// + /// # store::new_interface::Store::test(|db| { + /// let origin = Key::gen(); + /// let target = Key::gen(); + /// + /// db.run(|tx| { + /// tx.create::(Multi { + /// identity: Key::gen(), + /// origin, + /// target, + /// }) + /// })?; + /// + /// assert!(db.exists::(origin, target)?); + /// # store::OK }) + /// # } + /// ``` + pub fn create(&self, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + op::create::(self, arrow) + } + /// Delete all edges of type `A` from `origin` to `target`. + /// + /// It is not an error for this function not to delete anything. + pub fn delete_all(&self, origin: Key, target: Key) -> Result<()> + where + A: Arrow, + { + op::delete_all::(self, origin, target) + } + /// Delete a specific arrow. + pub fn delete_one(&self, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + op::delete_one::(self, arrow) + } + } + + impl Batch { + /// Create an arrow. See [`Transaction::create`]. + pub fn create(&mut self, arrow: A::Kind) + where + A: Arrow, + { + op::create::(self, arrow) + .expect("no errors expected to occur during batch operation") + } + /// Delete a specific arrow. + pub fn delete_one(&mut self, arrow: A::Kind) + where + A: Arrow, + { + op::delete_one::(self, arrow) + .expect("no errors expected to occur during batch operation") + } + } + + mod op { + //! Implementations of arrow operations. + + use super::{ + super::{Query, Write}, + *, + }; + use crate::{util::IterExt as _, Key, Result, OK}; + + /// Check whether there exists at least one arrow of type `A` from `origin` to `target`. + pub fn exists(cx: &impl Query, origin: Key, target: Key) -> Result + where + A: Arrow, + { + if A::Kind::IS_MULTI { + // In the case of a multi-edge, at least one result from the prefix scan + // indicates that there is at least one edge. + cx.open(A::SPEC.by_origin) + .scan(origin.fuse(target)) + .next() + .transpose() + .map(|o| o.is_some()) + } else { + cx.open(A::SPEC.by_origin).has(origin.fuse(target)) + } + } + + /// List incoming arrows relative to `target`. + pub fn incoming<'db, A>( + cx: &'db impl Query, + target: Key, + ) -> impl Iterator> + 'db + where + A: Arrow, + A::Kind: 'db, + { + // In the `by_target` keyspace, for either kind of arrow the layout is such that the target is + // the prefix, so we pick that keyspace to more efficiently list all arrows that target the key. + cx.open(A::SPEC.by_target) + .scan(target) + .map_ok(|(mut k, _)| { + // Arrows from `by_target` are oriented target-first, while the decoder function requires + // that the buffer is oriented origin-first. Regardless of whether `..32` covers the prefix + // or the whole slice, swapping the two keys always gives us the ordering expected by the + // decoding function. + let (t, o) = k[..32].split_at_mut(16); + t.swap_with_slice(o); + A::Kind::dec(&k) + }) + } + + /// List outgoing arrows relative to `origin`. + pub fn outgoing<'db, A>( + cx: &'db impl Query, + origin: Key, + ) -> impl Iterator> + 'db + where + A: Arrow, + A::Kind: 'db, + { + cx.open(A::SPEC.by_origin) + .scan(origin) + .map_ok(|(ref k, _)| A::Kind::dec(k)) + } + + /// Create a new arrow. + pub fn create(cx: &impl Write, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + let (by_origin, by_target) = arrow.enc(); + cx.open(A::SPEC.by_origin).set(by_origin, b"")?; + cx.open(A::SPEC.by_target).set(by_target, b"")?; + OK + } + + /// Delete all arrows from `origin` to `target`. + /// + /// TODO: Remove the query requirement (depends on range delete being available). + pub fn delete_all(cx: &(impl Write + Query), origin: Key, target: Key) -> Result<()> + where + A: Arrow, + { + let by_origin = cx.open(A::SPEC.by_origin); + let by_target = cx.open(A::SPEC.by_target); + Ok(if A::Kind::IS_MULTI { + // TODO: optimize this implementation using range deletes. + for key in by_origin.scan(origin.fuse(target)).keys() { + let key = Multi::decode(key?.as_ref()); + by_origin.del(key.encode())?; + by_target.del(key.swap().encode())?; + } + } else { + by_origin.del(origin.fuse(target))?; + by_target.del(target.fuse(origin))?; + }) + } + /// Delete a specific arrow, if it exists. Doesn't error if the arrow does *not* exist. + pub fn delete_one(cx: &impl Write, arrow: A::Kind) -> Result<()> + where + A: Arrow, + { + let (by_origin, by_target) = arrow.enc(); + cx.open(A::SPEC.by_origin).del(by_origin)?; + cx.open(A::SPEC.by_target).del(by_target)?; + OK + } + } + + /// Types representing the different kinds of arrows. + mod kinds { + use super::ArrowKind; + use crate::Key; + + impl ArrowKind for Multi { + const IS_MULTI: bool = true; + fn dec(buf: &[u8]) -> Self { + Multi::decode(buf) + } + fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { + (self.encode(), self.swap().encode()) + } + } + + impl ArrowKind for Basic { + const IS_MULTI: bool = false; + fn dec(buf: &[u8]) -> Self { + Basic::decode(buf) + } + fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { + (self.encode(), self.reverse().encode()) + } + } + + /// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist + /// between two vertices. + #[derive(Clone, Copy)] + pub struct Multi { + /// The node that this edge points away from. + pub origin: Key, + /// The node that this edge points towards. + pub target: Key, + /// The discriminator of this particular edge, which distinguishes it from all other edges that + /// connect `origin` and `target`, and indeed from every other edge or node in the graph. + pub identity: Key, + } + + impl Multi { + /// Decode a multiarrow key from an origin-first encoded buffer. If the buffer is not correctly + /// oriented, the results will be wrong; the arrow will be oriented *away* from the target and + /// *at* the origin, instead of the other way around. + /// + /// # Orientation + /// + /// In this context, *correctly oriented* means that it is laid out in *origin-first* order, + /// like this (where `o`, `t` and `i` represent bytes): + /// + /// ```text + /// oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii + /// |--------------| |--------------| |--------------| + /// origin target identity + /// ..16 16..32 32.. + /// ``` + /// + /// In a *reverse oriented* buffer, the origin and target parts are swapped, so the target is + /// the prefix, followed by the origin, and then the identity. This is also called *target-first* + /// encoding in this documentation. + /// + /// # Silent failure + /// + /// There is no way to detect whether the ordering is correct from just the buffer, so the caller + /// must ensure that the order is correct. If you have a target-first encoded buffer, you can have + /// to swap the two keys before passing it into this function, or this function will give you an + /// edge that does not exist (since a multiedge can only point in one direction). + /// + /// Safety-wise, this isn't an issue, so it does not warrant marking this function as `unsafe`. + /// + /// # Panics + /// + /// This function panics if `buf` is not exactly 48 bytes long. + pub fn decode(buf: &[u8]) -> Multi { + Multi { + origin: Key::from_slice(&buf[..16]), + target: Key::from_slice(&buf[16..32]), + identity: Key::from_slice(&buf[32..]), + } + } + /// Encode an arrow in *origin-first order*. See the docs of [`Multi::decode`] for an explanation + /// of the difference between origin-first encoding and target-first encoding. + pub fn encode(self) -> [u8; 48] { + let mut key = [0; 48]; + key[..16].copy_from_slice(&self.origin.0); + key[16..32].copy_from_slice(&self.target.0); + key[32..].copy_from_slice(&self.identity.0); + key + } + /// Swap the origin and target of this arrow, while leaving the identity the same. + pub(super) fn swap(self) -> Multi { + Multi { + origin: self.target, + target: self.origin, + ..self + } + } + } + + /// A normal directed edge. Duplicates are not allowed. + /// + /// This kind of arrow is useful for modeling predicates and simple relationships. + #[derive(Clone, Copy)] + pub struct Basic { + pub origin: Key, + pub target: Key, + } + + impl Basic { + /// Get the inverse of this arrow (an arrow that connects the same two nodes, but pointing in the + /// other direction). + pub fn reverse(self) -> Basic { + Basic { + origin: self.target, + target: self.origin, + } + } + /// Encode `self` in origin-first order. See [`Multi::decode`] for docs on ordering. + pub fn encode(self) -> [u8; 32] { + self.origin.fuse(self.target) + } + /// Decode a basic edge from a buffer laid out origin-first. See [`Multi::decode`] for more information + /// about key encoding. + /// + /// # Panics + /// + /// Panics if `buf` is not exactly 32 bytes long. + pub fn decode(buf: &[u8]) -> Basic { + let (origin, target) = Key::split(buf); + Basic { origin, target } + } + } + } + } + + mod mixin { + use bincode::{Decode, Encode}; + + use super::types::{MixinSpec, RecordType}; + + /// Mixins are the simplest pieces of data in the store. + pub trait Mixin: RecordType + Encode + Decode {} + } + + mod alias { + use super::types::{AliasSpec, RecordType}; + + /// An alternative unique identifier for a node. + pub trait Alias: RecordType {} + } + + /// An internal interface to a specific keyspace that exposes basic hashmap-esque operations + /// on that keyspace, generic over whether the source of the data is a [`Transaction`] or a + /// [`Store`]. + struct Keyspace<'db, C> { + context: &'db C, + cf: Arc>, + } + + impl<'db, C> Keyspace<'db, C> + where + C: Query, + { + /// Fetch a value from the keyspace. + fn get(&self, key: impl AsRef<[u8]>) -> Result + 'db>> { + self.context.get_pinned(&self.cf, key) + } + /// Test whether a key exists. + fn has(&self, key: impl AsRef<[u8]>) -> Result { + self.get(key).map(|r| r.is_some()) + } + /// Execute a prefix scan. + fn scan( + &self, + prefix: impl AsRef<[u8]> + 'db, + ) -> impl Iterator, Box<[u8]>)>> + 'db { + let t = prefix.as_ref().to_vec(); + self.context + .prefix_iterator(&self.cf, prefix.as_ref()) + // The prefix iterator may "overshoot". This makes it stop when it reaches + // the end of the range that has the prefix. + .take_while(move |r| match r { + Ok((ref k, _)) => k.starts_with(&t), + _ => true, + }) + .map_err(Error::Internal) + } + /// List all pairs in the keyspace. + fn list(&self) -> impl Iterator, Box<[u8]>)>> + 'db { + self.context + .full_iterator(&self.cf, IteratorMode::Start) + .map_err(Error::Internal) + } + } + + impl Keyspace<'_, C> + where + C: Write, + { + fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> { + self.context.put(&self.cf, key, val) + } + fn del(&self, key: impl AsRef<[u8]>) -> Result<()> { + self.context.delete(&self.cf, key) + } + } + + mod cx { + //! Contexts for doing reads, writes or both to the database. + //! + //! The traits in this module map abstract calls to their methods on the [rocksdb] objects. + + use rocksdb::{ + AsColumnFamilyRef, DBAccess, DBIteratorWithThreadMode, DBPinnableSlice, IteratorMode, + }; + + use super::{Batch, Keyspace, Store, Transaction}; + use crate::{Backend, Error, Result, OK}; + + /// A context for executing database operations. + pub trait Context { + /// Open the keyspace identified by `cf`. + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> + where + Self: Sized; + } + + /// A context in which one can read from the data store. + /// + /// Specifically, this maps calls to either the transaction or the store's internals without us having + /// to implement methods for *both* transactions and the store. + pub trait Query: Context { + type Backend: DBAccess; + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>>; + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Self::Backend>; + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend>; + } + + /// A context in which one can read from and modify the data store. + pub trait Write: Context { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()>; + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()>; + } + + impl Context for Store { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { + Keyspace { + cf: self.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl Query for Store { + type Backend = Backend; + + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>> { + self.inner.get_pinned_cf(cf, key).map_err(Error::Internal) + } + + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Backend> { + self.inner.prefix_iterator_cf(cf, prefix) + } + + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.full_iterator_cf(cf, mode) + } + } + + impl Context for Transaction<'_> { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> { + Keyspace { + cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl<'db> Query for Transaction<'db> { + type Backend = rocksdb::Transaction<'db, Backend>; + + fn get_pinned<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + ) -> Result>> { + self.inner.get_pinned_cf(cf, key).map_err(Error::Internal) + } + + fn prefix_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + prefix: &[u8], + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.prefix_iterator_cf(cf, prefix) + } + + fn full_iterator<'a>( + &'a self, + cf: &impl AsColumnFamilyRef, + mode: IteratorMode<'a>, + ) -> DBIteratorWithThreadMode<'a, Self::Backend> { + self.inner.full_iterator_cf(cf, mode) + } + } + + impl Write for Transaction<'_> { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { + self.inner.delete_cf(cf, key).map_err(Error::Internal) + } + + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()> { + self.inner.put_cf(cf, key, val).map_err(Error::Internal) + } + } + + impl Context for Batch { + fn open<'cx>(&'cx self, cf: impl AsRef) -> Keyspace<'cx, Self> + where + Self: Sized, + { + Keyspace { + cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(), + context: &self, + } + } + } + + impl Write for Batch { + fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> { + self.inner.borrow_mut().delete_cf(cf, key); + OK + } + + fn put( + &self, + cf: &impl AsColumnFamilyRef, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> Result<()> { + self.inner.borrow_mut().put_cf(cf, key, val); + OK + } + } + } +} diff --git a/lib/store/src/transaction.rs b/lib/store/src/transaction.rs index d8e479a..e7d1ca5 100644 --- a/lib/store/src/transaction.rs +++ b/lib/store/src/transaction.rs @@ -203,7 +203,7 @@ impl Transaction<'_> { impl Transaction<'_> { /// Find an arrow of type `A` with the given `tail` and `head`. - pub fn lookup_arrow(&self, (tail, head): (Key, Key)) -> Result> + pub fn lookup_arrow(&self, (tail, head): (Key, Key)) -> Result> where A: Arrow, { @@ -214,14 +214,14 @@ impl Transaction<'_> { Err(err) => Err(err), } } - /// Create a new arrow of type `A` and associate the label with it. + /// Create a simple arrow of type `A`. /// /// # Errors /// /// - `Error::Undefined` if either key is not registered - pub fn insert_arrow(&self, (tail, head): (Key, Key), label: A) -> Result<()> + pub fn insert_arrow(&self, (tail, head): (Key, Key)) -> Result<()> where - A: Arrow, + A: Arrow(&self, key: impl Keylike) -> impl Iterator> + '_ + pub fn list_incoming( + &self, + key: impl Keylike, + ) -> impl Iterator> + '_ where A: Arrow, { - self.list_arrows_where(Direction::Incoming, key) + self.list_arrows_where::(Direction::Incoming, key) } /// Get all arrows of type `A` "pointing away from" `key`. - pub fn list_outgoing(&self, key: impl Keylike) -> impl Iterator> + '_ + pub fn list_outgoing( + &self, + key: impl Keylike, + ) -> impl Iterator> + '_ where A: Arrow, { - self.list_arrows_where(Direction::Outgoing, key) + self.list_arrows_where::(Direction::Outgoing, key) } /// Get all arrows of type `A`. - pub fn list_arrows(&self) -> impl Iterator> + '_ + pub fn list_arrows(&self) -> impl Iterator> + '_ where A: Arrow, { @@ -280,7 +285,7 @@ impl Transaction<'_> { &self, direction: Direction, key: impl Keylike, - ) -> impl Iterator> + '_ + ) -> impl Iterator> + '_ where A: Arrow, { diff --git a/lib/store/src/transaction/tests.rs b/lib/store/src/transaction/tests.rs index ee0670a..9259ef7 100644 --- a/lib/store/src/transaction/tests.rs +++ b/lib/store/src/transaction/tests.rs @@ -34,7 +34,7 @@ fn with_test_arrow(f: impl Fn(Key, Key, &Transaction<'_>, usize) -> Result<()>) tx.create_vertex(target, TEST_TAG)?; tx.create_vertex(origin, TEST_TAG)?; - tx.insert_arrow((origin, target), TestArrow)?; + tx.insert_arrow::((origin, target))?; let l: Vec = tx .with("test-arrow/l") @@ -126,7 +126,7 @@ fn fanout() -> Result<()> { tx.create_vertex(origin, TEST_TAG)?; for t in targets { tx.create_vertex(t, TEST_TAG)?; - tx.insert_arrow((origin, t), TestArrow)?; + tx.insert_arrow::((origin, t))?; } let oo: Vec<_> = tx.list_outgoing::(origin).keys().try_collect()?; @@ -154,7 +154,7 @@ fn fanin() -> Result<()> { tx.create_vertex(target, TEST_TAG)?; for o in origins { tx.create_vertex(o, TEST_TAG)?; - tx.insert_arrow((o, target), TestArrow)?; + tx.insert_arrow::((o, target))?; } let ti: Vec<_> = tx.list_incoming::(target).keys().try_collect()?; @@ -185,7 +185,7 @@ fn distinct_many_to_many() -> Result<()> { for o in origins { tx.create_vertex(o, TEST_TAG)?; for t in targets { - tx.insert_arrow((o, t), TestArrow)?; + tx.insert_arrow::((o, t))?; } }