[wip] migrate to new store api

This commit is contained in:
Riley Apeldoorn 2024-04-23 23:11:11 +02:00
parent c26b6cdb04
commit c5bd6a127e
15 changed files with 1418 additions and 2047 deletions

5
Cargo.lock generated
View file

@ -885,6 +885,10 @@ dependencies = [
"libc",
]
[[package]]
name = "macro"
version = "0.0.0"
[[package]]
name = "matchit"
version = "0.7.3"
@ -1490,6 +1494,7 @@ dependencies = [
"bincode",
"chrono",
"derive_more",
"macro",
"rocksdb",
"tempfile",
"ulid",

View file

@ -3,6 +3,7 @@ members = [
"lib/puppy",
"lib/store",
"lib/fetch",
"lib/macro",
"bin/server",
"bin/pupctl",
]

7
lib/macro/Cargo.toml Normal file
View file

@ -0,0 +1,7 @@
[package]
name = "macro"
edition = "2021"
[lib]
path = "src/lib.rs"
proc-macro = true

16
lib/macro/src/lib.rs Normal file
View file

@ -0,0 +1,16 @@
use proc_macro::TokenStream;
#[proc_macro_derive(Arrow, attributes(origin, target, identity))]
pub fn arrow(item: TokenStream) -> TokenStream {
TokenStream::new()
}
#[proc_macro_derive(Alias)]
pub fn alias(item: TokenStream) -> TokenStream {
TokenStream::new()
}
#[proc_macro_derive(Mixin)]
pub fn mixin(item: TokenStream) -> TokenStream {
TokenStream::new()
}

View file

@ -1,46 +1,101 @@
#![feature(iterator_try_collect)]
pub use store::{self, Key, Store};
use store::{
alias::Username,
arrow::{self, multi::MultiArrow, AuthorOf},
mixin::{Content, Profile},
util::IterExt,
Keylike, Tag,
};
mod tags {
//! Type tags for vertices.
pub mod model {
use store::{types::Schema, Key};
use store::Tag;
#[derive(store::Mixin)]
pub struct Profile {
pub post_count: usize,
pub account_name: String,
pub display_name: Option<String>,
pub about_string: Option<String>,
pub about_fields: Vec<(String, String)>,
}
pub const ACTOR: Tag = Tag(0);
pub const POST: Tag = Tag(1);
pub const BITE: Tag = Tag(2);
#[derive(store::Mixin)]
pub struct Content {
pub content: Option<String>,
pub summary: Option<String>,
}
#[derive(store::Arrow, Clone, Copy)]
pub struct AuthorOf {
#[origin]
pub author: Key,
#[target]
pub object: Key,
}
#[derive(store::Arrow, Clone, Copy)]
pub struct Follows {
#[origin]
pub follower: Key,
#[target]
pub followed: Key,
}
#[derive(store::Arrow, Clone, Copy)]
pub struct Bite {
#[identity]
pub id: Key,
#[origin]
pub biter: Key,
#[target]
pub victim: Key,
}
#[derive(store::Arrow, Clone, Copy)]
pub struct FollowRequest {
#[identity]
pub id: Key,
pub origin: Key,
pub target: Key,
}
#[derive(store::Alias)]
pub struct Username(pub String);
/// Construct the schema.
pub fn schema() -> Schema {
Schema::new()
// Mixins
.has::<Profile>()
.has::<Content>()
// Aliases
.has::<Username>()
// Arrows
.has::<Bite>()
.has::<FollowRequest>()
.has::<AuthorOf>()
.has::<Follows>()
}
}
pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Result<Key> {
let key = Key::gen();
db.transaction(|tx| {
tx.create_vertex(key, tags::POST)?;
tx.update::<Profile>(author, |_, mut profile| {
db.run(|tx| {
tx.update::<Profile>(author, |mut profile| {
profile.post_count += 1;
Ok(profile)
profile
})?;
tx.insert(key, Content {
tx.add_mixin(key, Content {
content: Some(content.to_string()),
summary: None,
})?;
tx.insert_arrow::<AuthorOf>((author, key))?;
tx.create(AuthorOf {
author,
object: key,
})?;
Ok(key)
})
}
pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
let key = Key::gen();
db.transaction(|tx| {
tx.create_vertex(key, tags::ACTOR)?;
tx.insert_alias(key, Username(username.to_string()))?;
tx.insert(key, Profile {
db.run(|tx| {
tx.add_alias(key, Username(username.to_string()))?;
tx.add_mixin(key, Profile {
post_count: 0,
account_name: username.to_string(),
display_name: None,
@ -51,43 +106,24 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result<Key> {
})
}
pub fn list_posts_by_author(
db: &Store,
author: impl Keylike,
) -> store::Result<Vec<(Key, Content)>> {
db.transaction(|tx| {
tx.list_outgoing::<AuthorOf>(author)
.bind_results(|(post_key, _)| tx.lookup::<Content>(post_key))
pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result<Vec<(Key, Content)>> {
db.run(|tx| {
tx.outgoing::<AuthorOf>(author)
.bind_results(|arr| tx.get_mixin::<Content>(arr.object))
.collect()
})
}
pub struct Bite {
pub id: Key,
pub biter: Key,
pub victim: Key,
}
impl MultiArrow for Bite {
const TYPE: Tag = tags::BITE;
}
pub fn bite_actor(db: &Store, biter: Key, victim: Key) -> store::Result<Key> {
db.transaction(|tx| {
// Bites are represented as multiedges.
let key = arrow::multi::insert::<Bite>(&tx, biter, victim)?;
// We can treat particular arrows in a quiver as a vertex by registering it.
tx.create_vertex(key, tags::BITE)?;
Ok(key)
db.run(|tx| {
let id = Key::gen();
tx.create(Bite { id, biter, victim })?;
Ok(id)
})
}
pub fn bites_on(db: &Store, victim: Key) -> store::Result<Vec<Bite>> {
db.transaction(|tx| {
arrow::multi::list_incoming::<Bite>(&tx, victim)
.map_ok(|(biter, id)| Bite { id, biter, victim })
.try_collect()
})
db.incoming::<Bite>(victim).try_collect()
}
pub mod tl {
@ -102,13 +138,10 @@ pub mod tl {
}
pub fn fetch_all(db: &Store) -> Result<Vec<Post>> {
db.transaction(|tx| {
db.run(|tx| {
let iter = tx.list::<Content>();
iter.bind_results(|(id, content)| {
let author = tx
.list_incoming::<AuthorOf>(id)
.keys()
.next_or(Error::Missing)?;
let author = tx.incoming::<AuthorOf>(id).next_or(Error::Missing)?;
Ok(Post {
id,
author,
@ -123,29 +156,35 @@ pub mod tl {
pub mod fr {
//! Follow requests
use store::{
arrow::{FollowRequested, Follows},
util::IterExt as _,
Key, Store, OK,
};
use store::{util::IterExt as _, Key, Store, OK};
pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.insert_arrow::<FollowRequested>((requester, target))?;
OK
use crate::model::{FollowRequest, Follows};
pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<FollowRequest> {
db.run(|tx| {
let req = FollowRequest {
id: Key::gen(),
origin: requester,
target,
};
tx.create(req)?;
Ok(req)
})
}
pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?;
tx.insert_arrow::<Follows>((requester, target))?;
db.run(|tx| {
tx.delete_all::<FollowRequest>(requester, target)?;
tx.create(Follows {
follower: requester,
followed: target,
})?;
OK
})
}
pub fn reject(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
db.run(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?;
OK
})

View file

@ -12,3 +12,4 @@ derive_more = "*"
bincode = "2.0.0-rc.3"
chrono = "*"
tempfile = "*"
macro = { path = "../macro" }

View file

@ -1,17 +1,115 @@
//! Alternative keys.
/// Derive an implementation of [`Alias`].
pub use r#macro::Alias;
use derive_more::{Display, From};
use super::{
types::{AliasSpec, Value},
Batch, Store, Transaction,
};
use crate::{Key, Result};
use crate::Space;
/// An alternative unique identifier for a node.
pub trait Alias: Value<Type = AliasSpec> + From<String> + AsRef<str> {}
/// An alternative unique key for a vertex.
pub trait Alias: ToString + From<String> {
const SPACE: (Space, Space);
impl Transaction<'_> {
/// Look up the key associated with the alias.
pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
where
A: Alias,
{
op::lookup::<A>(self, alias.as_ref())
}
/// Get the alias associated with the `node`.
pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
where
A: Alias,
{
op::get_alias(self, node)
}
/// Add an alias to `node`.
pub fn add_alias<A>(&self, node: Key, alias: A) -> Result<()>
where
A: Alias,
{
op::add_alias::<A>(self, node, alias.as_ref())
}
/// Check whether `node` has an `M` defined for it.
pub fn has_alias<A>(&self, node: Key) -> Result<bool>
where
A: Alias,
{
op::has_alias::<A>(self, node)
}
}
#[derive(Display, From)]
pub struct Username(pub String);
impl Alias for Username {
const SPACE: (Space, Space) = (Space("username/l"), Space("username/r"));
impl Store {
/// Look up the key associated with the alias.
pub fn lookup<A>(&self, alias: A) -> Result<Option<Key>>
where
A: Alias,
{
op::lookup::<A>(self, alias.as_ref())
}
/// Get the alias associated with the `node`.
pub fn get_alias<A>(&self, node: Key) -> Result<Option<A>>
where
A: Alias,
{
op::get_alias(self, node)
}
/// Check whether `node` has an `M` defined for it.
pub fn has_alias<A>(&self, node: Key) -> Result<bool>
where
A: Alias,
{
op::has_alias::<A>(self, node)
}
}
impl Batch {
/// Add an alias to `node`.
///
/// # Warning
///
/// This will *not* fail if the key already has a alias of this type, and in fact *it will cause fundamental inconsistency*
/// if the alias already exists. Don't call this function unless you know that neither `node` nor `alias` exist yet.
pub fn put_alias<A>(&mut self, node: Key, alias: A)
where
A: Alias,
{
// TODO: consistency *could* be checked by manually iterating over the transaction using `WriteBatch::iterate`
op::add_alias::<A>(self, node, alias.as_ref()).unwrap();
}
}
mod op {
use crate::{internal::*, Alias, Key, Result, OK};
pub fn lookup<A: Alias>(cx: &impl Query, alias: &str) -> Result<Option<Key>> {
cx.open(A::SPEC.keyspace).get(alias).map(|k| match k {
Some(x) => Some(Key::from_slice(x.as_ref())),
None => None,
})
}
pub fn has_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<bool> {
cx.open(A::SPEC.reversed).has(node)
}
pub fn add_alias<A: Alias>(cx: &impl Write, node: Key, alias: &str) -> Result<()> {
cx.open(A::SPEC.keyspace).set(alias, node)?;
cx.open(A::SPEC.reversed).set(node, alias)?;
OK
}
pub fn get_alias<A: Alias>(cx: &impl Query, node: Key) -> Result<Option<A>> {
let buf = cx.open(A::SPEC.reversed).get(node)?;
Ok(buf.map(decode))
}
fn decode<T>(data: impl AsRef<[u8]>) -> T
where
T: From<String>,
{
T::from(String::from_utf8_lossy(data.as_ref()).into_owned())
}
}

View file

@ -1,77 +1,531 @@
//! Relations between nodes.
//! Directed edges, both parallel and simple.
//!
//! This module's main exports are [`Arrow`], and the two kinds of arrows: [`Basic`] and [`Multi`].
//!
//! Querying information about arrows can be done using the APIs exposed by [`Store`] and [`Transaction`],
//! and manipulating them can likewise be done from within the context of a `Transaction`.
//!
//! The arrow API is designed to aggressively minimize boilerplate for defining arrow types, and uses a
//! few tricks to do with associated constants and types to make it all work nicely.
//!
//! # Terminology
//!
//! An arrow is a part of a graph. Graphs consist of *nodes* (also called *vertices*) and *edges*. Nodes
//! can be seen as "things", and edges as connections between those things, defined by the two nodes that
//! they connect (which are called the *endpoints* of the edge).
//!
//! These edges can be directed or undirected. The difference is that undirected edges are identified by
//! an unordered pair of their endpoints, whereas directed edges (also called **arrows**), are identified
//! by an ordered pair, where one of the endpoints is the *tail* (or *origin* in the code/docs here) and
//! the other is the *head* (usually called *target* here).
//!
//! # Arrow kinds
//!
//! Arrows can be either [`Basic`] or [`Multi`]. The main difference is that basic arrows are defined
//! solely by which two nodes they connect, which means that their representation and certain operations
//! are more efficient. The trade-off is that they cannot capture more complex information than "this
//! edge exists".
//!
//! For some use cases (for example, predicates) this is sufficient, but other use cases require multiple,
//! individually identifiable and manipulatable parallel edges. Here, the trade-off is that while they
//! are much more expressive, and can be labeled by associating [mixins] with the arrow's identity key,
//! they incur more space overhead, and most operations on them are more expensive compared to basic
//! edges.
//!
//! Most arrow operations work on either kind of edge. Some signatures reference [`Arrow::Kind`], which
//! is either of the `Multi` or `Basic` types mentioned before. Because parallel arrows need to be
//! discernable from each other, each of them also has an `identity` key, in addition to listing the two
//! edges they connect.
//!
//! [mixins]: super::Mixin
use bincode::{Decode, Encode};
pub use self::kinds::{Basic, Multi};
use super::{
types::{ArrowSpec, Value},
Batch, Store, Transaction,
};
use crate::{util::IterExt as _, Key, Result};
use crate::Space;
/// A directed edge.
///
/// See the [module docs][self] for an introduction.
pub trait Arrow: Value<Type = ArrowSpec> + From<Self::Kind> + Into<Self::Kind> {
/// The representation of this arrow, which also determines whether parallel edges are allowed.
type Kind: ArrowKind = Basic;
}
pub mod multi {
//! Managing multiedges.
//!
//! Unlike regular [`Arrow`]s, which don't have an identity (they are identified by the two nodes that
//! they connect), multiarrows can have their own [`Key`]. This allows one to have multiple arrows in
//! the same direction connecting the same two vertices, which isn't possible with normal arrows.
//!
//! Multiarrows can also be treated as if they were vertices, if their identity (`Key`) is registered as
//! one.
//!
//! This comes with a trade-off, though, specifically in both space and complexity. A multi-arrow also
//! can't have a label, like a typical arrow.
/// Parameterizing arrows so we can distinguish between kinds of arrows.
///
/// This lets us present a common API for certain arrow-related operations while also leveraging some
/// specialization. Essentially, from a type parameter which implements [`Arrow`], we can tell both at
/// the type level and at the value level whether that arrow is a multi-arrow or not.
pub trait ArrowKind {
/// Whether this kind of arrow should be represented using the specialized representation for edges
/// that are allowed to be parallel.
const IS_MULTI: bool;
/// Construct an arrow from a buffer containing a correctly-oriented arrow.
///
/// Each arrow is stored twice, once "correctly", and once "reversed". This allows us to efficiently
/// list both the outgoing and incoming edges for any particular vertex by using a prefix scan on the
/// [`BY_ORIGIN`][ArrowSpec::by_origin] and [`BY_TARGET`][ArrowSpec::by_target] keyspaces respectively.
///
/// The buffer passed to this function will start with 16 bytes origin, followed by 16 bytes target.
/// For basic arrows, that's it, but for multiarrows there is an additional 16 bytes of "identity",
/// which is needed to discriminate between multiple parallel edges.
///
/// # Failure
///
/// This method must panic if `buf` is not the expected size (32 bytes for basic arrows, 48 bytes for
/// multi arrows). The responsibility for ensuring that `buf` is correctly oriented lies with the
/// caller lest the result is incorrect, but passing an incorrectly oriented arrow is not a memory
/// safety issue, so this function is safe.
fn dec(buf: &[u8]) -> Self;
/// Encode an arrow's key origin-first and target-first.
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>);
}
use crate::{Key, Result, Tag, Transaction};
pub fn insert<A>(tx: &Transaction<'_>, origin: Key, target: Key) -> Result<Key>
impl Store {
/// Check whether there exists any arrow of type `A` that points from `origin` to `target`.
pub fn exists<A>(&self, origin: Key, target: Key) -> Result<bool>
where
A: MultiArrow,
A: Arrow,
{
let key = Key::gen();
tx.quiver(A::TYPE).insert(origin, target, key)?;
Ok(key)
op::exists::<A>(self, origin, target)
}
/// Get all arrows of type `A` that point at `target`.
pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator<Item = Result<A::Kind>> + 'a
where
A::Kind: 'a,
A: Arrow,
{
op::incoming::<A>(self, target)
}
/// Get all arrows of type `A` that point away from `origin`.
pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator<Item = Result<A::Kind>> + 'a
where
A::Kind: 'a,
A: Arrow,
{
op::outgoing::<A>(self, origin)
}
}
impl Transaction<'_> {
/// Check whether there exists any arrow of type `A` that points from `origin` to `target`.
///
/// This only tells you whether there is *any* such arrow, not how many (in the case of parallel edges).
pub fn exists<A>(&self, origin: Key, target: Key) -> Result<bool>
where
A: Arrow,
{
op::exists::<A>(self, origin, target)
}
/// Get all arrows of type `A` that point at `target`.
pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::incoming::<A>(self, target).map_ok(A::from)
}
/// Get all arrows of type `A` that point away from `origin`.
pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator<Item = Result<A>> + 'a
where
A: Arrow + 'a,
{
op::outgoing::<A>(self, origin).map_ok(A::from)
}
/// Create a new arrow of type `A`.
///
/// This operation supports both [`Multi`] and [`Basic`] arrows.
///
/// # Example
///
/// The following snippet creates an arrow between `origin` and `target`.
///
/// ```rust
/// # fn main () -> store::Result<()> {
/// use store::{Arrow, Key};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// # store::new_interface::Store::test(|db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// db.run(|tx| {
/// tx.create(MyArrow { origin, target })
/// })?;
///
/// assert!(db.exists::<MyArrow>(origin, target)?);
/// # store::OK })
/// # }
/// ```
pub fn create<A>(&self, arrow: A) -> Result<()>
where
A: Arrow,
{
op::create::<A>(self, arrow.into())
}
/// Delete all edges of type `A` from `origin` to `target`.
///
/// It is not an error for this function not to delete anything.
pub fn delete_all<A>(&self, origin: Key, target: Key) -> Result<()>
where
A: Arrow,
{
op::delete_all::<A>(self, origin, target)
}
/// Delete a specific arrow.
pub fn delete_one<A>(&self, arrow: A) -> Result<()>
where
A: Arrow,
{
op::delete_one::<A>(self, arrow.into())
}
}
impl Batch {
/// Create an arrow. See [`Transaction::create`].
pub fn create<A>(&mut self, arrow: A)
where
A: Arrow,
{
op::create::<A>(self, arrow.into())
.expect("no errors expected to occur during batch operation")
}
/// Delete a specific arrow.
pub fn delete_one<A>(&mut self, arrow: A)
where
A: Arrow,
{
op::delete_one::<A>(self, arrow.into())
.expect("no errors expected to occur during batch operation")
}
}
mod op {
//! Implementations of arrow operations.
use super::*;
use crate::{internal::*, Key, Result, OK};
/// Check whether there exists at least one arrow of type `A` from `origin` to `target`.
pub fn exists<A>(cx: &impl Query, origin: Key, target: Key) -> Result<bool>
where
A: Arrow,
{
if A::Kind::IS_MULTI {
// In the case of a multi-edge, at least one result from the prefix scan
// indicates that there is at least one edge.
cx.open(A::SPEC.by_origin)
.scan(origin.fuse(target))
.next()
.transpose()
.map(|o| o.is_some())
} else {
cx.open(A::SPEC.by_origin).has(origin.fuse(target))
}
}
pub fn list_incoming<'db, A>(
tx: &'db Transaction<'db>,
/// List incoming arrows relative to `target`.
pub fn incoming<'db, A>(
cx: &'db impl Query,
target: Key,
) -> impl Iterator<Item = Result<(Key, Key)>> + 'db
) -> impl Iterator<Item = Result<A::Kind>> + 'db
where
A: MultiArrow,
A: Arrow,
A::Kind: 'db,
{
tx.quiver(A::TYPE).list_incoming(target)
// In the `by_target` keyspace, for either kind of arrow the layout is such that the target is
// the prefix, so we pick that keyspace to more efficiently list all arrows that target the key.
cx.open(A::SPEC.by_target)
.scan(target)
.map_ok(|(mut k, _)| {
// Arrows from `by_target` are oriented target-first, while the decoder function requires
// that the buffer is oriented origin-first. Regardless of whether `..32` covers the prefix
// or the whole slice, swapping the two keys always gives us the ordering expected by the
// decoding function.
let (t, o) = k[..32].split_at_mut(16);
t.swap_with_slice(o);
A::Kind::dec(&k)
})
}
pub trait MultiArrow {
const TYPE: Tag;
/// List outgoing arrows relative to `origin`.
pub fn outgoing<'db, A>(
cx: &'db impl Query,
origin: Key,
) -> impl Iterator<Item = Result<A::Kind>> + 'db
where
A: Arrow,
A::Kind: 'db,
{
cx.open(A::SPEC.by_origin)
.scan(origin)
.map_ok(|(ref k, _)| A::Kind::dec(k))
}
/// Create a new arrow.
pub fn create<A>(cx: &impl Write, arrow: A::Kind) -> Result<()>
where
A: Arrow,
{
let (by_origin, by_target) = arrow.enc();
cx.open(A::SPEC.by_origin).set(by_origin, b"")?;
cx.open(A::SPEC.by_target).set(by_target, b"")?;
OK
}
/// Delete all arrows from `origin` to `target`.
///
/// TODO: Remove the query requirement (depends on range delete being available).
pub fn delete_all<A>(cx: &(impl Write + Query), origin: Key, target: Key) -> Result<()>
where
A: Arrow,
{
let by_origin = cx.open(A::SPEC.by_origin);
let by_target = cx.open(A::SPEC.by_target);
Ok(if A::Kind::IS_MULTI {
// TODO: optimize this implementation using range deletes.
for key in by_origin.scan(origin.fuse(target)).keys() {
let key = Multi::decode(key?.as_ref());
by_origin.del(key.encode())?;
by_target.del(key.swap().encode())?;
}
} else {
by_origin.del(origin.fuse(target))?;
by_target.del(target.fuse(origin))?;
})
}
/// Delete a specific arrow, if it exists. Doesn't error if the arrow does *not* exist.
pub fn delete_one<A>(cx: &impl Write, arrow: A::Kind) -> Result<()>
where
A: Arrow,
{
let (by_origin, by_target) = arrow.enc();
cx.open(A::SPEC.by_origin).del(by_origin)?;
cx.open(A::SPEC.by_target).del(by_target)?;
OK
}
}
/// A directed edge between two vertices.
pub trait Arrow {
type Label: Encode + Decode = ();
const SPACE: (Space, Space);
}
/// Which way an arrow is pointing when viewed from a particular vertex.
pub enum Direction {
Incoming,
Outgoing,
}
/// The node this arrow points away from is the "author" of the node the arrow points to.
pub struct AuthorOf;
impl Arrow for AuthorOf {
const SPACE: (Space, Space) = (Space("created-by/l"), Space("created-by/r"));
}
/// The origin of this arrow has follow requested the target.
pub struct FollowRequested;
impl Arrow for FollowRequested {
const SPACE: (Space, Space) = (Space("pending-fr/l"), Space("pending-fr/r"));
}
/// The origin "follows" the target.
pub struct Follows;
impl Arrow for Follows {
const SPACE: (Space, Space) = (Space("follows/l"), Space("follows/r"));
/// Types representing the different kinds of arrows.
mod kinds {
use super::ArrowKind;
use crate::Key;
impl ArrowKind for Multi {
const IS_MULTI: bool = true;
fn dec(buf: &[u8]) -> Self {
Multi::decode(buf)
}
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
(self.encode(), self.swap().encode())
}
}
impl ArrowKind for Basic {
const IS_MULTI: bool = false;
fn dec(buf: &[u8]) -> Self {
Basic::decode(buf)
}
fn enc(self) -> (impl AsRef<[u8]>, impl AsRef<[u8]>) {
(self.encode(), self.reverse().encode())
}
}
/// A multi-edge is an edge with an identity, which allows multiple parallel edges to exist
/// between two vertices.
#[derive(Clone, Copy)]
pub struct Multi {
/// The node that this edge points away from.
pub origin: Key,
/// The node that this edge points towards.
pub target: Key,
/// The discriminator of this particular edge, which distinguishes it from all other edges that
/// connect `origin` and `target`, and indeed from every other edge or node in the graph.
pub identity: Key,
}
impl Multi {
/// Decode a multiarrow key from an origin-first encoded buffer. If the buffer is not correctly
/// oriented, the results will be wrong; the arrow will be oriented *away* from the target and
/// *at* the origin, instead of the other way around.
///
/// # Orientation
///
/// In this context, *correctly oriented* means that it is laid out in *origin-first* order,
/// like this (where `o`, `t` and `i` represent bytes):
///
/// ```text
/// oooooooooooooooo tttttttttttttttt iiiiiiiiiiiiiiii
/// |--------------| |--------------| |--------------|
/// origin target identity
/// ..16 16..32 32..
/// ```
///
/// In a *reverse oriented* buffer, the origin and target parts are swapped, so the target is
/// the prefix, followed by the origin, and then the identity. This is also called *target-first*
/// encoding in this documentation.
///
/// # Silent failure
///
/// There is no way to detect whether the ordering is correct from just the buffer, so the caller
/// must ensure that the order is correct. If you have a target-first encoded buffer, you can have
/// to swap the two keys before passing it into this function, or this function will give you an
/// edge that does not exist (since a multiedge can only point in one direction).
///
/// Safety-wise, this isn't an issue, so it does not warrant marking this function as `unsafe`.
///
/// # Panics
///
/// This function panics if `buf` is not exactly 48 bytes long.
pub fn decode(buf: &[u8]) -> Multi {
Multi {
origin: Key::from_slice(&buf[..16]),
target: Key::from_slice(&buf[16..32]),
identity: Key::from_slice(&buf[32..]),
}
}
/// Encode an arrow in *origin-first order*. See the docs of [`Multi::decode`] for an explanation
/// of the difference between origin-first encoding and target-first encoding.
pub fn encode(self) -> [u8; 48] {
let mut key = [0; 48];
key[..16].copy_from_slice(&self.origin.0);
key[16..32].copy_from_slice(&self.target.0);
key[32..].copy_from_slice(&self.identity.0);
key
}
/// Swap the origin and target of this arrow, while leaving the identity the same.
pub(super) fn swap(self) -> Multi {
Multi {
origin: self.target,
target: self.origin,
..self
}
}
}
/// A normal directed edge. Duplicates are not allowed.
///
/// This kind of arrow is useful for modeling predicates and simple relationships.
#[derive(Clone, Copy)]
pub struct Basic {
pub origin: Key,
pub target: Key,
}
impl Basic {
/// Get the inverse of this arrow (an arrow that connects the same two nodes, but pointing in the
/// other direction).
pub fn reverse(self) -> Basic {
Basic {
origin: self.target,
target: self.origin,
}
}
/// Encode `self` in origin-first order. See [`Multi::decode`] for docs on ordering.
pub fn encode(self) -> [u8; 32] {
self.origin.fuse(self.target)
}
/// Decode a basic edge from a buffer laid out origin-first. See [`Multi::decode`] for more information
/// about key encoding.
///
/// # Panics
///
/// Panics if `buf` is not exactly 32 bytes long.
pub fn decode(buf: &[u8]) -> Basic {
let (origin, target) = Key::split(buf);
Basic { origin, target }
}
}
}
/// Derive [`Arrow`] for a struct.
///
/// This will generate the required [`Into`] and [`From`] impls, as well as an [`Arrow`](trait@Arrow) impl and
/// a [`Value`] impl with the namespaces derived from the name of the struct. The macro works on structs with
/// specific fields, or newtypes of any arrow kind.
///
/// # Attributes
///
/// The `origin`, `target` and `identity` attributes are used on fields of type [`Key`], and they are used
/// to map the arrow's type to an [`ArrowKind`]. The `#[origin]` annotation isn't needed if the struct contains
/// a field named `origin`. Ditto with `target` and `identity`.
///
/// If there is no `identity` defined, the `ArrowKind` will be [`Basic`]. If an `identity` is defined, the kind
/// will be [`Multi`].
///
/// # Examples
///
/// Generates a [`Basic`] arrow called `my-arrow`.
///
/// ```
/// use store::{Key, Arrow, types::Schema};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// // This will fail to compile if the type doesn't implement `Arrow` correctly
/// Schema::new().has::<MyArrow>();
/// ```
///
/// Newtypes of either arrow kind are supported.
///
/// ```
/// use store::arrow::{Basic, Multi, Arrow};
///
/// /// The origin has requested to follow the target.
/// ///
/// /// Note: there may be more than one follow request between any two actors.
/// #[derive(Arrow)]
/// struct FollowRequest(Multi);
///
/// /// A relation between two actors meaning that the origin follows the target.
/// #[derive(Arrow)]
/// struct Follows(Basic);
///
/// /// Users can follow each other.
/// struct User(Key);
///
/// impl User {
/// /// Make `self` follow `other`.
/// pub fn follows(self, other: User) -> Follows {
/// Follows(Basic { origin: self.0, target: other.0 })
/// }
/// }
/// ```
///
/// Generates a [`Multi`] arrow called `my-multi-arrow`, mapping the multiarrow's discriminator to the struct's
/// `unique` field.
///
/// ```
/// use store::{Key, Arrow};
///
/// #[derive(Arrow)]
/// struct MyMultiArrow {
/// pub origin: Key,
/// pub target: Key,
/// #[identity]
/// pub unique: Key,
/// }
/// ```
///
/// The macro automatically adds `From` and `Into` implementations:
///
/// ```
/// use store::{Key, Arrow, arrow::Basic};
///
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// let edge: Basic = MyArrow { origin, target }.into();
///
/// assert_eq!(origin, edge.origin);
/// assert_eq!(target, edge.target);
/// ```
pub use r#macro::Arrow;

239
lib/store/src/internal.rs Normal file
View file

@ -0,0 +1,239 @@
//! Provides a nice hashmap-esque interface for manipulating entries in the store's backend.
use std::sync::Arc;
use rocksdb::{BoundColumnFamily, IteratorMode};
pub use self::cx::{Context, Query, Write};
use crate::{util::IterExt as _, Error, Result};
/// An internal interface to a specific keyspace that exposes basic hashmap-esque operations
/// on that keyspace, generic over whether the source of the data is a [`Transaction`] or a
/// [`Store`].
pub struct Keyspace<'db, C> {
pub(super) context: &'db C,
pub(super) cf: Arc<BoundColumnFamily<'db>>,
}
impl<'db, C> Keyspace<'db, C>
where
C: Query,
{
/// Fetch a value from the keyspace.
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<impl AsRef<[u8]> + 'db>> {
self.context.get_pinned(&self.cf, key)
}
/// Test whether a key exists.
pub fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.get(key).map(|r| r.is_some())
}
/// Execute a prefix scan.
pub fn scan(
&self,
prefix: impl AsRef<[u8]> + 'db,
) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
let t = prefix.as_ref().to_vec();
self.context
.prefix_iterator(&self.cf, prefix.as_ref())
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.starts_with(&t),
_ => true,
})
.map_err(Error::Internal)
}
/// List all pairs in the keyspace.
pub fn list(&self) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + 'db {
self.context
.full_iterator(&self.cf, IteratorMode::Start)
.map_err(Error::Internal)
}
}
impl<C> Keyspace<'_, C>
where
C: Write,
{
/// Set the given `key` to the `value`, overwriting it if there was already a value there.
pub fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
self.context.put(&self.cf, key, val)
}
/// Drop the value if it exists.
pub fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.context.delete(&self.cf, key)
}
}
mod cx {
//! Contexts for doing reads, writes or both to the database.
//!
//! The traits in this module map abstract calls to their methods on the [rocksdb] objects.
use rocksdb::{
AsColumnFamilyRef, DBAccess, DBIteratorWithThreadMode, DBPinnableSlice, IteratorMode,
};
use super::Keyspace;
use crate::{Backend, Batch, Error, Result, Store, Transaction, OK};
/// A context for executing database operations.
pub trait Context {
/// Open the keyspace identified by `cf`.
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self>
where
Self: Sized;
}
/// A context in which one can read from the data store.
///
/// Specifically, this maps calls to either the transaction or the store's internals without us having
/// to implement methods for *both* transactions and the store.
pub trait Query: Context {
type Backend: DBAccess;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>>;
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Self::Backend>;
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend>;
}
/// A context in which one can read from and modify the data store.
pub trait Write: Context {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()>;
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()>;
}
impl Context for Store {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self> {
Keyspace {
cf: self.inner.cf_handle(cf.as_ref()).unwrap(),
context: &self,
}
}
}
impl Query for Store {
type Backend = Backend;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>> {
self.inner.get_pinned_cf(cf, key).map_err(Error::Internal)
}
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Backend> {
self.inner.prefix_iterator_cf(cf, prefix)
}
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
}
impl Context for Transaction<'_> {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self> {
Keyspace {
cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(),
context: &self,
}
}
}
impl<'db> Query for Transaction<'db> {
type Backend = rocksdb::Transaction<'db, Backend>;
fn get_pinned<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
) -> Result<Option<DBPinnableSlice<'a>>> {
self.inner.get_pinned_cf(cf, key).map_err(Error::Internal)
}
fn prefix_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
prefix: &[u8],
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.prefix_iterator_cf(cf, prefix)
}
fn full_iterator<'a>(
&'a self,
cf: &impl AsColumnFamilyRef,
mode: IteratorMode<'a>,
) -> DBIteratorWithThreadMode<'a, Self::Backend> {
self.inner.full_iterator_cf(cf, mode)
}
}
impl Write for Transaction<'_> {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> {
self.inner.delete_cf(cf, key).map_err(Error::Internal)
}
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
self.inner.put_cf(cf, key, val).map_err(Error::Internal)
}
}
impl Context for Batch {
fn open<'cx>(&'cx self, cf: impl AsRef<str>) -> Keyspace<'cx, Self>
where
Self: Sized,
{
Keyspace {
cf: self.store.inner.cf_handle(cf.as_ref()).unwrap(),
context: &self,
}
}
}
impl Write for Batch {
fn delete(&self, cf: &impl AsColumnFamilyRef, key: impl AsRef<[u8]>) -> Result<()> {
self.inner.borrow_mut().delete_cf(cf, key);
OK
}
fn put(
&self,
cf: &impl AsColumnFamilyRef,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> Result<()> {
self.inner.borrow_mut().put_cf(cf, key, val);
OK
}
}
}

View file

@ -3,8 +3,6 @@ use std::fmt::{Debug, Display};
use chrono::{DateTime, Utc};
use ulid::Ulid;
use crate::{Alias, Error, Result, Transaction};
/// A unique identifier for vertices in the database.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Key(pub(crate) [u8; 16]);
@ -31,7 +29,7 @@ impl Key {
Key(key)
}
pub fn timestamp(self) -> DateTime<Utc> {
let ms = Ulid::from_bytes(self.0).timestamp_ms();
let ms = self.to_ulid().timestamp_ms();
DateTime::from_timestamp_millis(ms as i64).unwrap()
}
/// Join two keys together.
@ -46,6 +44,9 @@ impl Key {
let head = Key::from_slice(&buf[16..]);
(tail, head)
}
fn to_ulid(self) -> Ulid {
Ulid::from_bytes(self.0)
}
}
impl AsRef<[u8]> for Key {
@ -53,46 +54,3 @@ impl AsRef<[u8]> for Key {
&self.0
}
}
/// Anything that can be used to reference a vertex, both "normal" [keys](Key)
/// and [aliases](Alias).
///
/// In general, using a key directly is going to be more efficient than using
/// an alias, because it incurs less lookups.
pub trait Keylike: Sized {
/// Translate the thing to a [`Key`].
///
/// This function should return [`Error::Missing`] if the key cannot be located.
fn translate(self, tx: &Transaction<'_>) -> Result<Key>;
/// Translate, and check whether the key is actually registered.
///
/// This function should return [`Error::Undefined`] if the key does not *really*
/// exist. It should return [`Error::Missing`] if the key can't be found.
fn checked_translate(self, tx: &Transaction<'_>) -> Result<Key> {
let key = self.translate(tx)?;
if !tx.is_registered(key)? {
Err(Error::Undefined { key })
} else {
Ok(key)
}
}
}
impl Keylike for Key {
fn translate(self, _: &Transaction<'_>) -> Result<Key> {
Ok(self)
}
}
impl<A> Keylike for A
where
A: Alias,
{
fn translate(self, tx: &Transaction<'_>) -> Result<Key> {
tx.lookup_alias(self)
}
}
/// A type tag identifying a vertex.
#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)]
pub struct Tag(pub u8);

File diff suppressed because it is too large Load diff

View file

@ -1,35 +1,147 @@
//! Modules of information.
use bincode::{Decode, Encode};
/// Derive a [`Mixin`] implementation.
pub use r#macro::Mixin;
use crate::Space;
use super::{
types::{MixinSpec, Value},
Batch, Store, Transaction,
};
use crate::{Error, Key, Result};
/// A simple piece of data associated with a vertex.
pub trait Mixin: Encode + Decode {
const SPACE: Space;
/// Mixins are the simplest pieces of data in the store.
pub trait Mixin: Value<Type = MixinSpec> + Encode + Decode {}
impl Store {
/// Get the value!
pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
where
M: Mixin,
{
op::get_mixin(self, node)
}
/// Check if `node` has a mixin `M`.
pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
where
M: Mixin,
{
op::has_mixin::<M>(self, node)
}
}
/// Information needed to render a social media profile.
#[derive(Encode, Decode)]
pub struct Profile {
pub post_count: usize,
pub account_name: String,
pub display_name: Option<String>,
pub about_string: Option<String>,
pub about_fields: Vec<(String, String)>,
impl Transaction<'_> {
/// Apply an update function to the mixin `M` of `node`.
///
/// # Errors
///
/// - [`Error::Missing`]: if `node` does not have a mixin of this type.
///
/// [`Error::Missing`]: crate::Error::Missing
pub fn update<M>(&self, node: Key, update: impl FnOnce(M) -> M) -> Result<()>
where
M: Mixin,
{
op::update(self, node, update)
}
/// Get the mixin of the specified type associated with `node`.
pub fn get_mixin<M>(&self, node: Key) -> Result<Option<M>>
where
M: Mixin,
{
op::get_mixin(self, node)
}
/// Add a mixin to `node`.
///
/// # Errors
///
/// - [`Error::Conflict`]: if `node` already has a mixin of type `M`.
///
/// [`Error::Conflict`]: crate::Error::Missing
pub fn add_mixin<M>(&self, node: Key, mixin: M) -> Result<()>
where
M: Mixin,
{
if op::has_mixin::<M>(self, node)? {
return Err(Error::Conflict);
} else {
op::add_mixin::<M>(self, node, mixin)
}
}
/// Check whether `node` has an `M` defined for it.
pub fn has_mixin<M>(&self, node: Key) -> Result<bool>
where
M: Mixin,
{
op::has_mixin::<M>(self, node)
}
}
impl Mixin for Profile {
const SPACE: Space = Space("profile");
impl Batch {
/// Add a mixin to the `node`.
///
/// **Note**: unlike [`Transaction::add_mixin`], this will *not* return an error if the key already has a mixin
/// of this type. This *should* not cause inconsistency.
pub fn put_mixin<M>(&mut self, node: Key, mixin: M)
where
M: Mixin,
{
op::add_mixin(self, node, mixin).unwrap()
}
}
/// Contents of a post.
#[derive(Encode, Decode)]
pub struct Content {
pub content: Option<String>,
pub summary: Option<String>,
}
mod op {
use super::Mixin;
use crate::{internal::*, Error, Key, Result};
impl Mixin for Content {
const SPACE: Space = Space("content");
pub fn update<M>(
cx: &(impl Query + Write),
node: Key,
update: impl FnOnce(M) -> M,
) -> Result<()>
where
M: Mixin,
{
// TODO: implement in terms of a merge operator instead of separate query and write ops.
// this would let us remove the `Query` bound, which would in turn let us update from within
// a batch.
//
// See https://github.com/facebook/rocksdb/wiki/Merge-Operator
//
// It looks like rocksdb allows you to specify a merge operator per column family.[^1]
// This means we can construct our column families with a merge operator that knows how to encode and decode mixins.
//
// [^1]: https://github.com/facebook/rocksdb/blob/9d37408f9af15c7a1ae42f9b94d06b27d98a011a/include/rocksdb/options.h#L128
let tree = cx.open(M::SPEC.keyspace);
match tree.get(node.as_ref())? {
None => Err(Error::Missing),
Some(buf) => {
let new = decode(buf).map(update).and_then(encode)?;
tree.set(node, new)
}
}
}
pub fn get_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<Option<M>> {
cx.open(M::SPEC.keyspace).get(node)?.map(decode).transpose()
}
pub fn add_mixin<M: Mixin>(cx: &impl Write, node: Key, mixin: M) -> Result<()> {
cx.open(M::SPEC.keyspace).set(node, encode(mixin)?)
}
pub fn has_mixin<M: Mixin>(cx: &impl Query, node: Key) -> Result<bool> {
cx.open(M::SPEC.keyspace).has(node)
}
pub(super) fn encode(data: impl bincode::Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}
fn decode<T>(data: impl AsRef<[u8]>) -> Result<T>
where
T: bincode::Decode,
{
bincode::decode_from_slice(data.as_ref(), bincode::config::standard())
.map_err(Error::Decoding)
.map(|(v, _)| v)
}
}

View file

@ -1,437 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use bincode::{Decode, Encode};
use rocksdb::{BoundColumnFamily, IteratorMode};
use crate::{
arrow::Direction, key::Tag, util::IterExt as _, Alias, Arrow, Backend, Error, Key, Keylike,
Mixin, Result, Store, OK, SPACES,
};
impl Store {
/// Initiate a transaction.
///
/// If the result is an error, the transaction is rolled back, and otherwise the transaction
/// is committed.
pub fn transaction<T>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T>) -> Result<T> {
// Load all the column family handles, because they can't be accessed through the
// `rocksdb::Transaction` struct, only the `TransactionDB`.
let spaces = SPACES
.into_iter()
.map(|name| (*name, self.inner.cf_handle(name).unwrap()))
.collect();
let tx = Transaction {
inner: self.inner.transaction(),
spaces,
};
let result = f(&tx);
if result.is_err() {
tx.inner.rollback()?;
} else {
tx.inner.commit()?;
}
result
}
/// Check whether a key exists in the registry,
pub fn is_registered(&self, key: Key) -> Result<bool> {
let cf = self
.inner
.cf_handle("registry")
.expect("failed to open registry");
self.inner
.get_pinned_cf(&cf, key)
.map(|opt| opt.is_some())
.map_err(Error::Internal)
}
}
/// A database transaction, in which either each action succeeds, or everything fails
/// together.
///
/// The transaction struct is the interface for quering and manipulating persisted content.
pub struct Transaction<'db> {
inner: rocksdb::Transaction<'db, Backend>,
spaces: HashMap<&'static str, Arc<BoundColumnFamily<'db>>>,
}
/// Methods for manipulating the registry.
///
/// Before you can manipulate a vertex, its needs to be registered.
impl Transaction<'_> {
/// Register a new vertex.
pub fn create_vertex(&self, key: Key, tag: Tag) -> Result<()> {
self.with("registry").set(key, [tag.0])
}
/// Delete a vertex from the registry.
pub fn delete_vertex(&self, key: Key) -> Result<()> {
// TODO: also make this delete all related data?
self.with("registry").del(key)
}
/// Check whether a vertex is registered in the database.
pub fn is_registered(&self, key: Key) -> Result<bool> {
self.with("registry").has(key)
}
}
/// Methods for manipulating mixins.
///
/// For each implementor of [`Mixin`], a vertex can have at most one record of that type
/// associated with it.
impl Transaction<'_> {
/// Query the store for a value associated with the vertex `key` identifies.
///
/// Using a [`Key`] is more efficient than using an alias.
pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)>
where
M: Mixin,
{
// Checked translate isn't needed, we'll complain if we can't find the data.
let canonicalized_key = key.translate(&self)?;
let raw = self.with(M::SPACE).get(canonicalized_key)?;
let value = decode(raw.as_ref())?;
Ok((canonicalized_key, value))
}
/// Associate a new mixin value with the key.
///
/// # Errors
///
/// - `Error::Conflict` if a mixin of this type is already associated with the vertex
/// - `Error::Undefined` if `key` is not in the registry.
pub fn insert<M>(&self, key: impl Keylike, data: M) -> Result<()>
where
M: Mixin,
{
let key = key.checked_translate(&self)?;
let data = encode(data)?;
let ns = self.with(M::SPACE);
// Check for conflicts. Fail if the key already exists, otherwise set the key
// to the given value.
if ns.has(key)? {
Err(Error::Conflict)
} else {
ns.set(key, data)
}
}
/// Apply an update function to the mixin identified by the key.
///
/// # Errors
///
/// - `Error::Undefined` if the `key` is not registered
/// - `Error::Missing` if `key` does not exist in the keyspace associated with `M`
pub fn update<M>(&self, key: impl Keylike, f: impl FnOnce(Key, M) -> Result<M>) -> Result<()>
where
M: Mixin,
{
let key = key.checked_translate(self)?;
let (key, old) = self.lookup::<M>(key)?;
let new = f(key, old).and_then(encode)?;
self.with(M::SPACE).set(key, new)
}
/// Remove the mixin from the vertex `key` refers to.
///
/// Doesn't complain if the value does not exist in the expected keyspace.
pub fn remove<M>(&self, key: impl Keylike) -> Result<Option<M>>
where
M: Mixin,
{
// Checked translate isn't needed because we don't care if the key is bogus.
let canonical_key = key.translate(self)?;
let ns = self.with(M::SPACE);
match ns.pop(canonical_key) {
Ok(Some(val)) => decode(&val).map(Some),
Ok(None) => Ok(None),
Err(err) => Err(err),
}
}
/// List all key-value pairs for mixins of type `M`.
pub fn list<M>(&self) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
self.with(M::SPACE).list().bind_results(|(k, v)| {
let v = decode(v.as_ref())?;
let k = Key::from_slice(k.as_ref());
Ok((k, v))
})
}
}
/// Methods for interacting with [aliases][Alias], which are unique alternate keys.
impl Transaction<'_> {
/// Look up the key that the given alias maps to.
///
/// If the key was deleted, but the alias wasn't properly cleaned up,
pub fn lookup_alias<A>(&self, alias: A) -> Result<Key>
where
A: Alias,
{
let (l, _) = A::SPACE;
let raw = self.with(l).get(alias.to_string())?;
Ok(Key::from_slice(raw.as_ref()))
}
/// Create a new alias of type `A` for the given [`Key`].
///
/// If the alias already exists, this function returns `Conflict`.
pub fn insert_alias<A>(&self, key: Key, alias: A) -> Result<()>
where
A: Alias,
{
let (l, r) = A::SPACE;
let alias = alias.to_string();
if self.with(l).has(&alias)? {
return Err(Error::Conflict);
}
self.with(l).set(&alias, key)?;
self.with(r).set(key, &alias)?;
OK
}
/// Delete the alias of type `A` that points to `key`.
pub fn remove_alias<A>(&self, key: Key) -> Result<()>
where
A: Alias,
{
let (l, r) = A::SPACE;
// First, pop the reverse mapping, which will give us the encoded
// key for the normal mapping. If it doesn't exist, don't delete
// the normal mapping.
if let Some(alias) = self.with(r).pop(key)? {
self.with(l).pop(alias)?;
}
OK
}
}
impl Transaction<'_> {
/// Find an arrow of type `A` with the given `tail` and `head`.
pub fn lookup_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<Option<A::Label>>
where
A: Arrow,
{
let (l, _) = A::SPACE;
match self.with(l).get(tail.fuse(head)) {
Ok(raw) => decode(raw.as_ref()).map(Some),
Err(Error::Missing) => Ok(None),
Err(err) => Err(err),
}
}
/// Create a simple arrow of type `A`.
///
/// # Errors
///
/// - `Error::Undefined` if either key is not registered
pub fn insert_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<()>
where
A: Arrow<Label = ()>,
{
if !self.is_registered(tail)? {
return Err(Error::Undefined { key: tail });
}
if !self.is_registered(head)? {
return Err(Error::Undefined { key: head });
}
let (l, r) = A::SPACE;
self.with(l).set(tail.fuse(head), b"")?;
self.with(r).set(head.fuse(tail), b"")?;
OK
}
/// Delete an arrow from the data store.
pub fn remove_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<()>
where
A: Arrow,
{
self.with(A::SPACE.0).del(tail.fuse(head))?;
self.with(A::SPACE.1).del(head.fuse(tail))?;
OK
}
/// Check whether an arrow exists.
pub fn exists<A>(&self, (tail, head): (Key, Key)) -> Result<bool>
where
A: Arrow,
{
self.with(A::SPACE.0).has(tail.fuse(head))
}
/// Get all arrows of type `A` "pointing at" `key`.
pub fn list_incoming<A>(
&self,
key: impl Keylike,
) -> impl Iterator<Item = Result<(Key, A::Label)>> + '_
where
A: Arrow,
{
self.list_arrows_where::<A>(Direction::Incoming, key)
}
/// Get all arrows of type `A` "pointing away from" `key`.
pub fn list_outgoing<A>(
&self,
key: impl Keylike,
) -> impl Iterator<Item = Result<(Key, A::Label)>> + '_
where
A: Arrow,
{
self.list_arrows_where::<A>(Direction::Outgoing, key)
}
/// Get all arrows of type `A`.
pub fn list_arrows<A>(&self) -> impl Iterator<Item = Result<(Key, A::Label, Key)>> + '_
where
A: Arrow,
{
self.with(A::SPACE.0).list().bind_results(|(k, v)| {
let (tail, head) = Key::split(k.as_ref());
decode(v.as_ref()).map(|label| (tail, label, head))
})
}
/// Select arrows with the given direction relative to the given key.
fn list_arrows_where<A>(
&self,
direction: Direction,
key: impl Keylike,
) -> impl Iterator<Item = Result<(Key, A::Label)>> + '_
where
A: Arrow,
{
// Keys in space 0 are arranged with the tail at the start, and the ones in space 1
// are arranged with the head at the start. This allows us to efficiently prefix scan
// regardless of the direction, at the cost of increased space usage.
let space = match direction {
Direction::Outgoing => A::SPACE.0,
Direction::Incoming => A::SPACE.1,
};
let key = key.translate(&self).unwrap();
#[cfg(test)]
eprintln!("scanning {} using prefix {key}", space.0);
self.with(space).scan(key).bind_results(|(k, v)| {
// Because we're prefix scanning on the first half of the key, we only want to
// get the second here.
let (_this, other) = Key::split(k.as_ref());
#[cfg(test)]
eprintln!(" found {_this}:{other}");
decode(v.as_ref()).map(|label| (other, label))
})
}
pub(crate) fn quiver(&self, tag: Tag) -> Quiver<'_> {
Quiver { tag, tx: &self }
}
}
impl Transaction<'_> {
/// Use a keyspace.
fn with(&self, name: impl AsRef<str>) -> Keyspace<'_> {
Keyspace {
cf: self.spaces[name.as_ref()].clone(),
tx: &self,
}
}
}
/// Provides the basic API for a keyspace/column family.
struct Keyspace<'db> {
tx: &'db Transaction<'db>,
cf: Arc<BoundColumnFamily<'db>>,
}
impl<'db> Keyspace<'db> {
/// Retrieve a value from the database. Returns `Missing` if the key does not exist.
fn get(&self, key: impl AsRef<[u8]>) -> Result<impl AsRef<[u8]> + 'db> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.and_then(|opt| opt.ok_or(Error::Missing))
}
/// Set the value at `key` to `val`.
fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
self.tx
.inner
.put_cf(&self.cf, key, val)
.map_err(Error::Internal)
}
/// Delete the key-value pair identified by `key`.
fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.tx.inner.delete_cf(&self.cf, &key)?;
OK
}
/// Remove the key and associated value from the keyspace, and return its previous value.
fn pop(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
let old = self.tx.inner.get_for_update_cf(&self.cf, &key, true)?;
self.del(key)?;
Ok(old)
}
/// Check whether the key exists in the keyspace.
fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.map(|opt| opt.is_some())
}
/// Execute a prefix scan.
fn scan(
&self,
prefix: impl AsRef<[u8]> + 'db,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
let t = prefix.as_ref().to_vec();
self.tx
.inner
.prefix_iterator_cf(&self.cf, prefix.as_ref())
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.starts_with(&t),
_ => true,
})
.map_err(Error::Internal)
}
/// Show all items in the entire keyspace.
fn list(
&self,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
self.tx
.inner
.full_iterator_cf(&self.cf, IteratorMode::Start)
.map_err(Error::Internal)
}
}
/// The quiver allows one to manipulate all parallel edges tagged with a particular type.
pub struct Quiver<'db> {
tx: &'db Transaction<'db>,
tag: Tag,
}
impl<'db> Quiver<'db> {
pub fn insert(&self, origin: Key, target: Key, identity: Key) -> Result<()> {
let fused = origin.fuse(target);
self.tx.with("multi:id-map").set(identity, fused)?;
let mut triple = [0; 48];
triple[..32].copy_from_slice(&fused);
triple[32..].copy_from_slice(identity.as_ref());
self.tx.with("multi:index/l").set(triple, b"")?;
triple[..32].rotate_left(16);
self.tx.with("multi:index/r").set(triple, b"")?;
OK
}
pub fn list_incoming(&self, target: Key) -> impl Iterator<Item = Result<(Key, Key)>> + 'db {
self.tx
.with("multi:index/r")
.scan(target)
.map_ok(|(k, _)| Key::split(&k.as_ref()[16..]))
}
}
fn encode(data: impl Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}
fn decode<T>(data: &[u8]) -> Result<T>
where
T: Decode,
{
bincode::decode_from_slice(data, bincode::config::standard())
.map_err(Error::Decoding)
.map(|(v, _)| v)
}
#[cfg(test)]
mod tests;

