Store api overhaul

This commit is contained in:
Riley Apeldoorn 2024-04-23 00:52:39 +02:00
parent 523e9a7479
commit 29f90ad918
19 changed files with 1918 additions and 1049 deletions

14
Cargo.lock generated
View file

@ -885,6 +885,16 @@ dependencies = [
"libc",
]
[[package]]
name = "macro"
version = "0.0.0"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]]
name = "matchit"
version = "0.7.3"
@ -1130,6 +1140,8 @@ dependencies = [
name = "puppy"
version = "0.0.0"
dependencies = [
"bincode",
"chrono",
"fetch",
"store",
]
@ -1490,6 +1502,8 @@ dependencies = [
"bincode",
"chrono",
"derive_more",
"either",
"macro",
"rocksdb",
"tempfile",
"ulid",

View file

@ -3,6 +3,7 @@ members = [
"lib/puppy",
"lib/store",
"lib/fetch",
"lib/macro",
"bin/server",
"bin/pupctl",
]

View file

@ -1,31 +1,26 @@
use puppy::{
store::{
self,
alias::Username,
arrow::{FollowRequested, Follows},
mixin::Profile,
Error,
},
model::{schema, Bite, FollowRequest, Follows, Profile, Username},
store::{self, Error},
tl::Post,
Bite, Key, Store,
Key, Store,
};
fn main() -> store::Result<()> {
// Store::nuke(".state")?;
let db = Store::open(".state")?;
let db = Store::open(".state", schema())?;
println!("creating actors");
let riley = get_or_create_actor(&db, "riley")?;
let linen = get_or_create_actor(&db, "linen")?;
if false {
if true {
println!("creating posts");
puppy::create_post(&db, riley, "@linen <3")?;
puppy::create_post(&db, linen, "@riley <3")?;
}
if false {
if true {
println!("making riley follow linen");
if !db.exists::<Follows>((riley, linen))? {
if !db.exists::<Follows>(riley, linen)? {
println!("follow relation does not exist yet");
if !db.exists::<FollowRequested>((riley, linen))? {
if !db.exists::<FollowRequest>(riley, linen)? {
println!("no pending follow request; creating");
puppy::fr::create(&db, riley, linen)?;
} else {
@ -36,44 +31,46 @@ fn main() -> store::Result<()> {
println!("riley already follows linen");
}
}
println!("Posts on the instance:");
println!("\nPosts on the instance:");
for Post {
id,
content,
author,
} in puppy::tl::fetch_all(&db)?
{
let (_, Profile { account_name, .. }) = db.lookup(author)?;
let Profile { account_name, .. } = db.get_mixin(author)?.unwrap();
let content = content.content.unwrap();
println!("- {id} by @{account_name} ({author}):\n{content}",)
}
println!("Linen's followers:");
println!("\nLinen's followers:");
for id in puppy::fr::followers_of(&db, linen)? {
let (_, Profile { account_name, .. }) = db.lookup(id)?;
let Profile { account_name, .. } = db.get_mixin(id)?.unwrap();
println!("- @{account_name} ({id})");
}
println!("Riley's following:");
println!("\nRiley's following:");
for id in puppy::fr::following_of(&db, riley)? {
let (_, Profile { account_name, .. }) = db.lookup(id)?;
let Profile { account_name, .. } = db.get_mixin(id)?.unwrap();
println!("- @{account_name} ({id})");
}
if false {
println!("Biting riley");
puppy::bite_actor(&db, linen, riley).unwrap();
for Bite { id, biter, .. } in puppy::bites_on(&db, riley).unwrap() {
let (_, Profile { account_name, .. }) = db.lookup(biter).unwrap();
let Profile { account_name, .. } = db.get_mixin(biter)?.unwrap();
println!("riley was bitten by @{account_name} at {}", id.timestamp());
}
}
store::OK
}
fn get_or_create_actor(db: &Store, username: &str) -> Result<Key, Error> {
let user = db.translate::<Username>(username);
let user = db.lookup(Username(username.to_string()));
match user {
Ok(key) => {
Ok(Some(key)) => {
println!("found '{username}' ({key})");
Ok(key)
}
Err(Error::Missing) => {
Ok(None) => {
println!("'{username}' doesn't exist yet, creating");
let r = puppy::create_actor(&db, username);
if let Ok(ref key) = r {

13
lib/macro/Cargo.toml Normal file
View file

@ -0,0 +1,13 @@
[package]
name = "macro"
edition = "2021"
[lib]
path = "src/lib.rs"
proc-macro = true
[dependencies]
syn = { version = '2', features = ['full'] }
quote = '*'
proc-macro2 = '*'
heck = '*'

142
lib/macro/src/arrow.rs Normal file
View file

@ -0,0 +1,142 @@
use heck::AsKebabCase;
use proc_macro::TokenStream;
use quote::{quote, ToTokens};
use syn::{parse_macro_input, Data, DeriveInput, Field, Ident};
pub fn arrow(item: TokenStream) -> TokenStream {
let input = parse_macro_input!(item as DeriveInput);
let Data::Struct(structure) = input.data else {
panic!("Only structs are supported as arrows")
};
match structure.fields {
syn::Fields::Named(fields) => from_named(&input.ident, fields),
syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => {
let first = f.unnamed.first().unwrap();
from_newtype(&input.ident, first)
}
_ => panic!(
"Only newtype structs and structs with named fields can have a derived arrow impl"
),
}
}
fn from_named(name: &Ident, fields: syn::FieldsNamed) -> TokenStream {
let (origin, target, identity) = extract_idents(fields);
match identity {
Some(id) => make_multi_arrow(name, origin, target, id),
None => make_basic_arrow(name, origin, target),
}
}
fn make_basic_arrow(name: &Ident, origin: Ident, target: Ident) -> TokenStream {
let spec = gen_spec(name);
TokenStream::from(quote! {
#spec
impl store::arrow::Arrow for #name {}
impl From<store::arrow::Basic> for #name {
fn from(v: store::arrow::Basic) -> #name {
#name {
#origin: v.origin,
#target: v.target,
}
}
}
impl From<#name> for store::arrow::Basic {
fn from(v: #name) -> store::arrow::Basic {
store::arrow::Basic {
origin: v.#origin,
target: v.#target,
}
}
}
})
}
fn make_multi_arrow(name: &Ident, origin: Ident, target: Ident, id: Ident) -> TokenStream {
let spec = gen_spec(name);
TokenStream::from(quote! {
#spec
impl store::arrow::Arrow for #name {
type Kind = store::arrow::Multi;
}
impl From<store::arrow::Multi> for #name {
fn from(v: store::arrow::Multi) -> #name {
#name {
#id: v.identity,
#origin: v.origin,
#target: v.target,
}
}
}
impl From<#name> for store::arrow::Multi {
fn from(v: #name) -> store::arrow::Multi {
store::arrow::Multi {
identity: v.#id,
origin: v.#origin,
target: v.#target,
}
}
}
})
}
fn extract_idents(fields: syn::FieldsNamed) -> (Ident, Ident, Option<Ident>) {
let origin = extract_ident("origin", &fields).unwrap();
let target = extract_ident("target", &fields).unwrap();
let id = extract_ident("identity", &fields);
(origin, target, id)
}
fn extract_ident(name: &str, fields: &syn::FieldsNamed) -> Option<Ident> {
// Prefer marked fields and default to correctly named fields.
fields
.named
.iter()
.find(|field| {
field
.attrs
.iter()
.filter_map(|attr| attr.meta.path().get_ident())
.any(|id| id == name)
})
.and_then(|f| f.ident.clone())
.or_else(|| {
fields
.named
.iter()
.filter_map(|f| f.ident.clone())
.find(|id| id == name)
})
}
fn gen_spec(name: &Ident) -> impl ToTokens {
let prefix = AsKebabCase(name.to_string());
let by_origin = format!("{prefix}/by-origin");
let by_target = format!("{prefix}/by-target");
quote! {
impl store::types::Value for #name {
type Type = store::types::ArrowSpec;
const SPEC: Self::Type = store::types::ArrowSpec {
by_origin: store::types::Keyspace(#by_origin),
by_target: store::types::Keyspace(#by_target),
};
}
}
}
fn from_newtype(name: &Ident, field: &Field) -> TokenStream {
let spec = gen_spec(name);
let typ = &field.ty;
TokenStream::from(quote! {
#spec
impl store::arrow::Arrow for #name {
type Kind = #typ;
}
impl From<#typ> for #name {
fn from(v: #typ) -> #name { #name(v) }
}
impl From<#name> for #typ {
fn from(v: #name) -> #typ { v.0 }
}
})
}

73
lib/macro/src/lib.rs Normal file
View file

@ -0,0 +1,73 @@
use proc_macro::TokenStream;
mod arrow;
#[proc_macro_derive(Arrow, attributes(origin, target, identity))]
pub fn arrow(item: TokenStream) -> TokenStream {
arrow::arrow(item)
}
#[proc_macro_derive(Alias)]
pub fn alias(item: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(item as syn::DeriveInput);
let syn::Data::Struct(structure) = input.data else {
panic!("Only structs are supported as aliases")
};
match structure.fields {
syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => {
let first = f.unnamed.first().unwrap();
make_alias_impl(&input.ident, first)
}
_ => panic!("Only string newtype structs are allowed as aliases"),
}
}
fn make_alias_impl(name: &syn::Ident, field: &syn::Field) -> TokenStream {
let typ = &field.ty;
let prefix = heck::AsKebabCase(name.to_string());
let keyspace = format!("{prefix}/keyspace");
let reversed = format!("{prefix}/reversed");
let spec = quote::quote! {
impl store::types::Value for #name {
type Type = store::types::AliasSpec;
const SPEC: Self::Type = store::types::AliasSpec {
keyspace: store::types::Keyspace(#keyspace),
reversed: store::types::Keyspace(#reversed),
};
}
};
TokenStream::from(quote::quote! {
#spec
impl store::Alias for #name {}
impl AsRef<str> for #name {
fn as_ref(&self) -> &str { self.0.as_ref() }
}
impl From<#typ> for #name {
fn from(v: #typ) -> #name { #name(v) }
}
})
}
#[proc_macro_derive(Mixin)]
pub fn mixin(item: TokenStream) -> TokenStream {
let input = syn::parse_macro_input!(item as syn::DeriveInput);
let name = input.ident;
let prefix = heck::AsKebabCase(name.to_string());
let keyspace = format!("{prefix}/main");
let spec = quote::quote! {
impl store::types::Value for #name {
type Type = store::types::MixinSpec;
const SPEC: Self::Type = store::types::MixinSpec {
keyspace: store::types::Keyspace(#keyspace),
};
}
};
TokenStream::from(quote::quote! {
#spec
impl store::Mixin for #name {}
})
}

View file

@ -8,3 +8,5 @@ path = "src/lib.rs"
[dependencies]
store = { path = "../store" }
fetch = { path = "../fetch" }
bincode = "2.0.0-rc.3"
chrono = "*"

View file

@ -1,46 +1,104 @@
#![feature(iterator_try_collect)]
#![feature(iterator_try_collect, try_blocks)]
use model::{AuthorOf, Bite, Content, Profile, Username};
use store::util::IterExt as _;
pub use store::{self, Key, Store};
use store::{
alias::Username,
arrow::{self, multi::MultiArrow, AuthorOf},
mixin::{Content, Profile},
util::IterExt,
Keylike, Tag,
};
mod tags {
//! Type tags for vertices.
pub mod model {
use bincode::{Decode, Encode};
use store::{types::Schema, Alias, Arrow, Key, Mixin};
use store::Tag;
#[derive(Mixin, Encode, Decode)]
pub struct Profile {
pub post_count: usize,
pub account_name: String,
pub display_name: Option<String>,
pub about_string: Option<String>,
pub about_fields: Vec<(String, String)>,
}
pub const ACTOR: Tag = Tag(0);
pub const POST: Tag = Tag(1);
pub const BITE: Tag = Tag(2);
#[derive(Mixin, Encode, Decode)]
pub struct Content {
pub content: Option<String>,
pub summary: Option<String>,
}
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct AuthorOf {
#[origin]
pub author: Key,
#[target]
pub object: Key,
}
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct Follows {
#[origin]
pub follower: Key,
#[target]
pub followed: Key,
}
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct Bite {
#[identity]
pub id: Key,
#[origin]
pub biter: Key,
#[target]
pub victim: Key,
}
#[derive(Arrow, Debug, PartialEq, Eq, Clone, Copy)]
pub struct FollowRequest {
#[identity]
pub id: Key,
pub origin: Key,
pub target: Key,
}
#[derive(Alias)]
pub struct Username(pub String);
/// Construct the schema.
pub fn schema() -> Schema {
Schema::new()
// Mixins
.has::<Profile>()
.has::<Content>()
// Aliases
.has::<Username>()
// Arrows
.has::<Bite>()
.has::<FollowRequest>()
.has::<AuthorOf>()
.has::<Follows>()
}
}
pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Result<Key> {
let key = Key::gen();
db.transaction(|tx| {
tx.create_vertex(key, tags::POST)?;
tx.update::<Profile>(author, |_, mut profile| {
db.run(|tx| {
tx.update::<Profile>(author, |mut profile| {
profile.post_count += 1;
Ok(profile)
profile
})?;
tx.insert(key, Content {
tx.add_mixin(key, Content {
content: Some(content.to_string()),
summary: None,
})?;
tx.insert_arrow((author, key), AuthorOf)?;
tx.create(AuthorOf {
author,
object: key,
})?;
Ok(key)
})
}
pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
let key = Key::gen();
db.transaction(|tx| {
tx.create_vertex(key, tags::ACTOR)?;
tx.insert_alias(key, Username(username.to_string()))?;
tx.insert(key, Profile {
db.run(|tx| {
tx.add_alias(key, Username(username.to_string()))?;
tx.add_mixin(key, Profile {
post_count: 0,
account_name: username.to_string(),
display_name: None,
@ -51,49 +109,36 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
})
}
pub fn list_posts_by_author(
db: &Store,
author: impl Keylike,
) -> store::Result<Vec<(Key, Content)>> {
db.transaction(|tx| {
tx.list_outgoing::<AuthorOf>(author)
.bind_results(|(post_key, _)| tx.lookup::<Content>(post_key))
.collect()
pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result<Vec<(Key, Content)>> {
db.run(|tx| {
let keys = tx.outgoing::<AuthorOf>(author).map_ok(|a| a.object);
let posts = tx
.join_on(keys)?
.into_iter()
.filter_map(|(k, opt)| try { (k, opt?) })
.collect();
Ok(posts)
})
}
pub struct Bite {
pub id: Key,
pub biter: Key,
pub victim: Key,
}
impl MultiArrow for Bite {
const TYPE: Tag = tags::BITE;
}
pub fn bite_actor(db: &Store, biter: Key, victim: Key) -> store::Result<Key> {
db.transaction(|tx| {
// Bites are represented as multiedges.
let key = arrow::multi::insert::<Bite>(&tx, biter, victim)?;
// We can treat particular arrows in a quiver as a vertex by registering it.
tx.create_vertex(key, tags::BITE)?;
Ok(key)
db.run(|tx| {
let id = Key::gen();
tx.create(Bite { id, biter, victim })?;
Ok(id)
})
}
pub fn bites_on(db: &Store, victim: Key) -> store::Result<Vec<Bite>> {
db.transaction(|tx| {
arrow::multi::list_incoming::<Bite>(&tx, victim)
.map_ok(|(biter, id)| Bite { id, biter, victim })
.try_collect()
})
db.incoming::<Bite>(victim).try_collect()
}
pub mod tl {
//! Timelines
use store::{arrow::AuthorOf, mixin::Content, util::IterExt as _, Error, Key, Result, Store};
use store::{util::IterExt as _, Error, Key, Result, Store};
use crate::model::{AuthorOf, Content};
pub struct Post {
pub id: Key,
@ -102,13 +147,11 @@ pub mod tl {
}
pub fn fetch_all(db: &Store) -> Result<Vec<Post>> {
db.transaction(|tx| {
let iter = tx.list::<Content>();
db.run(|tx| {
let iter = tx.range::<Content>(..);
iter.bind_results(|(id, content)| {
let author = tx
.list_incoming::<AuthorOf>(id)
.keys()
.next_or(Error::Missing)?;
let AuthorOf { author, .. } =
tx.incoming::<AuthorOf>(id).next_or(Error::Missing)?;
Ok(Post {
id,
author,
@ -123,54 +166,64 @@ pub mod tl {
pub mod fr {
//! Follow requests
use store::{
arrow::{FollowRequested, Follows},
util::IterExt as _,
Key, Store, OK,
};
use store::{util::IterExt as _, Key, Store, OK};
pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.insert_arrow((requester, target), FollowRequested)?;
OK
use crate::model::{FollowRequest, Follows};
pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<FollowRequest> {
db.run(|tx| {
let req = FollowRequest {
id: Key::gen(),
origin: requester,
target,
};
tx.create(req)?;
Ok(req)
})
}
pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?;
tx.insert_arrow((requester, target), Follows)?;
db.run(|tx| {
tx.delete_all::<FollowRequest>(requester, target)?;
tx.create(Follows {
follower: requester,
followed: target,
})?;
OK
})
}
pub fn reject(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?;
db.run(|tx| {
tx.delete_all::<FollowRequest>(requester, target)?;
OK
})
}
pub fn list_pending(db: &Store, target: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| tx.list_incoming::<FollowRequested>(target).keys().collect())
pub fn list_pending(db: &Store, target: Key) -> store::Result<Vec<FollowRequest>> {
db.incoming::<FollowRequest>(target).collect()
}
pub fn following_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| tx.list_outgoing::<Follows>(actor).keys().collect())
db.outgoing::<Follows>(actor)
.map_ok(|a| a.followed)
.collect()
}
pub fn followers_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| tx.list_incoming::<Follows>(actor).keys().collect())
db.incoming::<Follows>(actor)
.map_ok(|a| a.follower)
.collect()
}
#[cfg(test)]
mod tests {
use store::{
arrow::{FollowRequested, Follows},
Key, Store, OK,
};
use store::{Key, Store, OK};
use crate::create_actor;
use crate::{
create_actor,
model::{schema, FollowRequest, Follows},
};
fn make_test_actors(db: &Store) -> store::Result<(Key, Key)> {
let alice = create_actor(&db, "alice")?;
@ -181,18 +234,21 @@ pub mod fr {
#[test]
fn create_fr() -> store::Result<()> {
Store::with_tmp(|db| {
Store::test(schema(), |db| {
let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?;
assert!(
db.exists::<FollowRequested>((alice, bob))?,
db.exists::<FollowRequest>(alice, bob)?,
"(alice -> bob) ∈ follow-requested"
);
assert!(
!db.exists::<Follows>((alice, bob))?,
!db.exists::<Follows>(alice, bob)?,
"(alice -> bob) ∉ follows"
);
let pending_for_bob = super::list_pending(&db, bob)?;
let pending_for_bob = super::list_pending(&db, bob)?
.into_iter()
.map(|fr| fr.origin)
.collect::<Vec<_>>();
assert_eq!(pending_for_bob, vec![alice], "bob.pending = {{alice}}");
OK
})
@ -200,17 +256,17 @@ pub mod fr {
#[test]
fn accept_fr() -> store::Result<()> {
Store::with_tmp(|db| {
Store::test(schema(), |db| {
let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?;
super::accept(&db, alice, bob)?;
assert!(
db.exists::<Follows>((alice, bob))?,
db.exists::<Follows>(alice, bob)?,
"(alice -> bob) ∈ follows"
);
assert!(
!db.exists::<Follows>((bob, alice))?,
!db.exists::<Follows>(bob, alice)?,
"(bob -> alice) ∉ follows"
);
@ -226,7 +282,7 @@ pub mod fr {
#[test]
fn listing_follow_relations() -> store::Result<()> {
Store::with_tmp(|db| {
Store::test(schema(), |db| {
let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?;
super::accept(&db, alice, bob)?;

View file

@ -12,3 +12,5 @@ derive_more = "*"
bincode = "2.0.0-rc.3"
chrono = "*"
tempfile = "*"
macro = { path = "../macro" }
either = "*"

View file

@ -1,17 +1,115 @@
//! Alternative keys.
/// Derive an implementation of [`Alias`].
pub use r#macro::Alias;
use derive_more::{Display, From};
use super::{
types::{AliasSpec, Value},
Batch, Store, Transaction,
};
use crate::{Key, Result};
use crate::Space;
/// An alternative unique identifier for a node.
pub trait Alias: Value<Type = AliasSpec> + From<String> + AsRef<str> {}
/// An alternative unique key for a vertex.
pub trait Alias: ToString + From<String> {
const SPACE: (Space, Space);
impl Transaction<'_> {
/// Look up the key associated with the alias.
pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
where
A: Alias,
{
op::lookup::<A>(self, alias.as_ref())
}
/// Get the alias associated with the `node`.
pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
where
A: Alias,
{
op::get_alias(self, node)
}
/// Add an alias to `node`.
pub fn add_alias<A>(&self, node: Key, alias: A) -> Result<()>
where
A: Alias,
{
op::add_alias::<A>(self, node, alias.as_ref())
}
/// Check whether `node` has an `M` defined for it.
pub fn has_alias<A>(&self, node: Key) -> Result<bool>
where
A: Alias,
{
op::has_alias::<A>(self, node)
}
}
#[derive(Display, From)]
pub struct Username(pub String);
impl Alias for Username {
const SPACE: (Space, Space) = (Space("username/l"), Space("username/r"));
impl Store {
/// Look up the key associated with the alias.
pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
where
A: Alias,
{
op::lookup::<A>(self, alias.as_ref())
}
/// Get the alias associated with the `node`.
pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
where
A: Alias,
{
op::get_alias(self, node)
}
/// Check whether `node` has an `M` defined for it.
pub fn has_alias<A>(&self, node: Key) -> Result<bool>
where
A: Alias,
{
op::has_alias::<A>(self, node)
}
}
impl Batch {
/// Add an alias to `node`.
///
/// # Warning
///
/// This will *not* fail if the key already has a alias of this type, and in fact *it will cause fundamental inconsistency*
/// if the alias already exists. Don't call this function unless you know that neither `node` nor `alias` exist yet.
pub fn put_alias<A>(&mut self, node: Key, alias: A)
where
A: Alias,
{
// TODO: consistency *could* be checked by manually iterating over the transaction using `WriteBatch::iterate`
op::add_alias::<A>(self, node, alias.as_ref()).unwrap();
}
}
mod op {
use crate::{internal::*, Alias, Key, Result, OK};
pub fn lookup<A: Alias>(cx: &impl Query, alias: &str) -> Result<Option<Key>> {
cx.open(A::SPEC.keyspace).get(alias).map(|k| match k {
Some(x) => Some(Key::from_slice(x.as_ref())),
None => None,
})
}
pub fn has_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<bool> {
cx.open(A::SPEC.reversed).has(node)
}
pub fn add_alias<A: Alias>(cx: &impl Write, node: Key, alias: &str) -> Result<()> {
cx.open(A::SPEC.keyspace).set(alias, node)?;
cx.open(A::SPEC.reversed).set(node, alias)?;
OK
}
pub fn get_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<Option<A>> {
let buf = cx.open(A::SPEC.reversed).get(node)?;
Ok(buf.map(decode))
}
fn decode<T>(data: impl AsRef<[u8]>) -> T
where
T: From<String>,
{
T::from(String::from_utf8_lossy(data.as_ref()).into_owned())
}
}

View file

@ -1,79 +1,557 @@
//! Relations between nodes.
use bincode::{Decode, Encode};
use crate::Space;
pub mod multi {
//! Managing multiedges.
//! Directed edges, both parallel and simple.
//!
//! Unlike regular [`Arrow`]s, which don't have an identity (they are identified by the two nodes that
//! they connect), multiarrows can have their own [`Key`]. This allows one to have multiple arrows in
//! the same direction connecting the same two vertices, which isn't possible with normal arrows.
//! This module's main exports are [`Arrow`], and the two kinds of arrows: [`Basic`] and [`Multi`].
//!
//! Multiarrows can also be treated as if they were vertices, if their identity (`Key`) is registered as
//! one.
//! Querying information about arrows can be done using the APIs exposed by [`Store`] and [`Transaction`],
//! and manipulating them can likewise be done from within the context of a `Transaction`.
//!
//! This comes with a trade-off, though, specifically in both space and complexity. A multi-arrow also
//! can't have a label, like a typical arrow.
//! The arrow API is designed to aggressively minimize boilerplate for defining arrow types, and uses a
//! few tricks to do with associated constants and types to make it all work nicely.
//!
//! # Terminology
//!
//! An arrow is a part of a graph. Graphs consist of *nodes* (also called *vertices*) and *edges*. Nodes
//! can be seen as "things", and edges as connections between those things, defined by the two nodes that
//! they connect (which are called the *endpoints* of the edge).
//!
//! These edges can be directed or undirected. The difference is that undirected edges are identified by
//! an unordered pair of their endpoints, whereas directed edges (also called **arrows**), are identified
//! by an ordered pair, where one of the endpoints is the *tail* (or *origin* in the code/docs here) and
//! the other is the *head* (usually called *target* here).
//!
//! # Arrow kinds
//!
//! Arrows can be either [`Basic`] or [`Multi`]. The main difference is that basic arrows are defined
//! solely by which two nodes they connect, which means that their representation and certain operations
//! are more efficient. The trade-off is that they cannot capture more complex information than "this
//! edge exists".
//!
//! For some use cases (for example, predicates) this is sufficient, but other use cases require multiple,
//! individually identifiable and manipulatable parallel edges. Here, the trade-off is that while they
//! are much more expressive, and can be labeled by associating [mixins] with the arrow's identity key,
//! they incur more space overhead, and most operations on them are more expensive compared to basic
//! edges.
//!
//! Most arrow operations work on either kind of edge. Some signatures reference [`Arrow::Kind`], which
//! is either of the `Multi` or `Basic` types mentioned before. Because parallel arrows need to be
//! discernable from each other, each of them also has an `identity` key, in addition to listing the two
//! edges they connect.
//!
//! [mixins]: super::Mixin
#![allow(private_interfaces)]
use crate::{Key, Result, Tag, Transaction};
pub use self::kinds::{Basic, Multi};
use super::{
types::{ArrowSpec, Value},
Batch, Store, Transaction,
};
use crate::{util::IterExt as _, Key, Result};
pub fn insert<A>(tx: &Transaction<'_>, origin: Key, target: Key) -> Result<Key>
where
A: MultiArrow,
{
let key = Key::gen();
tx.quiver(A::TYPE).insert(origin, target, key)?;
Ok(key)
/// A directed edge.
///
/// See the [module docs][self] for an introduction.
pub trait Arrow: Value<Type = ArrowSpec> + From<Self::Kind> + Into<Self::Kind> {
/// The representation of this arrow, which also determines whether parallel edges are allowed.
type Kind: ArrowKind = Basic;
}
pub fn list_incoming<'db, A>(
tx: &'db Transaction<'db>,
/// Parameterizing arrows so we can distinguish between kinds of arrows.
///
/// This lets us present a common API for certain arrow-related operations while also leveraging some
/// specialization. Essentially, from a type parameter which implements [`Arrow`], we can tell both at
/// the type level and at the value level whether that arrow is a multi-arrow or not.
pub trait ArrowKind {
/// Whether this kind of arrow should be represented using the specialized representation for edges
/// that are allowed to be parallel.
const IS_MULTI: bool;
/// Construct an arrow from a buffer containing a correctly-oriented arrow.
///
/// Each arrow is stored twice, once "correctly", and once "reversed". This allows us to efficiently
/// list both the outgoing and incoming edges for any particular vertex by using a prefix scan on the
/// [`BY_ORIGIN`][ArrowSpec::by_origin] and [`BY_TARGET`][ArrowSpec::by_target] keyspaces respectively.
///
/// The buffer passed to this function will start with 16 bytes origin, followed by 16 bytes target.
/// For basic arrows, that's it, but for multiarrows there is an additional 16 bytes of "identity",
/// which is needed to discriminate between multiple parallel edges.
///
/// # Failure
///
/// This method must panic if `buf` is not the expected size (32 bytes for basic arrows, 48 bytes for
/// multi arrows). The responsibility for ensuring that `buf` is correctly oriented lies with the
/// caller lest the result is incorrect, but passing an incorrectly oriented arrow is not a memory
/// safety issue, so this function is safe.
fn dec(buf: &[u8]) -> Self;
/// Encode an arrow's key origin-first and target-first.
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>);
#[doc(hidden)]
/// Turn him into a raw edge.
fn raw(&self) -> Raw;
}
union Raw {
multi: Multi,
basic: Basic,
}
impl Store {
/// Check whether there exists any arrow of type `A` that points from `origin` to `target`.
pub fn exists<A>(&self, origin: Key, target: Key) -> Result<bool>
where
A: Arrow,
{
op::exists::<A>(self, origin, target)
}
/// Get all arrows of type `A` that point at `target`.
pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::incoming::<A>(self, target).map_ok(A::from)
}
/// Get all arrows of type `A` that point away from `origin`.
pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::outgoing::<A>(self, origin).map_ok(A::from)
}
}
impl Transaction<'_> {
/// Check whether there exists any arrow of type `A` that points from `origin` to `target`.
///
/// This only tells you whether there is *any* such arrow, not how many (in the case of parallel edges).
pub fn exists<A>(&self, origin: Key, target: Key) -> Result<bool>
where
A: Arrow,
{
op::exists::<A>(self, origin, target)
}
/// Get all arrows of type `A` that point at `target`.
pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::incoming::<A>(self, target).map_ok(A::from)
}
/// Get all arrows of type `A` that point away from `origin`.
pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::outgoing::<A>(self, origin).map_ok(A::from)
}
/// Create a new arrow of type `A`.
///
/// This operation supports both [`Multi`] and [`Basic`] arrows.
///
/// # Example
///
/// The following snippet creates an arrow between `origin` and `target`.
///
/// ```rust
/// # fn main () -> store::Result<()> {
/// use store::{Arrow, Key};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// # let schema = store::types::Schema::new().has::<MyArrow>();
/// # store::Store::test(schema, |db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// db.run(|tx| {
/// tx.create(MyArrow { origin, target })
/// })?;
///
/// assert!(db.exists::<MyArrow>(origin, target)?);
/// # store::OK })
/// # }
/// ```
pub fn create<A>(&self, arrow: A) -> Result<()>
where
A: Arrow,
{
op::create::<A>(self, arrow.into())
}
/// Delete all edges of type `A` from `origin` to `target`.
///
/// It is not an error for this function not to delete anything.
pub fn delete_all<A>(&self, origin: Key, target: Key) -> Result<()>
where
A: Arrow,
{
op::delete_all::<A>(self, origin, target)
}
/// Delete a specific arrow.
pub fn delete_one<A>(&self, arrow: A) -> Result<()>
where
A: Arrow,
{
op::delete_one::<A>(self, arrow.into())
}
}
impl Batch {
/// Create an arrow. See [`Transaction::create`].
pub fn create<A>(&mut self, arrow: A)
where
A: Arrow,
{
op::create::<A>(self, arrow.into())
.expect("no errors expected to occur during batch operation")
}
/// Delete a specific arrow.
pub fn delete_one<A>(&mut self, arrow: A)
where
A: Arrow,
{
op::delete_one::<A>(self, arrow.into())
.expect("no errors expected to occur during batch operation")
}
}
mod op {
//! Implementations of arrow operations.
use super::*;
use crate::{internal::*, types::MULTIEDGE_HEADERS, Key, Result, OK};
/// Check whether there exists at least one arrow of type `A` from `origin` to `target`.
pub fn exists<A>(cx: &impl Query, origin: Key, target: Key) -> Result<bool>
where
A: Arrow,
{
if A::Kind::IS_MULTI {
// In the case of a multi-edge, at least one result from the prefix scan
// indicates that there is at least one edge.
cx.open(A::SPEC.by_origin)
.scan(origin.fuse(target))
.next()
.transpose()
.map(|o| o.is_some())
} else {
cx.open(A::SPEC.by_origin).has(origin.fuse(target))
}
}
/// List incoming arrows relative to `target`.
pub fn incoming<'db, A>(
cx: &'db impl Query,
target: Key,
) -> impl Iterator<Item = Result<(Key, Key)>> + 'db
) -> impl Iterator<Item = Result<A::Kind>> + 'db
where
A: MultiArrow,
A: Arrow,
A::Kind: 'db,
{
tx.quiver(A::TYPE).list_incoming(target)
// In the `by_target` keyspace, for either kind of arrow the layout is such that the target is
// the prefix, so we pick that keyspace to more efficiently list all arrows that target the key.
cx.open(A::SPEC.by_target)
.scan(target)
.map_ok(|(mut k, _)| {
// Arrows from `by_target` are oriented target-first, while the decoder function requires
// that the buffer is oriented origin-first. Regardless of whether `..32` covers the prefix
// or the whole slice, swapping the two keys always gives us the ordering expected by the
// decoding function.
let (t, o) = k[..32].split_at_mut(16);
t.swap_with_slice(o);
A::Kind::dec(&k)
})
}
pub trait MultiArrow {
const TYPE: Tag;
/// List outgoing arrows relative to `origin`.
pub fn outgoing<'db, A>(
cx: &'db impl Query,
origin: Key,
) -> impl Iterator<Item = Result<A::Kind>> + 'db
where
A: Arrow,
A::Kind: 'db,
{
cx.open(A::SPEC.by_origin)
.scan(origin)
.map_ok(|(ref k, _)| A::Kind::dec(k))
}
/// Create a new arrow.
pub fn create<A>(cx: &impl Write, arrow: A::Kind) -> Result<()>
where
A: Arrow,
{
if A::Kind::IS_MULTI {
let Multi {
identity,
origin,
target,
} = unsafe { arrow.raw().multi };
cx.open(MULTIEDGE_HEADERS)
.set(identity, origin.fuse(target))?;
}
let (by_origin, by_target) = arrow.enc();
cx.open(A::SPEC.by_origin).set(by_origin, b"")?;
cx.open(A::SPEC.by_target).set(by_target, b"")?;
OK
}
/// Delete all arrows from `origin` to `target`.
///
/// TODO: Remove the query requirement (depends on range delete being available).
pub fn delete_all<A>(cx: &(impl Write + Query), origin: Key, target: Key) -> Result<()>
where
A: Arrow,
{
let by_origin = cx.open(A::SPEC.by_origin);
let by_target = cx.open(A::SPEC.by_target);
Ok(if A::Kind::IS_MULTI {
let headers = cx.open(MULTIEDGE_HEADERS);
// TODO: optimize this implementation using range deletes.
// Unfortunately, range deletes are not available in transactional backends.
for key in by_origin.scan(origin.fuse(target)).keys() {
let key = Multi::decode(key?.as_ref());
by_origin.del(key.encode())?;
by_target.del(key.swap().encode())?;
headers.del(key.identity)?;
}
} else {
by_origin.del(origin.fuse(target))?;
by_target.del(target.fuse(origin))?;
})
}
/// Delete a specific arrow, if it exists. Doesn't error if the arrow does *not* exist.
pub fn delete_one<A>(cx: &impl Write, arrow: A::Kind) -> Result<()>
where
A: Arrow,
{
let (by_origin, by_target) = arrow.enc();
cx.open(A::SPEC.by_origin).del(by_origin)?;
cx.open(A::SPEC.by_target).del(by_target)?;
OK
}
}
/// A directed edge between two vertices.
pub trait Arrow: Encode + Decode {
const SPACE: (Space, Space);
/// Types representing the different kinds of arrows.
mod kinds {
use super::ArrowKind;
use crate::Key;
impl ArrowKind for Multi {
const IS_MULTI: bool = true;
fn dec(buf: &[u8]) -> Self {
Multi::decode(buf)
}
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
(self.encode(), self.swap().encode())
}
fn raw(&self) -> super::Raw {
super::Raw { multi: *self }
}
}
/// Which way an arrow is pointing when viewed from a particular vertex.
pub enum Direction {
Incoming,
Outgoing,
impl ArrowKind for Basic {
const IS_MULTI: bool = false;
fn dec(buf: &[u8]) -> Self {
Basic::decode(buf)
}
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
(self.encode(), self.reverse().encode())
}
fn raw(&self) -> super::Raw {
super::Raw { basic: *self }
}
}
/// The node this arrow points away from is the "author" of the node the arrow points to.
#[derive(Encode, Decode)]
pub struct AuthorOf;
impl Arrow for AuthorOf {
const SPACE: (Space, Space) = (Space("created-by/l"), Space("created-by/r"));
/// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist
/// between two vertices.
#[derive(Clone, Copy)]
pub struct Multi {
/// The node that this edge points away from.
pub origin: Key,
/// The node that this edge points towards.
pub target: Key,
/// The discriminator of this particular edge, which distinguishes it from all other edges that
/// connect `origin` and `target`, and indeed from every other edge or node in the graph.
pub identity: Key,
}
/// The origin of this arrow has follow requested the target.
#[derive(Encode, Decode)]
pub struct FollowRequested;
impl Arrow for FollowRequested {
const SPACE: (Space, Space) = (Space("pending-fr/l"), Space("pending-fr/r"));
impl Multi {
/// Decode a multiarrow key from an origin-first encoded buffer. If the buffer is not correctly
/// oriented, the results will be wrong; the arrow will be oriented *away* from the target and
/// *at* the origin, instead of the other way around.
///
/// # Orientation
///
/// In this context, *correctly oriented* means that it is laid out in *origin-first* order,
/// like this (where `o`, `t` and `i` represent bytes):
///
/// ```text
/// oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii
/// |--------------| |--------------| |--------------|
/// origin target identity
/// ..16 16..32 32..
/// ```
///
/// In a *reverse oriented* buffer, the origin and target parts are swapped, so the target is
/// the prefix, followed by the origin, and then the identity. This is also called *target-first*
/// encoding in this documentation.
///
/// # Silent failure
///
/// There is no way to detect whether the ordering is correct from just the buffer, so the caller
/// must ensure that the order is correct. If you have a target-first encoded buffer, you can have
/// to swap the two keys before passing it into this function, or this function will give you an
/// edge that does not exist (since a multiedge can only point in one direction).
///
/// Safety-wise, this isn't an issue, so it does not warrant marking this function as `unsafe`.
///
/// # Panics
///
/// This function panics if `buf` is not exactly 48 bytes long.
pub fn decode(buf: &[u8]) -> Multi {
Multi {
origin: Key::from_slice(&buf[..16]),
target: Key::from_slice(&buf[16..32]),
identity: Key::from_slice(&buf[32..]),
}
}
/// Encode an arrow in *origin-first order*. See the docs of [`Multi::decode`] for an explanation
/// of the difference between origin-first encoding and target-first encoding.
pub fn encode(self) -> [u8; 48] {
let mut key = [0; 48];
key[..16].copy_from_slice(&self.origin.0);
key[16..32].copy_from_slice(&self.target.0);
key[32..].copy_from_slice(&self.identity.0);
key
}
/// Swap the origin and target of this arrow, while leaving the identity the same.
pub(super) fn swap(self) -> Multi {
Multi {
origin: self.target,
target: self.origin,
..self
}
}
}
/// The origin "follows" the target.
#[derive(Encode, Decode)]
pub struct Follows;
impl Arrow for Follows {
const SPACE: (Space, Space) = (Space("follows/l"), Space("follows/r"));
/// A normal directed edge. Duplicates are not allowed.
///
/// This kind of arrow is useful for modeling predicates and simple relationships.
#[derive(Clone, Copy)]
pub struct Basic {
pub origin: Key,
pub target: Key,
}
impl Basic {
/// Get the inverse of this arrow (an arrow that connects the same two nodes, but pointing in the
/// other direction).
pub fn reverse(self) -> Basic {
Basic {
origin: self.target,
target: self.origin,
}
}
/// Encode `self` in origin-first order. See [`Multi::decode`] for docs on ordering.
pub fn encode(self) -> [u8; 32] {
self.origin.fuse(self.target)
}
/// Decode a basic edge from a buffer laid out origin-first. See [`Multi::decode`] for more information
/// about key encoding.
///
/// # Panics
///
/// Panics if `buf` is not exactly 32 bytes long.
pub fn decode(buf: &[u8]) -> Basic {
let (origin, target) = Key::split(buf);
Basic { origin, target }
}
}
}
/// Derive [`Arrow`] for a struct.
///
/// This will generate the required [`Into`] and [`From`] impls, as well as an [`Arrow`](trait@Arrow) impl and
/// a [`Value`] impl with the namespaces derived from the name of the struct. The macro works on structs with
/// specific fields, or newtypes of any arrow kind.
///
/// # Attributes
///
/// The `origin`, `target` and `identity` attributes are used on fields of type [`Key`], and they are used
/// to map the arrow's type to an [`ArrowKind`]. The `#[origin]` annotation isn't needed if the struct contains
/// a field named `origin`. Ditto with `target` and `identity`.
///
/// If there is no `identity` defined, the `ArrowKind` will be [`Basic`]. If an `identity` is defined, the kind
/// will be [`Multi`].
///
/// # Examples
///
/// Generates a [`Basic`] arrow called `my-arrow`.
///
/// ```
/// use store::{Key, Arrow, types::Schema};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// // This will fail to compile if the type doesn't implement `Arrow` correctly
/// Schema::new().has::<MyArrow>();
/// ```
///
/// Newtypes of either arrow kind are supported.
///
/// ```
/// use store::{Key, arrow::{Basic, Multi, Arrow}};
///
/// /// The origin has requested to follow the target.
/// ///
/// /// Note: there may be more than one follow request between any two actors.
/// #[derive(Arrow)]
/// struct FollowRequest(Multi);
///
/// /// A relation between two actors meaning that the origin follows the target.
/// #[derive(Arrow)]
/// struct Follows(Basic);
///
/// /// Users can follow each other.
/// struct User(Key);
///
/// impl User {
/// /// Make `self` follow `other`.
/// pub fn follows(self, other: User) -> Follows {
/// Follows(Basic { origin: self.0, target: other.0 })
/// }
/// }
/// ```
///
/// Generates a [`Multi`] arrow called `my-multi-arrow`, mapping the multiarrow's discriminator to the struct's
/// `unique` field.
///
/// ```
/// use store::{Key, Arrow};
///
/// #[derive(Arrow)]
/// struct MyMultiArrow {
/// pub origin: Key,
/// pub target: Key,
/// #[identity]
/// pub unique: Key,
/// }
/// ```
///
/// The macro automatically adds `From` and `Into` implementations:
///
/// ```
/// use store::{Key, Arrow, arrow::Basic};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// let edge: Basic = MyArrow { origin, target }.into();
///
/// assert_eq!(origin, edge.origin);
/// assert_eq!(target, edge.target);
/// ```
pub use r#macro::Arrow;

297
lib/store/src/internal.rs Normal file
View file

@ -0,0 +1,297 @@
//! Provides a nice hashmap-esque interface for manipulating entries in the store's backend.
use std::sync::Arc;
use rocksdb::{BoundColumnFamily, IteratorMode};
pub use self::cx::{Context, Query, Write};
use crate::{util::IterExt as _, Error, Result};
/// An internal interface to a specific keyspace that exposes basic hashmap-esque operations
/// on that keyspace, generic over whether the source of the data is a [`Transaction`] or a
/// [`Store`].
pub struct Keyspace<'db, C> {
pub(super) context: &'db C,
pub(super) cf: Arc<BoundColumnFamily<'db>>,
}
impl<'db, C> Keyspace<'db, C>
where
C: Query,
{
/// Fetch a value from the keyspace.
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<impl AsRef<[u8]> + 'db>> {
self.context.get_pinned(&self.cf, key)
}
/// Test whether a key exists.
pub fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.get(key).map(|r| r.is_some())
}
/// Execute a prefix scan.
pub fn scan(
&self,
prefix: impl AsRef<[u8]> + 'db,
) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
let t = prefix.as_ref().to_vec();
self.context
.prefix_iterator(&self.cf, prefix.as_ref())
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.starts_with(&t),
_ => true,
})
.map_err(Error::Internal)
}
/// List all pairs in the keyspace.
pub fn list(&self) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
self.context
.full_iterator(&self.cf, IteratorMode::Start)
.map_err(Error::Internal)
}
/// Execute a range scan
pub fn range<const N: usize>(
&self,
lower: [u8; N],
upper: [u8; N],
) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
// TODO: use a seek op to make this more efficient
self.context
.full_iterator(&self.cf, IteratorMode::Start)
.skip_while(move |r| match r {
Ok((ref k, _)) => k.as_ref() < &lower,
_ => false,
})
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.as_ref() < &upper,
_ => true,
})
.map_err(Error::Internal)
}
/// Join all the keys to their values in this keyspace.
///
/// This may be optimized compared to many random point lookups.
pub fn join(
&self,
keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.context
.multi_get(keys.into_iter().map(|x| (&self.cf, x)))
}
}
impl<C> Keyspace<'_, C>
where
C: Write,
{
/// Set the given `key` to the `value`, overwriting it if there was already a value there.
pub fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
self.context.put(&self.cf, key, val)
}
/// Drop the value if it exists.
pub fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.context.delete(&self.cf, key)
}
}
mod cx {
//! Contexts for doing reads, writes or both to the database.
//!
//! The traits in this module map abstract calls to their methods on the [rocksdb] objects.
use rocksdb::{
AsColumnFamilyRef, DBAccess, DBIteratorWithThreadMode, DBPinnableSlice, IteratorMode,
};
use super::Keyspace;
use crate::{util::IterExt as _, Backend, Batch, Error, Result, Store, Transaction, OK};
/// A context for executing database operations.
pub trait Context {
/// Open the keyspace identified by `cf`.
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self>
where
Self: Sized;
}
/// A context in which one can read from the data store.
///
/// Specifically, this maps calls to either the transaction or the store's internals without us having
/// to implement methods for *both* transactions and the store.
pub trait Query: Context {
type Backend: DBAccess;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>>;
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Self::Backend>;
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend>;
/// Optimized multi-point lookup.
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>>;
}
/// A context in which one can read from and modify the data store.
pub trait Write: Context {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()>;
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()>;
}
impl Context for Store {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self> {
Keyspace {
cf: self.inner.cf_handle(cf.as_ref()).unwrap(),
context: &self,
}
}
}
impl Query for Store {
type Backend = Backend;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>> {
self.inner.get_pinned_cf(cf, key).map_err(Error::Internal)
}
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Backend> {
self.inner.prefix_iterator_cf(cf, prefix)
}
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.inner
.multi_get_cf(keys)
.into_iter()
.map_err(Error::Internal)
.collect()
}
}
impl Context for Transaction<'_> {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self> {
Keyspace {
cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(),
context: &self,
}
}
}
impl<'db> Query for Transaction<'db> {
type Backend = rocksdb::Transaction<'db, Backend>;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>> {
self.inner.get_pinned_cf(cf, key).map_err(Error::Internal)
}
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.prefix_iterator_cf(cf, prefix)
}
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
fn multi_get<'a, C: AsColumnFamilyRef + 'a>(
&'a self,
keys: impl IntoIterator<Item = (&'a C, impl AsRef<[u8]>)>,
) -> Vec<Result<Option<Vec<u8>>>> {
self.inner
.multi_get_cf(keys)
.into_iter()
.map_err(Error::Internal)
.collect()
}
}
impl Write for Transaction<'_> {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> {
self.inner.delete_cf(cf, key).map_err(Error::Internal)
}
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
self.inner.put_cf(cf, key, val).map_err(Error::Internal)
}
}
impl Context for Batch {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self>
where
Self: Sized,
{
Keyspace {
cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(),
context: &self,
}
}
}
impl Write for Batch {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> {
self.inner.borrow_mut().delete_cf(cf, key);
OK
}
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
self.inner.borrow_mut().put_cf(cf, key, val);
OK
}
}
}

View file

@ -3,7 +3,7 @@ use std::fmt::{Debug, Display};
use chrono::{DateTime, Utc};
use ulid::Ulid;
use crate::{Alias, Error, Result, Transaction};
use crate::arrow::{ArrowKind, Basic, Multi};
/// A unique identifier for vertices in the database.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
@ -31,7 +31,7 @@ impl Key {
Key(key)
}
pub fn timestamp(self) -> DateTime<Utc> {
let ms = Ulid::from_bytes(self.0).timestamp_ms();
let ms = self.to_ulid().timestamp_ms();
DateTime::from_timestamp_millis(ms as i64).unwrap()
}
/// Join two keys together.
@ -46,6 +46,14 @@ impl Key {
let head = Key::from_slice(&buf[16..]);
(tail, head)
}
pub(crate) fn range(ts: DateTime<Utc>) -> ([u8; 16], [u8; 16]) {
let min = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MIN).to_bytes();
let max = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MAX).to_bytes();
(min, max)
}
fn to_ulid(self) -> Ulid {
Ulid::from_bytes(self.0)
}
}
impl AsRef<[u8]> for Key {
@ -53,46 +61,3 @@ impl AsRef<[u8]> for Key {
&self.0
}
}
/// Anything that can be used to reference a vertex, both "normal" [keys](Key)
/// and [aliases](Alias).
///
/// In general, using a key directly is going to be more efficient than using
/// an alias, because it incurs less lookups.
pub trait Keylike: Sized {
/// Translate the thing to a [`Key`].
///
/// This function should return [`Error::Missing`] if the key cannot be located.
fn translate(self, tx: &Transaction<'_>) -> Result<Key>;
/// Translate, and check whether the key is actually registered.
///
/// This function should return [`Error::Undefined`] if the key does not *really*
/// exist. It should return [`Error::Missing`] if the key can't be found.
fn checked_translate(self, tx: &Transaction<'_>) -> Result<Key> {
let key = self.translate(tx)?;
if !tx.is_registered(key)? {
Err(Error::Undefined { key })
} else {
Ok(key)
}
}
}
impl Keylike for Key {
fn translate(self, _: &Transaction<'_>) -> Result<Key> {
Ok(self)
}
}
impl<A> Keylike for A
where
A: Alias,
{
fn translate(self, tx: &Transaction<'_>) -> Result<Key> {
tx.lookup_alias(self)
}
}
/// A type tag identifying a vertex.
#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)]
pub struct Tag(pub u8);

View file

@ -1,78 +1,113 @@
#![feature(iterator_try_collect)]
//! The data store abstractions used by the ActivityPuppy project.
#![feature(iterator_try_collect, associated_type_defaults)]
//! Data persistence for the ActivityPuppy social media server built on top of [rocksdb].
//!
//! Persistence in a puppy server is handled by this component, which implements a directed graph
//! inspired datastore on top of the [rocksdb] key-value store.
//! # Overview
//!
//! The workflow for manipulating stuff in the store is to open a [`Store`], and then to call
//! its [`transaction`](Store::transaction) method. This method takes a function that, given
//! a [`Transaction`], returns a result with some value. The `Transaction` object contains some
//! useful CRUD methods. Returning an `Ok` commits the transaction and returning `Err` rolls it
//! back.
//! The design of the data store's abstractions is heavily inspired by graph theory. The idea is to encourage
//! composition and loose coupling by segmenting all data associated with a node into [mixins][Mixin], and
//! modeling relations and predicates between nodes as [arrows][Arrow]. In additions, the key identifying a
//! node can be [aliased][Alias] by a string newtype, which must be unique within the namespace of that alias.
//!
//! This component is specialized to puppy's storage needs, and probably won't be much use unless
//! you're writing something that interfaces with puppy.
//! The API is optimized for reducing boilerplate and legibility at the call site.
//!
//! There are three interfaces to the store: the read-only [`Store`], the write-only [`Batch`] and the [`Transaction`],
//! which allows both reads and writes.
use std::{path::Path, sync::Arc};
use std::{cell::RefCell, path::Path, sync::Arc};
use derive_more::From;
use rocksdb::{MultiThreaded, Options, TransactionDBOptions};
type Backend = rocksdb::TransactionDB<MultiThreaded>;
use rocksdb::{Options, TransactionDBOptions, WriteBatchWithTransaction};
use types::Schema;
mod alias;
mod internal;
mod key;
mod transaction;
pub use key::{Key, Keylike, Tag};
pub use transaction::Transaction;
pub use {alias::Alias, arrow::Arrow, mixin::Mixin};
pub mod alias;
pub mod arrow;
pub mod mixin;
pub mod util;
/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
pub const OK: Result<()> = Ok(());
/// Master list of all column family names in use.
const SPACES: &[&'static str] = &[
"registry",
"username/l",
"username/r",
"follows/l",
"follows/r",
"profile",
"content",
"created-by/l",
"created-by/r",
"pending-fr/l",
"pending-fr/r",
"multi:id-map",
"multi:index/l",
"multi:index/r",
#[cfg(test)]
"test-arrow/l",
#[cfg(test)]
"test-arrow/r",
];
/// The handle to the data store.
///
/// This type can be cloned freely.
#[derive(Clone)]
pub struct Store {
inner: Arc<Backend>,
}
mod mixin;
/// The name of the puppy data store inside the state directory.
const STORE_NAME: &str = "main-store";
impl Store {
/// Open a data store in the given `state_dir`.
/// Open a [`Store`]. Creates one if it doesn't exist yet at the state directory path.
pub fn open(state_dir: impl AsRef<Path>, schema: Schema) -> Result<Store> {
Store::open(state_dir, schema)
}
pub use {alias::Alias, arrow::Arrow, key::Key, mixin::Mixin};
pub mod arrow;
pub mod types;
pub mod util;
/// The main interface to the data persistence engine.
///
/// If the data store does not exist yet, it will be created.
pub fn open(state_dir: impl AsRef<Path>) -> Result<Store> {
/// This type provides reading capabilities, but does not expose APIs for manipulating data in the store. For
/// that, you must [run][Store::run] a [`Transaction`] or [apply][Store::apply] a [`Batch`].
#[derive(Clone)]
pub struct Store {
// TODO: maybe switch to `OptimisticTransactionDB` because it has `batched_multi_get_cf`, which may be useful
// if we end up doing lots of point lookups. alternatively, maybe we don't need *transactions* altogether, and
// we can get away with write batches and snapshots. the main problem with transactions is that it doesn't let
// us do range deletes, which affects the efficiency of multiarrow deletion.
//
// a switch to write batches is feasible if we end up not doing reads and writes in the same transaction.
inner: Arc<Backend>,
}
/// Hosts APIs for manipulating the data store.
///
/// You can access these APIs from the body of the closure passed to [`Store::run`].
pub struct Transaction<'db> {
inner: rocksdb::Transaction<'db, Backend>,
store: &'db Store,
}
/// A set of writes that are to be executed atomically.
pub struct Batch {
inner: RefCell<WriteBatchWithTransaction<true>>,
store: Store,
}
impl Store {
/// Run a [transaction][Transaction].
///
/// In a transaction, either all writes succeed, or the transaction is aborted and the changes are not
/// recorded. Changes made inside a transaction can be read from within that transaction before they are
/// committed.
///
/// If the closure passed to `run` returns an error, the transaction is rolled back, and otherwise the
/// changes are committed.
pub fn run<T, E>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T, E>) -> Result<T, E>
where
E: From<Error>,
{
let tx = Transaction {
inner: self.inner.transaction(),
store: &self,
};
let r = f(&tx);
if let Err(e) = if r.is_err() {
tx.inner.rollback()
} else {
tx.inner.commit()
} {
return Err(E::from(Error::Internal(e)));
}
r
}
/// Apply a batch of changes atomically.
pub fn apply(&self, batch: Batch) -> Result<()> {
self.inner.write(batch.inner.into_inner())?;
OK
}
/// Construct a [`Batch`].
pub fn batch(&self) -> Batch {
Batch {
inner: RefCell::new(WriteBatchWithTransaction::default()),
store: self.clone(),
}
}
/// Open the data store in `state_dir`, and create one if it doesn't exist yet.
pub fn open(state_dir: impl AsRef<Path>, schema: Schema) -> Result<Store> {
let mut db_opts = Options::default();
db_opts.create_if_missing(true);
db_opts.create_missing_column_families(true);
@ -81,57 +116,24 @@ impl Store {
&db_opts,
&tx_opts,
state_dir.as_ref().join(STORE_NAME),
SPACES,
schema.0,
)?);
Ok(Store { inner })
}
/// Construct a temporary store, for testing. This store gets erased after `f` is done.
pub fn with_tmp<T, E>(f: impl FnOnce(Store) -> Result<T, E>) -> Result<T, E>
where
E: From<Error>,
{
let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir");
f(Store::open(tmp_dir)?)
}
/// Delete the whole store.
///
/// **This deletes all data in the store**. Do not run this unless you want to delete all the state of the instance.
/// Delete the main data store in `state_dir` if it exists.
pub fn nuke(state_dir: impl AsRef<Path>) -> Result<()> {
Backend::destroy(&Options::default(), state_dir.as_ref().join(STORE_NAME))
.map_err(Error::from)
.map_err(Error::Internal)
}
/// Get the value of mixin `M` for `key`.
pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)>
where
M: Mixin,
{
self.transaction(|tx| tx.lookup(key))
}
/// Get the key associated with a given [alias][Alias].
pub fn translate<A>(&self, s: impl ToString) -> Result<Key>
where
A: Alias,
{
self.transaction(|tx| tx.lookup_alias(A::from(s.to_string())))
}
/// Quickly test whether a particular [arrow][Arrow] exists.
pub fn exists<A>(&self, arrow: (Key, Key)) -> Result<bool>
where
A: Arrow,
{
self.transaction(|tx| tx.exists::<A>(arrow))
/// Open a store that lives until `f` returns, for testing.
pub fn test<T>(schema: Schema, f: impl FnOnce(Store) -> T) -> T {
let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir");
f(Store::open(tmp_dir, schema).expect("failed to open temporary data store in {tmp_dir}"))
}
}
/// An isolated keyspace.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Space(&'static str);
impl AsRef<str> for Space {
fn as_ref(&self) -> &str {
&self.0
}
}
/// A shorthand for committing a [`Transaction`] (because I think `Ok(())` is ugly).
pub const OK: Result<()> = Ok(());
/// Results from this component.
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -154,3 +156,5 @@ pub enum Error {
Encoding(bincode::error::EncodeError),
Decoding(bincode::error::DecodeError),
}
type Backend = rocksdb::TransactionDB<rocksdb::MultiThreaded>;

View file

@ -1,35 +1,250 @@
//! Modules of information.
use std::ops::RangeBounds;
use bincode::{Decode, Encode};
use chrono::{DateTime, Utc};
use crate::Space;
use super::{
types::{MixinSpec, Value},
Batch, Store, Transaction,
};
use crate::{Error, Key, Result};
/// A simple piece of data associated with a vertex.
pub trait Mixin: Encode + Decode {
const SPACE: Space;
/// Mixins are the simplest pieces of data in the store.
pub trait Mixin: Value<Type = MixinSpec> + Encode + Decode {}
/// Derive a [`Mixin`] implementation.
///
/// In addition to deriving `Mixin`, you will need to derive or implement [`Encode`]
/// and [`Decode`].
pub use r#macro::Mixin;
impl Store {
/// Get the value!
pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
where
M: Mixin,
{
op::get_mixin(self, node)
}
/// Check if `node` has a mixin `M`.
pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
where
M: Mixin,
{
op::has_mixin::<M>(self, node)
}
/// Get all `M`s where the key's timestamp is within the `range`.
pub fn range<M>(
&self,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
op::get_range(self, range)
}
/// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
pub fn join_on<M>(
&self,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
op::join_on(self, iter)
}
}
/// Information needed to render a social media profile.
#[derive(Encode, Decode)]
pub struct Profile {
pub post_count: usize,
pub account_name: String,
pub display_name: Option<String>,
pub about_string: Option<String>,
pub about_fields: Vec<(String, String)>,
impl Transaction<'_> {
/// Apply an update function to the mixin `M` of `node`.
///
/// # Errors
///
/// - [`Error::Missing`]: if `node` does not have a mixin of this type.
///
/// [`Error::Missing`]: crate::Error::Missing
pub fn update<M>(&self, node: Key, update: impl FnOnce(M) -> M) -> Result<()>
where
M: Mixin,
{
op::update(self, node, update)
}
/// Get the mixin of the specified type associated with `node`.
pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
where
M: Mixin,
{
op::get_mixin(self, node)
}
/// Add a mixin to `node`.
///
/// # Errors
///
/// - [`Error::Conflict`]: if `node` already has a mixin of type `M`.
///
/// [`Error::Conflict`]: crate::Error::Missing
pub fn add_mixin<M>(&self, node: Key, mixin: M) -> Result<()>
where
M: Mixin,
{
if op::has_mixin::<M>(self, node)? {
return Err(Error::Conflict);
} else {
op::add_mixin::<M>(self, node, mixin)
}
}
/// Check whether `node` has an `M` defined for it.
pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
where
M: Mixin,
{
op::has_mixin::<M>(self, node)
}
/// Get all `M`s where the key's timestamp is within the `range`.
pub fn range<M>(
&self,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
op::get_range(self, range)
}
/// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results.
pub fn join_on<M>(
&self,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
op::join_on(self, iter)
}
}
impl Mixin for Profile {
const SPACE: Space = Space("profile");
impl Batch {
/// Add a mixin to the `node`.
///
/// **Note**: unlike [`Transaction::add_mixin`], this will *not* return an error if the key already has a mixin
/// of this type. This *should* not cause inconsistency.
pub fn put_mixin<M>(&mut self, node: Key, mixin: M)
where
M: Mixin,
{
op::add_mixin(self, node, mixin).unwrap()
}
}
/// Contents of a post.
#[derive(Encode, Decode)]
pub struct Content {
pub content: Option<String>,
pub summary: Option<String>,
mod op {
use std::ops::{Bound, RangeBounds};
use chrono::{DateTime, TimeDelta, Utc};
use either::Either;
use super::Mixin;
use crate::{internal::*, util::IterExt as _, Error, Key, Result};
pub fn update<M>(
cx: &(impl Query + Write),
node: Key,
update: impl FnOnce(M) -> M,
) -> Result<()>
where
M: Mixin,
{
// TODO: implement in terms of a merge operator instead of separate query and write ops.
// this would let us remove the `Query` bound, which would in turn let us update from within
// a batch.
//
// See https://github.com/facebook/rocksdb/wiki/Merge-Operator
//
// It looks like rocksdb allows you to specify a merge operator per column family.[^1]
// This means we can construct our column families with a merge operator that knows how to encode and decode mixins.
//
// [^1]: https://github.com/facebook/rocksdb/blob/9d37408f9af15c7a1ae42f9b94d06b27d98a011a/include/rocksdb/options.h#L128
let tree = cx.open(M::SPEC.keyspace);
match tree.get(node.as_ref())? {
None => Err(Error::Missing),
Some(buf) => {
let new = decode(buf).map(update).and_then(encode)?;
tree.set(node, new)
}
}
}
impl Mixin for Content {
const SPACE: Space = Space("content");
pub fn get_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<Option<M>> {
cx.open(M::SPEC.keyspace).get(node)?.map(decode).transpose()
}
pub fn add_mixin<M: Mixin>(cx: &impl Write, node: Key, mixin: M) -> Result<()> {
cx.open(M::SPEC.keyspace).set(node, encode(mixin)?)
}
pub fn has_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<bool> {
cx.open(M::SPEC.keyspace).has(node)
}
pub fn get_range<M: Mixin>(
cx: &impl Query,
range: impl RangeBounds<DateTime<Utc>>,
) -> impl Iterator<Item = Result<(Key, M)>> + '_ {
// TODO: Test this thoroughly
const MS: TimeDelta = TimeDelta::milliseconds(1);
let iter = match (range.start_bound(), range.end_bound()) {
(Bound::Unbounded, Bound::Unbounded) => Either::Left(cx.open(M::SPEC.keyspace).list()),
(min, max) => {
let lower = match min {
Bound::Unbounded => [u8::MIN; 16],
Bound::Included(inc) => Key::range(*inc).0,
Bound::Excluded(exc) => Key::range(*exc + MS).0,
};
let upper = match max {
Bound::Unbounded => [u8::MAX; 16],
Bound::Included(inc) => Key::range(*inc).1,
Bound::Excluded(exc) => Key::range(*exc - MS).1,
};
Either::Right(cx.open(M::SPEC.keyspace).range(lower, upper))
}
};
iter.bind_results(|(k, v)| {
let key = Key::from_slice(k.as_ref());
let val = decode(v)?;
Ok((key, val))
})
}
pub fn join_on<M>(
cx: &impl Query,
iter: impl IntoIterator<Item = Result<Key>>,
) -> Result<Vec<(Key, Option<M>)>>
where
M: Mixin,
{
let keys: Vec<Key> = iter.into_iter().try_collect()?;
cx.open(M::SPEC.keyspace)
.join(keys.iter())
.into_iter()
.zip(keys)
.map(|(opt, key)| {
let Some(buf) = opt? else {
return Ok((key, None));
};
let val = decode(buf)?;
Ok((key, Some(val)))
})
.try_collect()
}
pub(super) fn encode(data: impl bincode::Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}
pub(super) fn decode<T>(data: impl AsRef<[u8]>) -> Result<T>
where
T: bincode::Decode,
{
bincode::decode_from_slice(data.as_ref(), bincode::config::standard())
.map_err(Error::Decoding)
.map(|(v, _)| v)
}
}

View file

@ -1,432 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use bincode::{Decode, Encode};
use rocksdb::{BoundColumnFamily, IteratorMode};
use crate::{
arrow::Direction, key::Tag, util::IterExt as _, Alias, Arrow, Backend, Error, Key, Keylike,
Mixin, Result, Store, OK, SPACES,
};
impl Store {
/// Initiate a transaction.
///
/// If the result is an error, the transaction is rolled back, and otherwise the transaction
/// is committed.
pub fn transaction<T>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T>) -> Result<T> {
// Load all the column family handles, because they can't be accessed through the
// `rocksdb::Transaction` struct, only the `TransactionDB`.
let spaces = SPACES
.into_iter()
.map(|name| (*name, self.inner.cf_handle(name).unwrap()))
.collect();
let tx = Transaction {
inner: self.inner.transaction(),
spaces,
};
let result = f(&tx);
if result.is_err() {
tx.inner.rollback()?;
} else {
tx.inner.commit()?;
}
result
}
/// Check whether a key exists in the registry,
pub fn is_registered(&self, key: Key) -> Result<bool> {
let cf = self
.inner
.cf_handle("registry")
.expect("failed to open registry");
self.inner
.get_pinned_cf(&cf, key)
.map(|opt| opt.is_some())
.map_err(Error::Internal)
}
}
/// A database transaction, in which either each action succeeds, or everything fails
/// together.
///
/// The transaction struct is the interface for quering and manipulating persisted content.
pub struct Transaction<'db> {
inner: rocksdb::Transaction<'db, Backend>,
spaces: HashMap<&'static str, Arc<BoundColumnFamily<'db>>>,
}
/// Methods for manipulating the registry.
///
/// Before you can manipulate a vertex, its needs to be registered.
impl Transaction<'_> {
/// Register a new vertex.
pub fn create_vertex(&self, key: Key, tag: Tag) -> Result<()> {
self.with("registry").set(key, [tag.0])
}
/// Delete a vertex from the registry.
pub fn delete_vertex(&self, key: Key) -> Result<()> {
// TODO: also make this delete all related data?
self.with("registry").del(key)
}
/// Check whether a vertex is registered in the database.
pub fn is_registered(&self, key: Key) -> Result<bool> {
self.with("registry").has(key)
}
}
/// Methods for manipulating mixins.
///
/// For each implementor of [`Mixin`], a vertex can have at most one record of that type
/// associated with it.
impl Transaction<'_> {
/// Query the store for a value associated with the vertex `key` identifies.
///
/// Using a [`Key`] is more efficient than using an alias.
pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)>
where
M: Mixin,
{
// Checked translate isn't needed, we'll complain if we can't find the data.
let canonicalized_key = key.translate(&self)?;
let raw = self.with(M::SPACE).get(canonicalized_key)?;
let value = decode(raw.as_ref())?;
Ok((canonicalized_key, value))
}
/// Associate a new mixin value with the key.
///
/// # Errors
///
/// - `Error::Conflict` if a mixin of this type is already associated with the vertex
/// - `Error::Undefined` if `key` is not in the registry.
pub fn insert<M>(&self, key: impl Keylike, data: M) -> Result<()>
where
M: Mixin,
{
let key = key.checked_translate(&self)?;
let data = encode(data)?;
let ns = self.with(M::SPACE);
// Check for conflicts. Fail if the key already exists, otherwise set the key
// to the given value.
if ns.has(key)? {
Err(Error::Conflict)
} else {
ns.set(key, data)
}
}
/// Apply an update function to the mixin identified by the key.
///
/// # Errors
///
/// - `Error::Undefined` if the `key` is not registered
/// - `Error::Missing` if `key` does not exist in the keyspace associated with `M`
pub fn update<M>(&self, key: impl Keylike, f: impl FnOnce(Key, M) -> Result<M>) -> Result<()>
where
M: Mixin,
{
let key = key.checked_translate(self)?;
let (key, old) = self.lookup::<M>(key)?;
let new = f(key, old).and_then(encode)?;
self.with(M::SPACE).set(key, new)
}
/// Remove the mixin from the vertex `key` refers to.
///
/// Doesn't complain if the value does not exist in the expected keyspace.
pub fn remove<M>(&self, key: impl Keylike) -> Result<Option<M>>
where
M: Mixin,
{
// Checked translate isn't needed because we don't care if the key is bogus.
let canonical_key = key.translate(self)?;
let ns = self.with(M::SPACE);
match ns.pop(canonical_key) {
Ok(Some(val)) => decode(&val).map(Some),
Ok(None) => Ok(None),
Err(err) => Err(err),
}
}
/// List all key-value pairs for mixins of type `M`.
pub fn list<M>(&self) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
self.with(M::SPACE).list().bind_results(|(k, v)| {
let v = decode(v.as_ref())?;
let k = Key::from_slice(k.as_ref());
Ok((k, v))
})
}
}
/// Methods for interacting with [aliases][Alias], which are unique alternate keys.
impl Transaction<'_> {
/// Look up the key that the given alias maps to.
///
/// If the key was deleted, but the alias wasn't properly cleaned up,
pub fn lookup_alias<A>(&self, alias: A) -> Result<Key>
where
A: Alias,
{
let (l, _) = A::SPACE;
let raw = self.with(l).get(alias.to_string())?;
Ok(Key::from_slice(raw.as_ref()))
}
/// Create a new alias of type `A` for the given [`Key`].
///
/// If the alias already exists, this function returns `Conflict`.
pub fn insert_alias<A>(&self, key: Key, alias: A) -> Result<()>
where
A: Alias,
{
let (l, r) = A::SPACE;
let alias = alias.to_string();
if self.with(l).has(&alias)? {
return Err(Error::Conflict);
}
self.with(l).set(&alias, key)?;
self.with(r).set(key, &alias)?;
OK
}
/// Delete the alias of type `A` that points to `key`.
pub fn remove_alias<A>(&self, key: Key) -> Result<()>
where
A: Alias,
{
let (l, r) = A::SPACE;
// First, pop the reverse mapping, which will give us the encoded
// key for the normal mapping. If it doesn't exist, don't delete
// the normal mapping.
if let Some(alias) = self.with(r).pop(key)? {
self.with(l).pop(alias)?;
}
OK
}
}
impl Transaction<'_> {
/// Find an arrow of type `A` with the given `tail` and `head`.
pub fn lookup_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<Option<A>>
where
A: Arrow,
{
let (l, _) = A::SPACE;
match self.with(l).get(tail.fuse(head)) {
Ok(raw) => decode(raw.as_ref()).map(Some),
Err(Error::Missing) => Ok(None),
Err(err) => Err(err),
}
}
/// Create a new arrow of type `A` and associate the label with it.
///
/// # Errors
///
/// - `Error::Undefined` if either key is not registered
pub fn insert_arrow<A>(&self, (tail, head): (Key, Key), label: A) -> Result<()>
where
A: Arrow,
{
if !self.is_registered(tail)? {
return Err(Error::Undefined { key: tail });
}
if !self.is_registered(head)? {
return Err(Error::Undefined { key: head });
}
let (l, r) = A::SPACE;
let label = encode(label)?;
self.with(l).set(tail.fuse(head), &label)?;
self.with(r).set(head.fuse(tail), &label)?;
OK
}
/// Delete an arrow from the data store.
pub fn remove_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<()>
where
A: Arrow,
{
self.with(A::SPACE.0).del(tail.fuse(head))?;
self.with(A::SPACE.1).del(head.fuse(tail))?;
OK
}
/// Check whether an arrow exists.
pub fn exists<A>(&self, (tail, head): (Key, Key)) -> Result<bool>
where
A: Arrow,
{
self.with(A::SPACE.0).has(tail.fuse(head))
}
/// Get all arrows of type `A` "pointing at" `key`.
pub fn list_incoming<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
self.list_arrows_where(Direction::Incoming, key)
}
/// Get all arrows of type `A` "pointing away from" `key`.
pub fn list_outgoing<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
self.list_arrows_where(Direction::Outgoing, key)
}
/// Get all arrows of type `A`.
pub fn list_arrows<A>(&self) -> impl Iterator<Item = Result<(Key, A, Key)>> + '_
where
A: Arrow,
{
self.with(A::SPACE.0).list().bind_results(|(k, v)| {
let (tail, head) = Key::split(k.as_ref());
decode(v.as_ref()).map(|label| (tail, label, head))
})
}
/// Select arrows with the given direction relative to the given key.
fn list_arrows_where<A>(
&self,
direction: Direction,
key: impl Keylike,
) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
// Keys in space 0 are arranged with the tail at the start, and the ones in space 1
// are arranged with the head at the start. This allows us to efficiently prefix scan
// regardless of the direction, at the cost of increased space usage.
let space = match direction {
Direction::Outgoing => A::SPACE.0,
Direction::Incoming => A::SPACE.1,
};
let key = key.translate(&self).unwrap();
#[cfg(test)]
eprintln!("scanning {} using prefix {key}", space.0);
self.with(space).scan(key).bind_results(|(k, v)| {
// Because we're prefix scanning on the first half of the key, we only want to
// get the second here.
let (_this, other) = Key::split(k.as_ref());
#[cfg(test)]
eprintln!(" found {_this}:{other}");
decode(v.as_ref()).map(|label| (other, label))
})
}
pub(crate) fn quiver(&self, tag: Tag) -> Quiver<'_> {
Quiver { tag, tx: &self }
}
}
impl Transaction<'_> {
/// Use a keyspace.
fn with(&self, name: impl AsRef<str>) -> Keyspace<'_> {
Keyspace {
cf: self.spaces[name.as_ref()].clone(),
tx: &self,
}
}
}
/// Provides the basic API for a keyspace/column family.
struct Keyspace<'db> {
tx: &'db Transaction<'db>,
cf: Arc<BoundColumnFamily<'db>>,
}
impl<'db> Keyspace<'db> {
/// Retrieve a value from the database. Returns `Missing` if the key does not exist.
fn get(&self, key: impl AsRef<[u8]>) -> Result<impl AsRef<[u8]> + 'db> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.and_then(|opt| opt.ok_or(Error::Missing))
}
/// Set the value at `key` to `val`.
fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
self.tx
.inner
.put_cf(&self.cf, key, val)
.map_err(Error::Internal)
}
/// Delete the key-value pair identified by `key`.
fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.tx.inner.delete_cf(&self.cf, &key)?;
OK
}
/// Remove the key and associated value from the keyspace, and return its previous value.
fn pop(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
let old = self.tx.inner.get_for_update_cf(&self.cf, &key, true)?;
self.del(key)?;
Ok(old)
}
/// Check whether the key exists in the keyspace.
fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.map(|opt| opt.is_some())
}
/// Execute a prefix scan.
fn scan(
&self,
prefix: impl AsRef<[u8]> + 'db,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
let t = prefix.as_ref().to_vec();
self.tx
.inner
.prefix_iterator_cf(&self.cf, prefix.as_ref())
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.starts_with(&t),
_ => true,
})
.map_err(Error::Internal)
}
/// Show all items in the entire keyspace.
fn list(
&self,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
self.tx
.inner
.full_iterator_cf(&self.cf, IteratorMode::Start)
.map_err(Error::Internal)
}
}
/// The quiver allows one to manipulate all parallel edges tagged with a particular type.
pub struct Quiver<'db> {
tx: &'db Transaction<'db>,
tag: Tag,
}
impl<'db> Quiver<'db> {
pub fn insert(&self, origin: Key, target: Key, identity: Key) -> Result<()> {
let fused = origin.fuse(target);
self.tx.with("multi:id-map").set(identity, fused)?;
let mut triple = [0; 48];
triple[..32].copy_from_slice(&fused);
triple[32..].copy_from_slice(identity.as_ref());
self.tx.with("multi:index/l").set(triple, b"")?;
triple[..32].rotate_left(16);
self.tx.with("multi:index/r").set(triple, b"")?;
OK
}
pub fn list_incoming(&self, target: Key) -> impl Iterator<Item = Result<(Key, Key)>> + 'db {
self.tx
.with("multi:index/r")
.scan(target)
.map_ok(|(k, _)| Key::split(&k.as_ref()[16..]))
}
}
fn encode(data: impl Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}
fn decode<T>(data: &[u8]) -> Result<T>
where
T: Decode,
{
bincode::decode_from_slice(data, bincode::config::standard())
.map_err(Error::Decoding)
.map(|(v, _)| v)
}
#[cfg(test)]
mod tests;

View file

@ -1,256 +0,0 @@
use super::*;
use crate::Space;
#[derive(Encode, Decode)]
struct TestArrow;
impl Arrow for TestArrow {
const SPACE: (Space, Space) = (Space("test-arrow/l"), Space("test-arrow/r"));
}
const TEST_TAG: Tag = Tag(69);
macro_rules! keygen {
{ $($name:ident)* } => {
$(
let $name = Key::gen();
eprintln!(concat!(stringify!($name), "={}"), $name);
)*
}
}
fn with_test_arrow(f: impl Fn(Key, Key, &Transaction<'_>, usize) -> Result<()>) -> Result<()> {
Store::with_tmp(|db| {
// Run these tests 128 times because misuse of prefix iterator may cause weird,
// obscure bugs :3
//
// Also, because we don't wipe the store between test runs, we have more chances
// to discover weird bugs that we wouldn't catch if there was only a single run.
Ok(for n in 0..128 {
eprintln!("--- run {n} ---");
db.transaction(|tx| {
keygen!(target origin);
tx.create_vertex(target, TEST_TAG)?;
tx.create_vertex(origin, TEST_TAG)?;
tx.insert_arrow((origin, target), TestArrow)?;
let l: Vec<String> = tx
.with("test-arrow/l")
.list()
.map_ok(|(k, _)| Key::split(k.as_ref()))
.map_ok(|(a, b)| format!("({a}, {b})"))
.try_collect()?;
eprintln!("test-arrow/l = {l:#?}");
let r: Vec<String> = tx
.with("test-arrow/r")
.list()
.map_ok(|(k, _)| Key::split(k.as_ref()))
.map_ok(|(a, b)| format!("({a}, {b})"))
.try_collect()?;
eprintln!("test-arrow/r = {r:#?}");
f(origin, target, &tx, n)
})?;
eprintln!("--- end run {n} ---");
})
})
}
#[test]
fn target_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?;
eprintln!("target.incoming = {ti:#?}");
assert!(ti.contains(&origin), "origin ∈ target.incoming");
assert!(!ti.contains(&target), "target ∉ target.incoming");
OK
})
}
#[test]
fn target_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let to: Vec<_> = tx.list_outgoing::<TestArrow>(target).keys().try_collect()?;
eprintln!("target.outgoing = {to:#?}");
assert!(!to.contains(&target), "target ∉ target.outgoing");
assert!(!to.contains(&origin), "origin ∉ target.outgoing");
OK
})
}
#[test]
fn origin_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oi: Vec<_> = tx.list_incoming::<TestArrow>(origin).keys().try_collect()?;
eprintln!("origin.incoming = {oi:#?}");
assert!(!oi.contains(&origin), "origin ∉ origin.incoming");
assert!(!oi.contains(&target), "target ∉ origin.incoming");
OK
})
}
#[test]
fn origin_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?;
eprintln!("origin.outgoing = {oo:#?}");
assert!(oo.contains(&target), "target ∈ origin.outgoing");
assert!(!oo.contains(&origin), "origin ∉ origin.outgoing");
OK
})
}
#[test]
fn fanout() -> Result<()> {
let targets: [Key; 128] = std::array::from_fn(|_| Key::gen());
let origin = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.create_vertex(origin, TEST_TAG)?;
for t in targets {
tx.create_vertex(t, TEST_TAG)?;
tx.insert_arrow((origin, t), TestArrow)?;
}
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?;
for t in targets {
assert!(oo.contains(&t), "∀ t ∈ targets: t ∈ origin.outgoing");
let ti: Vec<_> = tx.list_incoming::<TestArrow>(t).keys().try_collect()?;
assert!(
ti == vec! {origin},
"∀ t ∈ targets: t.incoming = {{origin}}"
);
}
OK
})
})
}
#[test]
fn fanin() -> Result<()> {
let origins: [Key; 128] = std::array::from_fn(|_| Key::gen());
let target = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.create_vertex(target, TEST_TAG)?;
for o in origins {
tx.create_vertex(o, TEST_TAG)?;
tx.insert_arrow((o, target), TestArrow)?;
}
let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?;
for o in origins {
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(o).keys().try_collect()?;
assert!(ti.contains(&o), "∀ o ∈ origins: o ∈ target.incoming");
assert!(
oo == vec! {target},
"∀ o ∈ origins: o.outgoing = {{target}}"
);
}
OK
})
})
}
#[test]
fn distinct_many_to_many() -> Result<()> {
let origins: [Key; 32] = std::array::from_fn(|_| Key::gen());
let targets: [Key; 32] = std::array::from_fn(|_| Key::gen());
Store::with_tmp(|db| {
db.transaction(|tx| {
for t in targets {
tx.create_vertex(t, TEST_TAG)?;
}
for o in origins {
tx.create_vertex(o, TEST_TAG)?;
for t in targets {
tx.insert_arrow((o, t), TestArrow)?;
}
}
let ti: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_incoming::<TestArrow>(t)
.keys()
.try_collect()
.map(|v: Vec<_>| (t, v))
})
.collect::<Result<_>>()?;
// For each origin point, there must be a target that has it as "incoming".
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().any(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∃ t ∈ targets: o ∈ t.incoming"
);
// Each target has each origin as incoming.
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().all(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∀ t ∈ targets: o ∈ t.incoming"
);
let to: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_outgoing::<TestArrow>(t)
.keys()
.try_collect()
.map(|v: Vec<_>| (t, v))
})
.collect::<Result<_>>()?;
// Our arrows point only from origins to targets, and there's a bug if there
// exists a target such that its outgoing set is non-empty.
assert!(
!targets.into_iter().any(|t| !to[&t].is_empty()),
"∄ t ∈ targets: t.outgoing ≠ ∅"
);
let oo: HashMap<Key, Vec<Key>> = origins
.into_iter()
.map(|o| {
tx.list_outgoing::<TestArrow>(o)
.keys()
.try_collect()
.map(|v: Vec<_>| (o, v))
})
.collect::<Result<_>>()?;
// Each origin has each target as outgoing.
assert!(
origins
.into_iter()
.all(|o| targets.into_iter().all(|t| oo[&o].contains(&t))),
"∀ o ∈ origins: ∀ t ∈ targets: t ∈ o.outgoing"
);
OK
})
})
}

