Polish the store APIs some more

This commit is contained in:
Riley Apeldoorn 2024-04-24 23:18:19 +02:00
parent bcdd5e6059
commit 96805930ac
9 changed files with 225 additions and 80 deletions

View file

@ -117,8 +117,8 @@ fn gen_spec(name: &Ident) -> impl ToTokens {
impl store::types::Value for #name {
type Type = store::types::ArrowSpec;
const SPEC: Self::Type = store::types::ArrowSpec {
by_origin: store::types::Namespace(#by_origin),
by_target: store::types::Namespace(#by_target),
by_origin: store::types::Keyspace(#by_origin),
by_target: store::types::Keyspace(#by_target),
};
}
}

View file

@ -31,8 +31,8 @@ fn make_alias_impl(name: &syn::Ident, field: &syn::Field) -> TokenStream {
impl store::types::Value for #name {
type Type = store::types::AliasSpec;
const SPEC: Self::Type = store::types::AliasSpec {
keyspace: store::types::Namespace(#keyspace),
reversed: store::types::Namespace(#reversed),
keyspace: store::types::Keyspace(#keyspace),
reversed: store::types::Keyspace(#reversed),
};
}
};
@ -61,7 +61,7 @@ pub fn mixin(item: TokenStream) -> TokenStream {
impl store::types::Value for #name {
type Type = store::types::MixinSpec;
const SPEC: Self::Type = store::types::MixinSpec {
keyspace: store::types::Namespace(#keyspace),
keyspace: store::types::Keyspace(#keyspace),
};
}
};

View file

@ -1,13 +1,13 @@
#![feature(iterator_try_collect)]
#![feature(iterator_try_collect, try_blocks)]
use model::{AuthorOf, Bite, Content, Profile, Username};
use store::util::{key, IterExt as _};
use store::util::IterExt as _;
pub use store::{self, Key, Store};
pub mod model {
use bincode::{Decode, Encode};
use store::{types::Schema, Key};
use store::{types::Schema, Alias, Arrow, Key, Mixin};
#[derive(store::Mixin, Encode, Decode)]
#[derive(Mixin, Encode, Decode)]
pub struct Profile {
pub post_count: usize,
pub account_name: String,
@ -16,13 +16,13 @@ pub mod model {
pub about_fields: Vec<(String, String)>,
}
#[derive(store::Mixin, Encode, Decode)]
#[derive(Mixin, Encode, Decode)]
pub struct Content {
pub content: Option<String>,
pub summary: Option<String>,
}
#[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct AuthorOf {
#[origin]
pub author: Key,
@ -30,7 +30,7 @@ pub mod model {
pub object: Key,
}
#[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct Follows {
#[origin]
pub follower: Key,
@ -38,7 +38,7 @@ pub mod model {
pub followed: Key,
}
#[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct Bite {
#[identity]
pub id: Key,
@ -48,7 +48,7 @@ pub mod model {
pub victim: Key,
}
#[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct FollowRequest {
#[identity]
pub id: Key,
@ -56,7 +56,7 @@ pub mod model {
pub target: Key,
}
#[derive(store::Alias)]
#[derive(Alias)]
pub struct Username(pub String);
/// Construct the schema.
@ -111,13 +111,13 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result<Vec<(Key, Content)>> {
db.run(|tx| {
tx.outgoing::<AuthorOf>(author)
.map_ok(|a| a.object)
.filter_bind_results(|post| {
let thing = tx.get_mixin(post)?;
Ok(thing.map(key(post)))
})
.collect()
let keys = tx.outgoing::<AuthorOf>(author).map_ok(|a| a.object);
let posts = tx
.join_on(keys)?
.into_iter()
.filter_map(|(k, opt)| try { (k, opt?) })
.collect();
Ok(posts)
})
}

View file

@ -38,6 +38,7 @@
//! edges they connect.
//!
//! [mixins]: super::Mixin
#![allow(private_interfaces)]
pub use self::kinds::{Basic, Multi};
use super::{
@ -82,6 +83,14 @@ pub trait ArrowKind {
fn dec(buf: &[u8]) -> Self;
/// Encode an arrow's key origin-first and target-first.
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>);
#[doc(hidden)]
/// Turn him into a raw edge.
fn raw(&self) -> Raw;
}
union Raw {
multi: Multi,
basic: Basic,
}
impl Store {
@ -147,7 +156,8 @@ impl Transaction<'_> {
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// # store::new_interface::Store::test(|db| {
/// # let schema = store::types::Schema::new().has::<MyArrow>();
/// # store::Store::test(schema, |db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
@ -206,7 +216,7 @@ mod op {
//! Implementations of arrow operations.
use super::*;
use crate::{internal::*, Key, Result, OK};
use crate::{internal::*, types::MULTIEDGE_HEADERS, Key, Result, OK};
/// Check whether there exists at least one arrow of type `A` from `origin` to `target`.
pub fn exists<A>(cx: &impl Query, origin: Key, target: Key) -> Result<bool>
@ -269,6 +279,15 @@ mod op {
where
A: Arrow,
{
if A::Kind::IS_MULTI {
let Multi {
identity,
origin,
target,
} = unsafe { arrow.raw().multi };
cx.open(MULTIEDGE_HEADERS)
.set(identity, origin.fuse(target))?;
}
let (by_origin, by_target) = arrow.enc();
cx.open(A::SPEC.by_origin).set(by_origin, b"")?;
cx.open(A::SPEC.by_target).set(by_target, b"")?;
@ -285,11 +304,14 @@ mod op {
let by_origin = cx.open(A::SPEC.by_origin);
let by_target = cx.open(A::SPEC.by_target);
Ok(if A::Kind::IS_MULTI {
let headers = cx.open(MULTIEDGE_HEADERS);
// TODO: optimize this implementation using range deletes.
// Unfortunately, range deletes are not available in transactional backends.
for key in by_origin.scan(origin.fuse(target)).keys() {
let key = Multi::decode(key?.as_ref());
by_origin.del(key.encode())?;
by_target.del(key.swap().encode())?;
headers.del(key.identity)?;
}
} else {
by_origin.del(origin.fuse(target))?;
@ -322,6 +344,9 @@ mod kinds {
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
(self.encode(), self.swap().encode())
}
fn raw(&self) -> super::Raw {
super::Raw { multi: *self }
}
}
impl ArrowKind for Basic {
@ -332,6 +357,9 @@ mod kinds {
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
(self.encode(), self.reverse().encode())
}
fn raw(&self) -> super::Raw {
super::Raw { basic: *self }
}
}
/// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist
@ -472,7 +500,7 @@ mod kinds {
/// Newtypes of either arrow kind are supported.
///
/// ```
/// use store::arrow::{Basic, Multi, Arrow};
/// use store::{Key, arrow::{Basic, Multi, Arrow}};
///
/// /// The origin has requested to follow the target.
/// ///

View file

@ -55,6 +55,7 @@ where
lower: [u8; N],
upper: [u8; N],
) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
// TODO: use a seek op to make this more efficient
self.context
.full_iterator(&self.cf, IteratorMode::Start)
.skip_while(move |r| match r {
@ -69,6 +70,16 @@ where
})
.map_err(Error::Internal)
}
/// Join all the keys to their values in this keyspace.
///
/// This may be optimized compared to many random point lookups.
pub fn join(
&self,
keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.context
.multi_get(keys.into_iter().map(|x| (&self.cf, x)))
}
}
impl<C> Keyspace<'_, C>
@ -95,7 +106,7 @@ mod cx {
};
use super::Keyspace;
use crate::{Backend, Batch, Error, Result, Store, Transaction, OK};
use crate::{util::IterExt as _, Backend, Batch, Error, Result, Store, Transaction, OK};
/// A context for executing database operations.
pub trait Context {
@ -126,6 +137,11 @@ mod cx {
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend>;
/// Optimized multi-point lookup.
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>>;
}
/// A context in which one can read from and modify the data store.
@ -174,6 +190,17 @@ mod cx {
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.inner
.multi_get_cf(keys)
.into_iter()
.map_err(Error::Internal)
.collect()
}
}
impl Context for Transaction<'_> {
@ -211,6 +238,17 @@ mod cx {
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.inner
.multi_get_cf(keys)
.into_iter()
.map_err(Error::Internal)
.collect()
}
}
impl Write for Transaction<'_> {

View file

@ -3,6 +3,8 @@ use std::fmt::{Debug, Display};
use chrono::{DateTime, Utc};
use ulid::Ulid;
use crate::arrow::{ArrowKind, Basic, Multi};
/// A unique identifier for vertices in the database.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Key(pub(crate) [u8; 16]);

View file

@ -1,23 +1,23 @@
use std::{
fmt::Pointer,
ops::{Bound, RangeBounds},
};
use std::ops::RangeBounds;
use bincode::{Decode, Encode};
use chrono::{DateTime, TimeDelta, Utc};
use either::Either;
/// Derive a [`Mixin`] implementation.
pub use r#macro::Mixin;
use chrono::{DateTime, Utc};
use super::{
types::{MixinSpec, Value},
Batch, Store, Transaction,
};
use crate::{internal::Query, util::IterExt, Error, Key, Result};
use crate::{Error, Key, Result};
/// Mixins are the simplest pieces of data in the store.
pub trait Mixin: Value<Type = MixinSpec> + Encode + Decode {}
/// Derive a [`Mixin`] implementation.
///
/// In addition to deriving `Mixin`, you will need to derive or implement [`Encode`]
/// and [`Decode`].
pub use r#macro::Mixin;
impl Store {
/// Get the value!
pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
@ -33,6 +33,26 @@ impl Store {
{
op::has_mixin::<M>(self, node)
}
/// Get all `M`s where the key's timestamp is within the `range`.
pub fn range<M>(
&self,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
op::get_range(self, range)
}
/// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
pub fn join_on<M>(
&self,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
op::join_on(self, iter)
}
}
impl Transaction<'_> {
@ -88,31 +108,17 @@ impl Transaction<'_> {
where
M: Mixin,
{
use crate::internal::Context as _;
const MS: TimeDelta = TimeDelta::milliseconds(1);
let iter = match (range.start_bound(), range.end_bound()) {
(Bound::Unbounded, Bound::Unbounded) => {
Either::Left(self.open(M::SPEC.keyspace).list())
op::get_range(self, range)
}
(min, max) => {
let lower = match min {
Bound::Unbounded => [u8::MIN; 16],
Bound::Included(inc) => Key::range(*inc).0,
Bound::Excluded(exc) => Key::range(*exc + MS).0,
};
let upper = match max {
Bound::Unbounded => [u8::MAX; 16],
Bound::Included(inc) => Key::range(*inc).1,
Bound::Excluded(exc) => Key::range(*exc - MS).1,
};
Either::Right(self.open(M::SPEC.keyspace).range(lower, upper))
}
};
iter.bind_results(|(k, v)| {
let key = Key::from_slice(k.as_ref());
let val = op::decode(v)?;
Ok((key, val))
})
/// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
pub fn join_on<M>(
&self,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
op::join_on(self, iter)
}
}
@ -130,8 +136,13 @@ impl Batch {
}
mod op {
use std::ops::{Bound, RangeBounds};
use chrono::{DateTime, TimeDelta, Utc};
use either::Either;
use super::Mixin;
use crate::{internal::*, Error, Key, Result};
use crate::{internal::*, util::IterExt as _, Error, Key, Result};
pub fn update<M>(
cx: &(impl Query + Write),
@ -173,6 +184,57 @@ mod op {
cx.open(M::SPEC.keyspace).has(node)
}
pub fn get_range<M: Mixin>(
cx: &impl Query,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_ {
// TODO: Test this thoroughly
const MS: TimeDelta = TimeDelta::milliseconds(1);
let iter = match (range.start_bound(), range.end_bound()) {
(Bound::Unbounded, Bound::Unbounded) => Either::Left(cx.open(M::SPEC.keyspace).list()),
(min, max) => {
let lower = match min {
Bound::Unbounded => [u8::MIN; 16],
Bound::Included(inc) => Key::range(*inc).0,
Bound::Excluded(exc) => Key::range(*exc + MS).0,
};
let upper = match max {
Bound::Unbounded => [u8::MAX; 16],
Bound::Included(inc) => Key::range(*inc).1,
Bound::Excluded(exc) => Key::range(*exc - MS).1,
};
Either::Right(cx.open(M::SPEC.keyspace).range(lower, upper))
}
};
iter.bind_results(|(k, v)| {
let key = Key::from_slice(k.as_ref());
let val = decode(v)?;
Ok((key, val))
})
}
pub fn join_on<M>(
cx: &impl Query,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
let keys: Vec<Key> = iter.into_iter().try_collect()?;
cx.open(M::SPEC.keyspace)
.join(keys.iter())
.into_iter()
.zip(keys)
.map(|(opt, key)| {
let Some(buf) = opt? else {
return Ok((key, None));
};
let val = decode(buf)?;
Ok((key, Some(val)))
})
.try_collect()
}
pub(super) fn encode(data: impl bincode::Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}

View file

@ -15,17 +15,17 @@ use std::collections::HashSet;
use derive_more::Display;
/// The namespace where all vertices must be registered.
pub(crate) const NODE_HEADERS: Namespace = Namespace("header:node");
pub(crate) const NODE_HEADERS: Keyspace = Keyspace("header:node");
/// The namespace where multiedge identities are mapped to endpoints.
pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge");
pub(crate) const MULTIEDGE_HEADERS: Keyspace = Keyspace("header:multiedge");
/// A specification of all user-defined namespaces.
///
/// # Example
///
/// The below example correctly defines a [basic arrow] and demonstrates its use by inserting one and then
/// testing whether it exists.
/// testing whether it exists. If the appropriate keyspaces are not known to the store, this will panic.
///
/// ```rust
/// use store::{ arrow::Arrow, types::Schema, Store, Key, OK };
@ -39,7 +39,7 @@ pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge");
/// let schema = Schema::new()
/// .has::<MyArrow>();
///
/// let result = Store::tmp(schema, |db| {
/// let result = Store::test(schema, |db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
@ -47,7 +47,7 @@ pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge");
/// changes.create(MyArrow { origin, target });
/// db.apply(changes)?;
///
/// db.exists::<MyArrow>()
/// db.exists::<MyArrow>(origin, target)
/// })?;
///
/// assert!(result);
@ -56,7 +56,7 @@ pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge");
/// ```
///
/// [basic arrow]: crate::arrow::Basic
pub struct Schema(pub(crate) HashSet<Namespace>);
pub struct Schema(pub(crate) HashSet<Keyspace>);
impl Schema {
/// Construct a new empty schema.
@ -82,9 +82,9 @@ impl Schema {
///
/// Specifically, this is the name of a RocksDB column family.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
pub struct Namespace(pub &'static str);
pub struct Keyspace(pub &'static str);
impl AsRef<str> for Namespace {
impl AsRef<str> for Keyspace {
fn as_ref(&self) -> &str {
self.0
}
@ -93,42 +93,49 @@ impl AsRef<str> for Namespace {
/// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a
/// [mixin](MixinSpec).
///
/// All namespaces must be unique.
/// All namespaces must be unique, and added to the [`Schema`].
pub trait Value {
type Type: TypeSpec;
const SPEC: Self::Type;
}
/// The specification for an [`Arrow`](super::Arrow).
/// The specification for an [`Arrow`](crate::Arrow).
///
/// The listed namespaces must be unique among all other namespaces.
#[derive(Clone, Copy)]
pub struct ArrowSpec {
/// The keyspace where edge keys are ordered `(origin, target)`.
pub by_origin: Namespace,
pub by_origin: Keyspace,
/// The keyspace where edge keys are ordered `(target, origin)`.
pub by_target: Namespace,
pub by_target: Keyspace,
}
/// A specification for the namespaces needed to store an [`Alias`][crate::Alias].
#[derive(Clone, Copy)]
pub struct AliasSpec {
pub keyspace: Namespace,
pub reversed: Namespace,
/// The alias -> key mapping table.
pub keyspace: Keyspace,
/// The key -> alias mapping table.
pub reversed: Keyspace,
}
/// Where do we store a mixin?
#[derive(Clone, Copy)]
pub struct MixinSpec {
pub keyspace: Namespace,
/// The key -> mixin mapping table.
pub keyspace: Keyspace,
}
/// Describes how to add a [`Value`] to a [`Schema`].
pub trait TypeSpec {
/// Register the namespaces.
fn register(&self, set: &mut HashSet<Namespace>);
fn register(&self, set: &mut HashSet<Keyspace>);
}
// TODO: better error messages.
impl TypeSpec for ArrowSpec {
fn register(&self, set: &mut HashSet<Namespace>) {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.by_origin) {
panic! {
"Duplicate found while inserting Arrow::BY_ORIGIN: {}",
@ -144,7 +151,7 @@ impl TypeSpec for ArrowSpec {
}
}
impl TypeSpec for AliasSpec {
fn register(&self, set: &mut HashSet<Namespace>) {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Alias::KEYSPACE: {}",
@ -160,7 +167,7 @@ impl TypeSpec for AliasSpec {
}
}
impl TypeSpec for MixinSpec {
fn register(&self, set: &mut HashSet<Namespace>) {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Mixin::KEYSPACE: {}",

View file

@ -60,6 +60,14 @@ pub trait IterExt: Iterator + Sized {
impl<I> IterExt for I where I: Iterator {}
/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next
/// argument is the second tuple element.
pub fn key<K, V>(key: K) -> impl FnOnce(V) -> (K, V) {
move |val| (key, val)
}
/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next
/// argument is the second tuple element.
pub fn val<K, V>(val: V) -> impl FnOnce(K) -> (K, V) {
move |key| (key, val)
}