View file

@ -1,256 +0,0 @@
use super::*;
use crate::Space;
#[derive(Encode, Decode)]
struct TestArrow;
impl Arrow for TestArrow {
const SPACE: (Space, Space) = (Space("test-arrow/l"), Space("test-arrow/r"));
}
const TEST_TAG: Tag = Tag(69);
macro_rules! keygen {
{ $($name:ident)* } => {
$(
let $name = Key::gen();
eprintln!(concat!(stringify!($name), "={}"), $name);
)*
}
}
fn with_test_arrow(f: impl Fn(Key, Key, &Transaction<'_>, usize) -> Result<()>) -> Result<()> {
Store::with_tmp(|db| {
// Run these tests 128 times because misuse of prefix iterator may cause weird,
// obscure bugs :3
//
// Also, because we don't wipe the store between test runs, we have more chances
// to discover weird bugs that we wouldn't catch if there was only a single run.
Ok(for n in 0..128 {
eprintln!("--- run {n} ---");
db.transaction(|tx| {
keygen!(target origin);
tx.create_vertex(target, TEST_TAG)?;
tx.create_vertex(origin, TEST_TAG)?;
tx.insert_arrow::<TestArrow>((origin, target))?;
let l: Vec<String> = tx
.with("test-arrow/l")
.list()
.map_ok(|(k, _)| Key::split(k.as_ref()))
.map_ok(|(a, b)| format!("({a}, {b})"))
.try_collect()?;
eprintln!("test-arrow/l = {l:#?}");
let r: Vec<String> = tx
.with("test-arrow/r")
.list()
.map_ok(|(k, _)| Key::split(k.as_ref()))
.map_ok(|(a, b)| format!("({a}, {b})"))
.try_collect()?;
eprintln!("test-arrow/r = {r:#?}");
f(origin, target, &tx, n)
})?;
eprintln!("--- end run {n} ---");
})
})
}
#[test]
fn target_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?;
eprintln!("target.incoming = {ti:#?}");
assert!(ti.contains(&origin), "origin ∈ target.incoming");
assert!(!ti.contains(&target), "target ∉ target.incoming");
OK
})
}
#[test]
fn target_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let to: Vec<_> = tx.list_outgoing::<TestArrow>(target).keys().try_collect()?;
eprintln!("target.outgoing = {to:#?}");
assert!(!to.contains(&target), "target ∉ target.outgoing");
assert!(!to.contains(&origin), "origin ∉ target.outgoing");
OK
})
}
#[test]
fn origin_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oi: Vec<_> = tx.list_incoming::<TestArrow>(origin).keys().try_collect()?;
eprintln!("origin.incoming = {oi:#?}");
assert!(!oi.contains(&origin), "origin ∉ origin.incoming");
assert!(!oi.contains(&target), "target ∉ origin.incoming");
OK
})
}
#[test]
fn origin_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?;
eprintln!("origin.outgoing = {oo:#?}");
assert!(oo.contains(&target), "target ∈ origin.outgoing");
assert!(!oo.contains(&origin), "origin ∉ origin.outgoing");
OK
})
}
#[test]
fn fanout() -> Result<()> {
let targets: [Key; 128] = std::array::from_fn(|_| Key::gen());
let origin = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.create_vertex(origin, TEST_TAG)?;
for t in targets {
tx.create_vertex(t, TEST_TAG)?;
tx.insert_arrow::<TestArrow>((origin, t))?;
}
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(origin).keys().try_collect()?;
for t in targets {
assert!(oo.contains(&t), "∀ t ∈ targets: t ∈ origin.outgoing");
let ti: Vec<_> = tx.list_incoming::<TestArrow>(t).keys().try_collect()?;
assert!(
ti == vec! {origin},
"∀ t ∈ targets: t.incoming = {{origin}}"
);
}
OK
})
})
}
#[test]
fn fanin() -> Result<()> {
let origins: [Key; 128] = std::array::from_fn(|_| Key::gen());
let target = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.create_vertex(target, TEST_TAG)?;
for o in origins {
tx.create_vertex(o, TEST_TAG)?;
tx.insert_arrow::<TestArrow>((o, target))?;
}
let ti: Vec<_> = tx.list_incoming::<TestArrow>(target).keys().try_collect()?;
for o in origins {
let oo: Vec<_> = tx.list_outgoing::<TestArrow>(o).keys().try_collect()?;
assert!(ti.contains(&o), "∀ o ∈ origins: o ∈ target.incoming");
assert!(
oo == vec! {target},
"∀ o ∈ origins: o.outgoing = {{target}}"
);
}
OK
})
})
}
#[test]
fn distinct_many_to_many() -> Result<()> {
let origins: [Key; 32] = std::array::from_fn(|_| Key::gen());
let targets: [Key; 32] = std::array::from_fn(|_| Key::gen());
Store::with_tmp(|db| {
db.transaction(|tx| {
for t in targets {
tx.create_vertex(t, TEST_TAG)?;
}
for o in origins {
tx.create_vertex(o, TEST_TAG)?;
for t in targets {
tx.insert_arrow::<TestArrow>((o, t))?;
}
}
let ti: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_incoming::<TestArrow>(t)
.keys()
.try_collect()
.map(|v: Vec<_>| (t, v))
})
.collect::<Result<_>>()?;
// For each origin point, there must be a target that has it as "incoming".
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().any(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∃ t ∈ targets: o ∈ t.incoming"
);
// Each target has each origin as incoming.
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().all(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∀ t ∈ targets: o ∈ t.incoming"
);
let to: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_outgoing::<TestArrow>(t)
.keys()
.try_collect()
.map(|v: Vec<_>| (t, v))
})
.collect::<Result<_>>()?;
// Our arrows point only from origins to targets, and there's a bug if there
// exists a target such that its outgoing set is non-empty.
assert!(
!targets.into_iter().any(|t| !to[&t].is_empty()),
"∄ t ∈ targets: t.outgoing ≠ ∅"
);
let oo: HashMap<Key, Vec<Key>> = origins
.into_iter()
.map(|o| {
tx.list_outgoing::<TestArrow>(o)
.keys()
.try_collect()
.map(|v: Vec<_>| (o, v))
})
.collect::<Result<_>>()?;
// Each origin has each target as outgoing.
assert!(
origins
.into_iter()
.all(|o| targets.into_iter().all(|t| oo[&o].contains(&t))),
"∀ o ∈ origins: ∀ t ∈ targets: t ∈ o.outgoing"
);
OK
})
})
}

