From 96805930ac35d921334f90cb4b9a5cf59e8c8377 Mon Sep 17 00:00:00 2001 From: Riley Apeldoorn Date: Wed, 24 Apr 2024 23:18:19 +0200 Subject: [PATCH] Polish the store APIs some more --- lib/macro/src/arrow.rs | 4 +- lib/macro/src/lib.rs | 6 +- lib/puppy/src/lib.rs | 34 +++++----- lib/store/src/arrow.rs | 34 +++++++++- lib/store/src/internal.rs | 40 +++++++++++- lib/store/src/key.rs | 2 + lib/store/src/mixin.rs | 132 ++++++++++++++++++++++++++++---------- lib/store/src/types.rs | 45 +++++++------ lib/store/src/util.rs | 8 +++ 9 files changed, 225 insertions(+), 80 deletions(-) diff --git a/lib/macro/src/arrow.rs b/lib/macro/src/arrow.rs index 359e020..34a8aad 100644 --- a/lib/macro/src/arrow.rs +++ b/lib/macro/src/arrow.rs @@ -117,8 +117,8 @@ fn gen_spec(name: &Ident) -> impl ToTokens { impl store::types::Value for #name { type Type = store::types::ArrowSpec; const SPEC: Self::Type = store::types::ArrowSpec { - by_origin: store::types::Namespace(#by_origin), - by_target: store::types::Namespace(#by_target), + by_origin: store::types::Keyspace(#by_origin), + by_target: store::types::Keyspace(#by_target), }; } } diff --git a/lib/macro/src/lib.rs b/lib/macro/src/lib.rs index 8566309..6bf0643 100644 --- a/lib/macro/src/lib.rs +++ b/lib/macro/src/lib.rs @@ -31,8 +31,8 @@ fn make_alias_impl(name: &syn::Ident, field: &syn::Field) -> TokenStream { impl store::types::Value for #name { type Type = store::types::AliasSpec; const SPEC: Self::Type = store::types::AliasSpec { - keyspace: store::types::Namespace(#keyspace), - reversed: store::types::Namespace(#reversed), + keyspace: store::types::Keyspace(#keyspace), + reversed: store::types::Keyspace(#reversed), }; } }; @@ -61,7 +61,7 @@ pub fn mixin(item: TokenStream) -> TokenStream { impl store::types::Value for #name { type Type = store::types::MixinSpec; const SPEC: Self::Type = store::types::MixinSpec { - keyspace: store::types::Namespace(#keyspace), + keyspace: store::types::Keyspace(#keyspace), }; } }; diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index ed8f6b6..e5689e0 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -1,13 +1,13 @@ -#![feature(iterator_try_collect)] +#![feature(iterator_try_collect, try_blocks)] use model::{AuthorOf, Bite, Content, Profile, Username}; -use store::util::{key, IterExt as _}; +use store::util::IterExt as _; pub use store::{self, Key, Store}; pub mod model { use bincode::{Decode, Encode}; - use store::{types::Schema, Key}; + use store::{types::Schema, Alias, Arrow, Key, Mixin}; - #[derive(store::Mixin, Encode, Decode)] + #[derive(Mixin, Encode, Decode)] pub struct Profile { pub post_count: usize, pub account_name: String, @@ -16,13 +16,13 @@ pub mod model { pub about_fields: Vec<(String, String)>, } - #[derive(store::Mixin, Encode, Decode)] + #[derive(Mixin, Encode, Decode)] pub struct Content { pub content: Option, pub summary: Option, } - #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] + #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct AuthorOf { #[origin] pub author: Key, @@ -30,7 +30,7 @@ pub mod model { pub object: Key, } - #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] + #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct Follows { #[origin] pub follower: Key, @@ -38,7 +38,7 @@ pub mod model { pub followed: Key, } - #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] + #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct Bite { #[identity] pub id: Key, @@ -48,7 +48,7 @@ pub mod model { pub victim: Key, } - #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] + #[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct FollowRequest { #[identity] pub id: Key, @@ -56,7 +56,7 @@ pub mod model { pub target: Key, } - #[derive(store::Alias)] + #[derive(Alias)] pub struct Username(pub String); /// Construct the schema. @@ -111,13 +111,13 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result { pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result> { db.run(|tx| { - tx.outgoing::(author) - .map_ok(|a| a.object) - .filter_bind_results(|post| { - let thing = tx.get_mixin(post)?; - Ok(thing.map(key(post))) - }) - .collect() + let keys = tx.outgoing::(author).map_ok(|a| a.object); + let posts = tx + .join_on(keys)? + .into_iter() + .filter_map(|(k, opt)| try { (k, opt?) }) + .collect(); + Ok(posts) }) } diff --git a/lib/store/src/arrow.rs b/lib/store/src/arrow.rs index 1c5105a..67b0095 100644 --- a/lib/store/src/arrow.rs +++ b/lib/store/src/arrow.rs @@ -38,6 +38,7 @@ //! edges they connect. //! //! [mixins]: super::Mixin +#![allow(private_interfaces)] pub use self::kinds::{Basic, Multi}; use super::{ @@ -82,6 +83,14 @@ pub trait ArrowKind { fn dec(buf: &[u8]) -> Self; /// Encode an arrow's key origin-first and target-first. fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>); + #[doc(hidden)] + /// Turn him into a raw edge. + fn raw(&self) -> Raw; +} + +union Raw { + multi: Multi, + basic: Basic, } impl Store { @@ -147,7 +156,8 @@ impl Transaction<'_> { /// #[derive(Arrow)] /// struct MyArrow { origin: Key, target: Key } /// - /// # store::new_interface::Store::test(|db| { + /// # let schema = store::types::Schema::new().has::(); + /// # store::Store::test(schema, |db| { /// let origin = Key::gen(); /// let target = Key::gen(); /// @@ -206,7 +216,7 @@ mod op { //! Implementations of arrow operations. use super::*; - use crate::{internal::*, Key, Result, OK}; + use crate::{internal::*, types::MULTIEDGE_HEADERS, Key, Result, OK}; /// Check whether there exists at least one arrow of type `A` from `origin` to `target`. pub fn exists(cx: &impl Query, origin: Key, target: Key) -> Result @@ -269,6 +279,15 @@ mod op { where A: Arrow, { + if A::Kind::IS_MULTI { + let Multi { + identity, + origin, + target, + } = unsafe { arrow.raw().multi }; + cx.open(MULTIEDGE_HEADERS) + .set(identity, origin.fuse(target))?; + } let (by_origin, by_target) = arrow.enc(); cx.open(A::SPEC.by_origin).set(by_origin, b"")?; cx.open(A::SPEC.by_target).set(by_target, b"")?; @@ -285,11 +304,14 @@ mod op { let by_origin = cx.open(A::SPEC.by_origin); let by_target = cx.open(A::SPEC.by_target); Ok(if A::Kind::IS_MULTI { + let headers = cx.open(MULTIEDGE_HEADERS); // TODO: optimize this implementation using range deletes. + // Unfortunately, range deletes are not available in transactional backends. for key in by_origin.scan(origin.fuse(target)).keys() { let key = Multi::decode(key?.as_ref()); by_origin.del(key.encode())?; by_target.del(key.swap().encode())?; + headers.del(key.identity)?; } } else { by_origin.del(origin.fuse(target))?; @@ -322,6 +344,9 @@ mod kinds { fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { (self.encode(), self.swap().encode()) } + fn raw(&self) -> super::Raw { + super::Raw { multi: *self } + } } impl ArrowKind for Basic { @@ -332,6 +357,9 @@ mod kinds { fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) { (self.encode(), self.reverse().encode()) } + fn raw(&self) -> super::Raw { + super::Raw { basic: *self } + } } /// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist @@ -472,7 +500,7 @@ mod kinds { /// Newtypes of either arrow kind are supported. /// /// ``` -/// use store::arrow::{Basic, Multi, Arrow}; +/// use store::{Key, arrow::{Basic, Multi, Arrow}}; /// /// /// The origin has requested to follow the target. /// /// diff --git a/lib/store/src/internal.rs b/lib/store/src/internal.rs index 5763e46..7680cef 100644 --- a/lib/store/src/internal.rs +++ b/lib/store/src/internal.rs @@ -55,6 +55,7 @@ where lower: [u8; N], upper: [u8; N], ) -> impl Iterator, Box<[u8]>)>> + 'db { + // TODO: use a seek op to make this more efficient self.context .full_iterator(&self.cf, IteratorMode::Start) .skip_while(move |r| match r { @@ -69,6 +70,16 @@ where }) .map_err(Error::Internal) } + /// Join all the keys to their values in this keyspace. + /// + /// This may be optimized compared to many random point lookups. + pub fn join( + &self, + keys: impl IntoIterator>, + ) -> Vec>>> { + self.context + .multi_get(keys.into_iter().map(|x| (&self.cf, x))) + } } impl Keyspace<'_, C> @@ -95,7 +106,7 @@ mod cx { }; use super::Keyspace; - use crate::{Backend, Batch, Error, Result, Store, Transaction, OK}; + use crate::{util::IterExt as _, Backend, Batch, Error, Result, Store, Transaction, OK}; /// A context for executing database operations. pub trait Context { @@ -126,6 +137,11 @@ mod cx { cf: &impl AsColumnFamilyRef, mode: IteratorMode<'a>, ) -> DBIteratorWithThreadMode<'a, Self::Backend>; + /// Optimized multi-point lookup. + fn multi_get<'a, C: AsColumnFamilyRef + 'a>( + &'a self, + keys: impl IntoIterator)>, + ) -> Vec>>>; } /// A context in which one can read from and modify the data store. @@ -174,6 +190,17 @@ mod cx { ) -> DBIteratorWithThreadMode<'a, Self::Backend> { self.inner.full_iterator_cf(cf, mode) } + + fn multi_get<'a, C: AsColumnFamilyRef + 'a>( + &'a self, + keys: impl IntoIterator)>, + ) -> Vec>>> { + self.inner + .multi_get_cf(keys) + .into_iter() + .map_err(Error::Internal) + .collect() + } } impl Context for Transaction<'_> { @@ -211,6 +238,17 @@ mod cx { ) -> DBIteratorWithThreadMode<'a, Self::Backend> { self.inner.full_iterator_cf(cf, mode) } + + fn multi_get<'a, C: AsColumnFamilyRef + 'a>( + &'a self, + keys: impl IntoIterator)>, + ) -> Vec>>> { + self.inner + .multi_get_cf(keys) + .into_iter() + .map_err(Error::Internal) + .collect() + } } impl Write for Transaction<'_> { diff --git a/lib/store/src/key.rs b/lib/store/src/key.rs index a4a2499..bf91394 100644 --- a/lib/store/src/key.rs +++ b/lib/store/src/key.rs @@ -3,6 +3,8 @@ use std::fmt::{Debug, Display}; use chrono::{DateTime, Utc}; use ulid::Ulid; +use crate::arrow::{ArrowKind, Basic, Multi}; + /// A unique identifier for vertices in the database. #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct Key(pub(crate) [u8; 16]); diff --git a/lib/store/src/mixin.rs b/lib/store/src/mixin.rs index 23943a7..6ef8fa8 100644 --- a/lib/store/src/mixin.rs +++ b/lib/store/src/mixin.rs @@ -1,23 +1,23 @@ -use std::{ - fmt::Pointer, - ops::{Bound, RangeBounds}, -}; +use std::ops::RangeBounds; use bincode::{Decode, Encode}; -use chrono::{DateTime, TimeDelta, Utc}; -use either::Either; -/// Derive a [`Mixin`] implementation. -pub use r#macro::Mixin; +use chrono::{DateTime, Utc}; use super::{ types::{MixinSpec, Value}, Batch, Store, Transaction, }; -use crate::{internal::Query, util::IterExt, Error, Key, Result}; +use crate::{Error, Key, Result}; /// Mixins are the simplest pieces of data in the store. pub trait Mixin: Value + Encode + Decode {} +/// Derive a [`Mixin`] implementation. +/// +/// In addition to deriving `Mixin`, you will need to derive or implement [`Encode`] +/// and [`Decode`]. +pub use r#macro::Mixin; + impl Store { /// Get the value! pub fn get_mixin(&self, node: Key) -> Result> @@ -33,6 +33,26 @@ impl Store { { op::has_mixin::(self, node) } + /// Get all `M`s where the key's timestamp is within the `range`. + pub fn range( + &self, + range: impl RangeBounds>, + ) -> impl Iterator> + '_ + where + M: Mixin, + { + op::get_range(self, range) + } + /// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results. + pub fn join_on( + &self, + iter: impl IntoIterator>, + ) -> Result)>> + where + M: Mixin, + { + op::join_on(self, iter) + } } impl Transaction<'_> { @@ -88,31 +108,17 @@ impl Transaction<'_> { where M: Mixin, { - use crate::internal::Context as _; - const MS: TimeDelta = TimeDelta::milliseconds(1); - let iter = match (range.start_bound(), range.end_bound()) { - (Bound::Unbounded, Bound::Unbounded) => { - Either::Left(self.open(M::SPEC.keyspace).list()) - } - (min, max) => { - let lower = match min { - Bound::Unbounded => [u8::MIN; 16], - Bound::Included(inc) => Key::range(*inc).0, - Bound::Excluded(exc) => Key::range(*exc + MS).0, - }; - let upper = match max { - Bound::Unbounded => [u8::MAX; 16], - Bound::Included(inc) => Key::range(*inc).1, - Bound::Excluded(exc) => Key::range(*exc - MS).1, - }; - Either::Right(self.open(M::SPEC.keyspace).range(lower, upper)) - } - }; - iter.bind_results(|(k, v)| { - let key = Key::from_slice(k.as_ref()); - let val = op::decode(v)?; - Ok((key, val)) - }) + op::get_range(self, range) + } + /// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results. + pub fn join_on( + &self, + iter: impl IntoIterator>, + ) -> Result)>> + where + M: Mixin, + { + op::join_on(self, iter) } } @@ -130,8 +136,13 @@ impl Batch { } mod op { + use std::ops::{Bound, RangeBounds}; + + use chrono::{DateTime, TimeDelta, Utc}; + use either::Either; + use super::Mixin; - use crate::{internal::*, Error, Key, Result}; + use crate::{internal::*, util::IterExt as _, Error, Key, Result}; pub fn update( cx: &(impl Query + Write), @@ -173,6 +184,57 @@ mod op { cx.open(M::SPEC.keyspace).has(node) } + pub fn get_range( + cx: &impl Query, + range: impl RangeBounds>, + ) -> impl Iterator> + '_ { + // TODO: Test this thoroughly + const MS: TimeDelta = TimeDelta::milliseconds(1); + let iter = match (range.start_bound(), range.end_bound()) { + (Bound::Unbounded, Bound::Unbounded) => Either::Left(cx.open(M::SPEC.keyspace).list()), + (min, max) => { + let lower = match min { + Bound::Unbounded => [u8::MIN; 16], + Bound::Included(inc) => Key::range(*inc).0, + Bound::Excluded(exc) => Key::range(*exc + MS).0, + }; + let upper = match max { + Bound::Unbounded => [u8::MAX; 16], + Bound::Included(inc) => Key::range(*inc).1, + Bound::Excluded(exc) => Key::range(*exc - MS).1, + }; + Either::Right(cx.open(M::SPEC.keyspace).range(lower, upper)) + } + }; + iter.bind_results(|(k, v)| { + let key = Key::from_slice(k.as_ref()); + let val = decode(v)?; + Ok((key, val)) + }) + } + + pub fn join_on( + cx: &impl Query, + iter: impl IntoIterator>, + ) -> Result)>> + where + M: Mixin, + { + let keys: Vec = iter.into_iter().try_collect()?; + cx.open(M::SPEC.keyspace) + .join(keys.iter()) + .into_iter() + .zip(keys) + .map(|(opt, key)| { + let Some(buf) = opt? else { + return Ok((key, None)); + }; + let val = decode(buf)?; + Ok((key, Some(val))) + }) + .try_collect() + } + pub(super) fn encode(data: impl bincode::Encode) -> Result> { bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding) } diff --git a/lib/store/src/types.rs b/lib/store/src/types.rs index bcde887..9f482ca 100644 --- a/lib/store/src/types.rs +++ b/lib/store/src/types.rs @@ -15,17 +15,17 @@ use std::collections::HashSet; use derive_more::Display; /// The namespace where all vertices must be registered. -pub(crate) const NODE_HEADERS: Namespace = Namespace("header:node"); +pub(crate) const NODE_HEADERS: Keyspace = Keyspace("header:node"); /// The namespace where multiedge identities are mapped to endpoints. -pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge"); +pub(crate) const MULTIEDGE_HEADERS: Keyspace = Keyspace("header:multiedge"); /// A specification of all user-defined namespaces. /// /// # Example /// /// The below example correctly defines a [basic arrow] and demonstrates its use by inserting one and then -/// testing whether it exists. +/// testing whether it exists. If the appropriate keyspaces are not known to the store, this will panic. /// /// ```rust /// use store::{ arrow::Arrow, types::Schema, Store, Key, OK }; @@ -39,7 +39,7 @@ pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge"); /// let schema = Schema::new() /// .has::(); /// -/// let result = Store::tmp(schema, |db| { +/// let result = Store::test(schema, |db| { /// let origin = Key::gen(); /// let target = Key::gen(); /// @@ -47,7 +47,7 @@ pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge"); /// changes.create(MyArrow { origin, target }); /// db.apply(changes)?; /// -/// db.exists::() +/// db.exists::(origin, target) /// })?; /// /// assert!(result); @@ -56,7 +56,7 @@ pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge"); /// ``` /// /// [basic arrow]: crate::arrow::Basic -pub struct Schema(pub(crate) HashSet); +pub struct Schema(pub(crate) HashSet); impl Schema { /// Construct a new empty schema. @@ -82,9 +82,9 @@ impl Schema { /// /// Specifically, this is the name of a RocksDB column family. #[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)] -pub struct Namespace(pub &'static str); +pub struct Keyspace(pub &'static str); -impl AsRef for Namespace { +impl AsRef for Keyspace { fn as_ref(&self) -> &str { self.0 } @@ -93,42 +93,49 @@ impl AsRef for Namespace { /// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a /// [mixin](MixinSpec). /// -/// All namespaces must be unique. +/// All namespaces must be unique, and added to the [`Schema`]. pub trait Value { type Type: TypeSpec; const SPEC: Self::Type; } -/// The specification for an [`Arrow`](super::Arrow). +/// The specification for an [`Arrow`](crate::Arrow). /// /// The listed namespaces must be unique among all other namespaces. #[derive(Clone, Copy)] pub struct ArrowSpec { /// The keyspace where edge keys are ordered `(origin, target)`. - pub by_origin: Namespace, + pub by_origin: Keyspace, /// The keyspace where edge keys are ordered `(target, origin)`. - pub by_target: Namespace, + pub by_target: Keyspace, } +/// A specification for the namespaces needed to store an [`Alias`][crate::Alias]. #[derive(Clone, Copy)] pub struct AliasSpec { - pub keyspace: Namespace, - pub reversed: Namespace, + /// The alias -> key mapping table. + pub keyspace: Keyspace, + /// The key -> alias mapping table. + pub reversed: Keyspace, } +/// Where do we store a mixin? #[derive(Clone, Copy)] pub struct MixinSpec { - pub keyspace: Namespace, + /// The key -> mixin mapping table. + pub keyspace: Keyspace, } /// Describes how to add a [`Value`] to a [`Schema`]. pub trait TypeSpec { /// Register the namespaces. - fn register(&self, set: &mut HashSet); + fn register(&self, set: &mut HashSet); } +// TODO: better error messages. + impl TypeSpec for ArrowSpec { - fn register(&self, set: &mut HashSet) { + fn register(&self, set: &mut HashSet) { if !set.insert(self.by_origin) { panic! { "Duplicate found while inserting Arrow::BY_ORIGIN: {}", @@ -144,7 +151,7 @@ impl TypeSpec for ArrowSpec { } } impl TypeSpec for AliasSpec { - fn register(&self, set: &mut HashSet) { + fn register(&self, set: &mut HashSet) { if !set.insert(self.keyspace) { panic! { "Duplicate found while inserting Alias::KEYSPACE: {}", @@ -160,7 +167,7 @@ impl TypeSpec for AliasSpec { } } impl TypeSpec for MixinSpec { - fn register(&self, set: &mut HashSet) { + fn register(&self, set: &mut HashSet) { if !set.insert(self.keyspace) { panic! { "Duplicate found while inserting Mixin::KEYSPACE: {}", diff --git a/lib/store/src/util.rs b/lib/store/src/util.rs index 8fac2e0..878c56d 100644 --- a/lib/store/src/util.rs +++ b/lib/store/src/util.rs @@ -60,6 +60,14 @@ pub trait IterExt: Iterator + Sized { impl IterExt for I where I: Iterator {} +/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next +/// argument is the second tuple element. pub fn key(key: K) -> impl FnOnce(V) -> (K, V) { move |val| (key, val) } + +/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next +/// argument is the second tuple element. +pub fn val(val: V) -> impl FnOnce(K) -> (K, V) { + move |key| (key, val) +}