178
lib/store/src/types.rs Normal file
View file

@ -0,0 +1,178 @@
//! Defining a [`Schema`].
//!
//! There is a lot of complicated machinery here to make it so that you have to write very little code to
//! define new types. Basically, if you want to define a thing to store, you need to implement the trait
//! for it (e.g. [`Arrow`]), and also implement [`Value`], where you create a specification describing which
//! namespaces store records of that type.
//!
//! Then, when you construct a new `Store`, you need to pass in a [`Schema`], or the database won't be able
//! to operate on the types.
//!
//! [`Arrow`]: super::Arrow
use std::collections::HashSet;
use derive_more::Display;
/// The namespace where all vertices must be registered.
pub(crate) const NODE_HEADERS: Keyspace = Keyspace("header:node");
/// The namespace where multiedge identities are mapped to endpoints.
pub(crate) const MULTIEDGE_HEADERS: Keyspace = Keyspace("header:multiedge");
/// A specification of all user-defined namespaces.
///
/// # Example
///
/// The below example correctly defines a [basic arrow] and demonstrates its use by inserting one and then
/// testing whether it exists. If the appropriate keyspaces are not known to the store, this will panic.
///
/// ```rust
/// use store::{ arrow::Arrow, types::Schema, Store, Key, OK };
///
/// // Each kind of value has a derive macro.
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// fn main () -> store::Result<()> {
/// // Here, we make sure that the namespaces used for `MyArrow` are known.
/// let schema = Schema::new()
/// .has::<MyArrow>();
///
/// let result = Store::test(schema, |db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// let mut changes = db.batch();
/// changes.create(MyArrow { origin, target });
/// db.apply(changes)?;
///
/// db.exists::<MyArrow>(origin, target)
/// })?;
///
/// assert!(result);
/// OK
/// }
/// ```
///
/// [basic arrow]: crate::arrow::Basic
pub struct Schema(pub(crate) HashSet<Keyspace>);
impl Schema {
/// Construct a new empty schema.
pub fn new() -> Schema {
Schema(HashSet::from_iter([NODE_HEADERS, MULTIEDGE_HEADERS]))
}
/// Add the component to the schema.
pub fn has<C>(mut self) -> Schema
where
C: Value,
{
self.add(C::SPEC);
self
}
/// Add a spec to the schema by mutable reference.
pub fn add(&mut self, spec: impl TypeSpec) -> &mut Schema {
spec.register(&mut self.0);
self
}
}
/// The name of a keyspace.
///
/// Specifically, this is the name of a RocksDB column family.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
pub struct Keyspace(pub &'static str);
impl AsRef<str> for Keyspace {
fn as_ref(&self) -> &str {
self.0
}
}
/// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a
/// [mixin](MixinSpec).
///
/// All namespaces must be unique, and added to the [`Schema`].
pub trait Value {
type Type: TypeSpec;
const SPEC: Self::Type;
}
/// The specification for an [`Arrow`](crate::Arrow).
///
/// The listed namespaces must be unique among all other namespaces.
#[derive(Clone, Copy)]
pub struct ArrowSpec {
/// The keyspace where edge keys are ordered `(origin, target)`.
pub by_origin: Keyspace,
/// The keyspace where edge keys are ordered `(target, origin)`.
pub by_target: Keyspace,
}
/// A specification for the namespaces needed to store an [`Alias`][crate::Alias].
#[derive(Clone, Copy)]
pub struct AliasSpec {
/// The alias -> key mapping table.
pub keyspace: Keyspace,
/// The key -> alias mapping table.
pub reversed: Keyspace,
}
/// Where do we store a mixin?
#[derive(Clone, Copy)]
pub struct MixinSpec {
/// The key -> mixin mapping table.
pub keyspace: Keyspace,
}
/// Describes how to add a [`Value`] to a [`Schema`].
pub trait TypeSpec {
/// Register the namespaces.
fn register(&self, set: &mut HashSet<Keyspace>);
}
// TODO: better error messages.
impl TypeSpec for ArrowSpec {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.by_origin) {
panic! {
"Duplicate found while inserting Arrow::BY_ORIGIN: {}",
self.by_origin
}
}
if !set.insert(self.by_target) {
panic! {
"Duplicate found while inserting Arrow::BY_TARGET: {}",
self.by_target
}
}
}
}
impl TypeSpec for AliasSpec {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Alias::KEYSPACE: {}",
self.keyspace
}
}
if !set.insert(self.reversed) {
panic! {
"Duplicate found while inserting Alias::REVERSED: {}",
self.reversed
}
}
}
}
impl TypeSpec for MixinSpec {
fn register(&self, set: &mut HashSet<Keyspace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Mixin::KEYSPACE: {}",
self.keyspace
}
}
}
}

View file

@ -46,6 +46,28 @@ pub trait IterExt: Iterator + Sized {
{
self.next().ok_or(e)?
}
/// `filter_map` meets `and_then`.
fn filter_bind_results<'a, I, O, E>(
self,
mut f: impl FnMut(I) -> Result<Option<O>, E> + 'a,
) -> impl Iterator<Item = Result<O, E>> + 'a
where
Self: Iterator<Item = Result<I, E>> + 'a,
{
self.filter_map(move |r| r.and_then(|x| f(x)).transpose())
}
}
impl<I> IterExt for I where I: Iterator {}
/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next
/// argument is the second tuple element.
pub fn key<K, V>(key: K) -> impl FnOnce(V) -> (K, V) {
move |val| (key, val)
}
/// Curried function for creating a tuple, where the first argument is the first tuple element, and the next
/// argument is the second tuple element.
pub fn val<K, V>(val: V) -> impl FnOnce(K) -> (K, V) {
move |key| (key, val)
}