171
lib/store/src/types.rs Normal file
View file

@ -0,0 +1,171 @@
//! Defining a [`Schema`].
//!
//! There is a lot of complicated machinery here to make it so that you have to write very little code to
//! define new types. Basically, if you want to define a thing to store, you need to implement the trait
//! for it (e.g. [`Arrow`]), and also implement [`Value`], where you create a specification describing which
//! namespaces store records of that type.
//!
//! Then, when you construct a new `Store`, you need to pass in a [`Schema`], or the database won't be able
//! to operate on the types.
//!
//! [`Arrow`]: super::Arrow
use std::collections::HashSet;
use derive_more::Display;
/// The namespace where all vertices must be registered.
pub(crate) const NODE_HEADERS: Namespace = Namespace("header:node");
/// The namespace where multiedge identities are mapped to endpoints.
pub(crate) const MULTIEDGE_HEADERS: Namespace = Namespace("header:multiedge");
/// A specification of all user-defined namespaces.
///
/// # Example
///
/// The below example correctly defines a [basic arrow] and demonstrates its use by inserting one and then
/// testing whether it exists.
///
/// ```rust
/// use store::{ arrow::Arrow, types::Schema, Store, Key, OK };
///
/// // Each kind of value has a derive macro.
/// #[derive(Arrow)]
/// struct MyArrow { origin: Key, target: Key }
///
/// fn main () -> store::Result<()> {
/// // Here, we make sure that the namespaces used for `MyArrow` are known.
/// let schema = Schema::new()
/// .has::<MyArrow>();
///
/// let result = Store::tmp(schema, |db| {
/// let origin = Key::gen();
/// let target = Key::gen();
///
/// let mut changes = db.batch();
/// changes.create(MyArrow { origin, target });
/// db.apply(changes)?;
///
/// db.exists::<MyArrow>()
/// })?;
///
/// assert!(result);
/// OK
/// }
/// ```
///
/// [basic arrow]: crate::arrow::Basic
pub struct Schema(pub(crate) HashSet<Namespace>);
impl Schema {
/// Construct a new empty schema.
pub fn new() -> Schema {
Schema(HashSet::from_iter([NODE_HEADERS, MULTIEDGE_HEADERS]))
}
/// Add the component to the schema.
pub fn has<C>(mut self) -> Schema
where
C: Value,
{
self.add(C::SPEC);
self
}
/// Add a spec to the schema by mutable reference.
pub fn add(&mut self, spec: impl TypeSpec) -> &mut Schema {
spec.register(&mut self.0);
self
}
}
/// The name of a keyspace.
///
/// Specifically, this is the name of a RocksDB column family.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
pub struct Namespace(pub &'static str);
impl AsRef<str> for Namespace {
fn as_ref(&self) -> &str {
self.0
}
}
/// A *thing* that is stored in the database, either an [arrow](ArrowSpec), an [alias](AliasSpec), or a
/// [mixin](MixinSpec).
///
/// All namespaces must be unique.
pub trait Value {
type Type: TypeSpec;
const SPEC: Self::Type;
}
/// The specification for an [`Arrow`](super::Arrow).
///
/// The listed namespaces must be unique among all other namespaces.
#[derive(Clone, Copy)]
pub struct ArrowSpec {
/// The keyspace where edge keys are ordered `(origin, target)`.
pub by_origin: Namespace,
/// The keyspace where edge keys are ordered `(target, origin)`.
pub by_target: Namespace,
}
#[derive(Clone, Copy)]
pub struct AliasSpec {
pub keyspace: Namespace,
pub reversed: Namespace,
}
#[derive(Clone, Copy)]
pub struct MixinSpec {
pub keyspace: Namespace,
}
/// Describes how to add a [`Value`] to a [`Schema`].
pub trait TypeSpec {
/// Register the namespaces.
fn register(&self, set: &mut HashSet<Namespace>);
}
impl TypeSpec for ArrowSpec {
fn register(&self, set: &mut HashSet<Namespace>) {
if !set.insert(self.by_origin) {
panic! {
"Duplicate found while inserting Arrow::BY_ORIGIN: {}",
self.by_origin
}
}
if !set.insert(self.by_target) {
panic! {
"Duplicate found while inserting Arrow::BY_TARGET: {}",
self.by_target
}
}
}
}
impl TypeSpec for AliasSpec {
fn register(&self, set: &mut HashSet<Namespace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Alias::KEYSPACE: {}",
self.keyspace
}
}
if !set.insert(self.reversed) {
panic! {
"Duplicate found while inserting Alias::REVERSED: {}",
self.reversed
}
}
}
}
impl TypeSpec for MixinSpec {
fn register(&self, set: &mut HashSet<Namespace>) {
if !set.insert(self.keyspace) {
panic! {
"Duplicate found while inserting Mixin::KEYSPACE: {}",
self.keyspace
}
}
}
}