Implement basic follow requests

- Also tests for the behavior of arrows in the database
- Also also, a fix for prefix scanning (i was doing it wrong
This commit is contained in:
Riley Apeldoorn 2024-04-20 23:32:01 +02:00
parent a8db282cf2
commit 21294f58ee
8 changed files with 876 additions and 60 deletions

72
Cargo.lock generated
View file

@ -26,6 +26,21 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "anstream" name = "anstream"
version = "0.6.13" version = "0.6.13"
@ -272,6 +287,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.5",
]
[[package]] [[package]]
name = "clang-sys" name = "clang-sys"
version = "1.7.0" version = "1.7.0"
@ -688,6 +717,29 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.5.0" version = "0.5.0"
@ -905,6 +957,15 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "num-traits"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.16.0" version = "1.16.0"
@ -1427,8 +1488,10 @@ name = "store"
version = "0.0.0" version = "0.0.0"
dependencies = [ dependencies = [
"bincode", "bincode",
"chrono",
"derive_more", "derive_more",
"rocksdb", "rocksdb",
"tempfile",
"ulid", "ulid",
] ]
@ -1856,6 +1919,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.5",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"

View file

@ -6,3 +6,4 @@ members = [
"bin/server", "bin/server",
"bin/pupctl", "bin/pupctl",
] ]
resolver = "2"

View file

@ -1,13 +1,74 @@
use puppy::{store::alias::Username, Store}; use puppy::{
store::{
self,
alias::Username,
arrow::{FollowRequested, Follows},
Error,
},
tl::Post,
Key, Store,
};
fn main() { fn main() -> store::Result<()> {
let db = Store::open(".state").unwrap(); // Store::nuke(".state")?;
// let riley = puppy::create_author(&db, "riley").unwrap(); let db = Store::open(".state")?;
let riley = db println!("creating actors");
.transaction(|tx| tx.lookup_alias(Username("riley".to_string()))) let riley = get_or_create_actor(&db, "riley")?;
.unwrap(); let linen = get_or_create_actor(&db, "linen")?;
puppy::create_post(&db, riley, "hello!").unwrap(); if false {
for (key, post) in puppy::list_posts_by_author(&db, riley).unwrap() { println!("creating posts");
println!("post {key}: {:?} by user {riley}", post.content) puppy::create_post(&db, riley, "@linen <3")?;
puppy::create_post(&db, linen, "@riley <3")?;
}
if true {
println!("making riley follow linen");
if !db.contains::<Follows>((riley, linen))? {
println!("follow relation does not exist yet");
if !db.contains::<FollowRequested>((riley, linen))? {
println!("no pending follow request; creating");
puppy::fr::create(&db, riley, linen)?;
} else {
println!("accepting the pending follow request");
puppy::fr::accept(&db, riley, linen)?;
}
} else {
println!("riley already follows linen");
}
}
println!("Posts on the instance:");
for (key, Post { content, author }) in puppy::tl::fetch_all(&db)? {
let handle = db.reverse::<Username>(author)?;
let content = content.content.unwrap();
println!("- {key} by @{handle} ({author}):\n{content}",)
}
println!("Linen's followers:");
for id in puppy::fr::followers_of(&db, linen)? {
let handle = db.reverse::<Username>(id)?;
println!("- @{handle} ({id})");
}
println!("Riley's following:");
for id in puppy::fr::following_of(&db, riley)? {
let handle = db.reverse::<Username>(id)?;
println!("- @{handle} ({id})");
}
store::OK
}
fn get_or_create_actor(db: &Store, username: &str) -> Result<Key, Error> {
let user = db.resolve::<Username>(username);
match user {
Ok(key) => {
println!("found '{username}' ({key})");
Ok(key)
}
Err(Error::Missing) => {
println!("'{username}' doesn't exist yet, creating");
let r = puppy::create_author(&db, username);
if let Ok(ref key) = r {
println!("created '{username}' with key {key}");
}
r
}
Err(e) => Err(e),
} }
} }

View file

@ -1,14 +1,21 @@
pub use store::{self, Key, Store}; pub use store::{self, Key, Store};
use store::{ use store::{
alias::Username, alias::Username,
arrow::{AuthorOf, Direction}, arrow::AuthorOf,
value::{Content, Profile}, mixin::{Content, Profile},
Keylike, Keylike,
}; };
mod tags {
use store::Tag;
pub const AUTHOR: Tag = Tag(0);
pub const POST: Tag = Tag(1);
}
pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Result<Key> { pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Result<Key> {
let key = Key::gen(); let key = Key::gen();
db.transaction(|tx| { db.transaction(|tx| {
tx.define(key, tags::POST)?;
tx.update::<Profile>(author, |_, mut profile| { tx.update::<Profile>(author, |_, mut profile| {
profile.post_count += 1; profile.post_count += 1;
Ok(profile) Ok(profile)
@ -25,6 +32,7 @@ pub fn create_post(db: &Store, author: Key, content: impl ToString) -> store::Re
pub fn create_author(db: &Store, username: impl ToString) -> store::Result<Key> { pub fn create_author(db: &Store, username: impl ToString) -> store::Result<Key> {
let key = Key::gen(); let key = Key::gen();
db.transaction(|tx| { db.transaction(|tx| {
tx.define(key, tags::AUTHOR)?;
tx.insert_alias(key, Username(username.to_string()))?; tx.insert_alias(key, Username(username.to_string()))?;
tx.insert(key, Profile { tx.insert(key, Profile {
post_count: 0, post_count: 0,
@ -42,7 +50,7 @@ pub fn list_posts_by_author(
author: impl Keylike, author: impl Keylike,
) -> store::Result<Vec<(Key, Content)>> { ) -> store::Result<Vec<(Key, Content)>> {
db.transaction(|tx| { db.transaction(|tx| {
tx.list_arrows_with::<AuthorOf>(Direction::Outgoing, author) tx.list_outgoing::<AuthorOf>(author)
.map(|r| { .map(|r| {
let (post_key, _) = r?; let (post_key, _) = r?;
tx.lookup::<Content>(post_key) tx.lookup::<Content>(post_key)
@ -50,3 +58,164 @@ pub fn list_posts_by_author(
.collect() .collect()
}) })
} }
pub mod tl {
//! Timelines
use store::{arrow::AuthorOf, mixin::Content, Key, Store};
pub struct Post {
pub author: Key,
pub content: Content,
}
pub fn fetch_all(db: &Store) -> store::Result<Vec<(Key, Post)>> {
db.transaction(|tx| {
let iter = tx.list::<Content>();
iter.map(|r| {
let (post_id, content) = r?;
let author = tx
.list_incoming::<AuthorOf>(post_id)
.map(|r| r.map(|(k, _)| k))
.next()
.unwrap()?;
Ok((post_id, Post { author, content }))
})
.collect()
})
}
}
pub mod fr {
//! Follow requests
use store::{
arrow::{FollowRequested, Follows},
Key, Store, OK,
};
pub fn create(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.insert_arrow((requester, target), FollowRequested)?;
OK
})
}
pub fn accept(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?;
tx.insert_arrow((requester, target), Follows)?;
OK
})
}
pub fn reject(db: &Store, requester: Key, target: Key) -> store::Result<()> {
db.transaction(|tx| {
tx.remove_arrow::<FollowRequested>((requester, target))?;
OK
})
}
pub fn list_pending(db: &Store, target: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| {
tx.list_incoming::<FollowRequested>(target)
.map(|r| r.map(|(k, _)| k))
.collect()
})
}
pub fn following_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| {
tx.list_outgoing::<Follows>(actor)
.map(|r| r.map(|(k, _)| k))
.collect()
})
}
pub fn followers_of(db: &Store, actor: Key) -> store::Result<Vec<Key>> {
db.transaction(|tx| {
tx.list_incoming::<Follows>(actor)
.map(|r| r.map(|(k, _)| k))
.collect()
})
}
#[cfg(test)]
mod tests {
use store::{
arrow::{FollowRequested, Follows},
Key, Store, OK,
};
use crate::create_author;
fn make_test_actors(db: &Store) -> store::Result<(Key, Key)> {
let alice = create_author(&db, "alice")?;
let bob = create_author(&db, "bob")?;
eprintln!("alice={alice}, bob={bob}");
Ok((alice, bob))
}
#[test]
fn create_fr() -> store::Result<()> {
Store::with_tmp(|db| {
let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?;
assert!(
db.contains::<FollowRequested>((alice, bob))?,
"(alice -> bob) ∈ follow-requested"
);
assert!(
!db.contains::<Follows>((alice, bob))?,
"(alice -> bob) ∉ follows"
);
let pending_for_bob = super::list_pending(&db, bob)?;
assert_eq!(pending_for_bob, vec![alice], "bob.pending = {{alice}}");
OK
})
}
#[test]
fn accept_fr() -> store::Result<()> {
Store::with_tmp(|db| {
let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?;
super::accept(&db, alice, bob)?;
assert!(
db.contains::<Follows>((alice, bob))?,
"(alice -> bob) ∈ follows"
);
assert!(
!db.contains::<Follows>((bob, alice))?,
"(bob -> alice) ∉ follows"
);
let pending_for_bob = super::list_pending(&db, bob)?;
assert!(pending_for_bob.is_empty(), "bob.pending = ∅");
let followers_of_bob = super::followers_of(&db, bob)?;
assert_eq!(followers_of_bob, vec![alice], "bob.followers = {{alice}}");
OK
})
}
#[test]
fn listing_follow_relations() -> store::Result<()> {
Store::with_tmp(|db| {
let (alice, bob) = make_test_actors(&db)?;
super::create(&db, alice, bob)?;
super::accept(&db, alice, bob)?;
let followers_of_bob = super::followers_of(&db, bob)?;
assert_eq!(followers_of_bob, vec![alice], "bob.followers = {{alice}}");
let following_of_alice = super::following_of(&db, alice)?;
assert_eq!(following_of_alice, vec![bob], "alice.following = {{bob}}");
OK
})
}
}
}

View file

@ -10,3 +10,5 @@ ulid = "*"
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb.git" } rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb.git" }
derive_more = "*" derive_more = "*"
bincode = "2.0.0-rc.3" bincode = "2.0.0-rc.3"
chrono = "*"
tempfile = "*"

View file

@ -1,14 +1,20 @@
use std::fmt::Display; use std::fmt::{Debug, Display};
use crate::{Alias, Result, Transaction}; use crate::{Alias, Error, Result, Transaction};
/// A unique identifier for vertices in the database. /// A unique identifier for vertices in the database.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Key([u8; 16]); pub struct Key([u8; 16]);
impl Display for Key { impl Display for Key {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
ulid::Ulid::from_bytes(self.0).fmt(f) Display::fmt(&ulid::Ulid::from_bytes(self.0), f)
}
}
impl Debug for Key {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Key({})", ulid::Ulid::from_bytes(self.0))
} }
} }
@ -46,9 +52,23 @@ impl AsRef<[u8]> for Key {
/// ///
/// In general, using a key directly is going to be more efficient than using /// In general, using a key directly is going to be more efficient than using
/// an alias, because it incurs less lookups. /// an alias, because it incurs less lookups.
pub trait Keylike { pub trait Keylike: Sized {
/// Translate the thing to a [`Key`]. /// Translate the thing to a [`Key`].
///
/// This function should return [`Error::Missing`] if the key cannot be located.
fn translate(self, tx: &Transaction<'_>) -> Result<Key>; fn translate(self, tx: &Transaction<'_>) -> Result<Key>;
/// Translate, and check whether the key is actually registered.
///
/// This function should return [`Error::Undefined`] if the key does not *really*
/// exist. It should return [`Error::Missing`] if the key can't be found.
fn checked_translate(self, tx: &Transaction<'_>) -> Result<Key> {
let key = self.translate(tx)?;
if !tx.exists(key)? {
Err(Error::Undefined { key })
} else {
Ok(key)
}
}
} }
impl Keylike for Key { impl Keylike for Key {
@ -65,3 +85,7 @@ where
tx.lookup_alias(self) tx.lookup_alias(self)
} }
} }
/// A type tag identifying a vertex.
#[derive(PartialEq, Eq, Clone, Copy, Debug, Hash)]
pub struct Tag(pub u8);

View file

@ -9,7 +9,7 @@
//! useful CRUD methods. Returning an `Ok` commits the transaction and returning `Err` rolls it //! useful CRUD methods. Returning an `Ok` commits the transaction and returning `Err` rolls it
//! back. //! back.
use std::sync::Arc; use std::{path::Path, sync::Arc};
use derive_more::From; use derive_more::From;
use rocksdb::{MultiThreaded, Options, TransactionDBOptions}; use rocksdb::{MultiThreaded, Options, TransactionDBOptions};
@ -19,9 +19,9 @@ type Backend = rocksdb::TransactionDB<MultiThreaded>;
mod key; mod key;
mod transaction; mod transaction;
pub use key::{Key, Keylike}; pub use key::{Key, Keylike, Tag};
pub use transaction::Transaction; pub use transaction::Transaction;
pub use {alias::Alias, arrow::Arrow, value::Value}; pub use {alias::Alias, arrow::Arrow, mixin::Mixin};
pub const OK: Result<()> = Ok(()); pub const OK: Result<()> = Ok(());
@ -36,6 +36,12 @@ const SPACES: &[&'static str] = &[
"content", "content",
"created-by/l", "created-by/l",
"created-by/r", "created-by/r",
"pending-fr/l",
"pending-fr/r",
#[cfg(test)]
"test-arrow/l",
#[cfg(test)]
"test-arrow/r",
]; ];
/// The handle to the data store. /// The handle to the data store.
@ -47,20 +53,57 @@ pub struct Store {
} }
impl Store { impl Store {
pub fn open(state_dir: &str) -> Result<Store> { pub fn open(state_dir: impl AsRef<Path>) -> Result<Store> {
let mut db_opts = Options::default(); let mut db_opts = Options::default();
db_opts.create_if_missing(true); db_opts.create_if_missing(true);
db_opts.create_missing_column_families(true); db_opts.create_missing_column_families(true);
let tx_opts = TransactionDBOptions::default(); let tx_opts = TransactionDBOptions::default();
// NOTE: it crashes here because there hasn't been a release yet that includes https://github.com/rust-rocksdb/rust-rocksdb/pull/868
let inner = Arc::new(Backend::open_cf( let inner = Arc::new(Backend::open_cf(
&db_opts, &db_opts,
&tx_opts, &tx_opts,
format!("{state_dir}/main-store"), state_dir.as_ref().join("main-store"),
SPACES, SPACES,
)?); )?);
Ok(Store { inner }) Ok(Store { inner })
} }
/// Construct a temporary store, for testing. This store gets erased after `f` is done.
pub fn with_tmp<T, E>(f: impl FnOnce(Store) -> Result<T, E>) -> Result<T, E>
where
E: From<Error>,
{
let tmp_dir = tempfile::tempdir().expect("couldn't create tempdir");
f(Store::open(tmp_dir)?)
}
/// Delete the whole store.
///
/// **This deletes all data in the store**. Do not run this unless you want to delete all the state of the instance.
pub fn nuke(state_dir: impl AsRef<Path>) -> Result<()> {
Backend::destroy(&Options::default(), state_dir.as_ref().join("main-store"))
.map_err(Error::from)
}
/// Get the key associated with a given [alias][Alias].
pub fn resolve<A>(&self, s: impl ToString) -> Result<Key>
where
A: Alias,
{
self.transaction(|tx| tx.lookup_alias(A::from(s.to_string())))
}
/// Reverse lookup an alias.
pub fn reverse<A>(&self, key: Key) -> Result<A>
where
A: Alias,
{
self.transaction(|tx| tx.lookup_alias_rev(key))
}
/// Quickly test whether a particular [arrow][Arrow] exists.
pub fn contains<A>(&self, arrow: (Key, Key)) -> Result<bool>
where
A: Arrow,
{
self.transaction(|tx| tx.arrow_exists::<A>(arrow))
}
} }
/// An isolated keyspace. /// An isolated keyspace.
@ -73,7 +116,7 @@ impl AsRef<str> for Space {
} }
} }
pub mod value { pub mod mixin {
//! Modules of information. //! Modules of information.
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
@ -81,7 +124,7 @@ pub mod value {
use crate::Space; use crate::Space;
/// A simple piece of data associated with a vertex. /// A simple piece of data associated with a vertex.
pub trait Value: Encode + Decode { pub trait Mixin: Encode + Decode {
const SPACE: Space; const SPACE: Space;
} }
@ -94,7 +137,7 @@ pub mod value {
pub about_fields: Vec<(String, String)>, pub about_fields: Vec<(String, String)>,
} }
impl Value for Profile { impl Mixin for Profile {
const SPACE: Space = Space("profile"); const SPACE: Space = Space("profile");
} }
@ -104,7 +147,7 @@ pub mod value {
pub summary: Option<String>, pub summary: Option<String>,
} }
impl Value for Content { impl Mixin for Content {
const SPACE: Space = Space("content"); const SPACE: Space = Space("content");
} }
} }
@ -133,6 +176,20 @@ pub mod arrow {
impl Arrow for AuthorOf { impl Arrow for AuthorOf {
const SPACE: (Space, Space) = (Space("created-by/l"), Space("created-by/r")); const SPACE: (Space, Space) = (Space("created-by/l"), Space("created-by/r"));
} }
#[derive(Encode, Decode)]
pub struct FollowRequested;
impl Arrow for FollowRequested {
const SPACE: (Space, Space) = (Space("pending-fr/l"), Space("pending-fr/r"));
}
#[derive(Encode, Decode)]
pub struct Follows;
impl Arrow for Follows {
const SPACE: (Space, Space) = (Space("follows/l"), Space("follows/r"));
}
} }
pub mod alias { pub mod alias {
@ -161,7 +218,15 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors from the data store. /// Errors from the data store.
#[derive(From, Debug)] #[derive(From, Debug)]
pub enum Error { pub enum Error {
/// The requested value was expected to exist in a particular keyspace, but does not actually
/// exist there. This can occur on updates for example.
Missing, Missing,
/// One or more operand keys are not registered in the database.
Undefined {
key: Key,
},
/// Returned if there is a conflict; for example, if the uniqueness property of an alias would
/// be violated by inserting one.
Conflict, Conflict,
Internal(rocksdb::Error), Internal(rocksdb::Error),
Encoding(bincode::error::EncodeError), Encoding(bincode::error::EncodeError),

View file

@ -1,10 +1,11 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use rocksdb::BoundColumnFamily; use rocksdb::{BoundColumnFamily, IteratorMode};
use crate::{ use crate::{
arrow::Direction, Alias, Arrow, Backend, Error, Key, Keylike, Result, Store, Value, OK, SPACES, arrow::Direction, key::Tag, Alias, Arrow, Backend, Error, Key, Keylike, Mixin, Result, Store,
OK, SPACES,
}; };
impl Store { impl Store {
@ -31,6 +32,17 @@ impl Store {
} }
result result
} }
/// Check whether a key exists in the registry,
pub fn exists(&self, key: Key) -> Result<bool> {
let cf = self
.inner
.cf_handle("registry")
.expect("failed to open registry");
self.inner
.get_pinned_cf(&cf, key)
.map(|opt| opt.is_some())
.map_err(Error::from)
}
} }
/// A database transaction, in which either each action succeeds, or everything fails /// A database transaction, in which either each action succeeds, or everything fails
@ -42,35 +54,56 @@ pub struct Transaction<'db> {
spaces: HashMap<&'static str, Arc<BoundColumnFamily<'db>>>, spaces: HashMap<&'static str, Arc<BoundColumnFamily<'db>>>,
} }
/// Methods for manipulating the registry.
///
/// Before you can manipulate a vertex, its needs to be registered.
impl Transaction<'_> { impl Transaction<'_> {
/// Register a new key. /// Register a new vertex.
pub fn define(&self, key: Key) -> Result<()> { pub fn define(&self, key: Key, tag: Tag) -> Result<()> {
self.with("registry").set(key, b"")?; self.with("registry").set(key, [tag.0])
OK
} }
/// Delete a vertex from the registry.
pub fn delete(&self, key: Key) -> Result<()> {
// TODO: also make this delete all related data?
self.with("registry").del(key)
}
/// Check whether a vertex is registered in the database.
pub fn exists(&self, key: Key) -> Result<bool> {
self.with("registry").has(key)
}
}
/// Methods for manipulating mixins.
///
/// For each implementor of [`Mixin`], a vertex can have at most one record of that type
/// associated with it.
impl Transaction<'_> {
/// Query the store for a value associated with the vertex `key` identifies. /// Query the store for a value associated with the vertex `key` identifies.
/// ///
/// Using a [`Key`] is more efficient than using an alias. /// Using a [`Key`] is more efficient than using an alias.
pub fn lookup<V>(&self, key: impl Keylike) -> Result<(Key, V)> pub fn lookup<M>(&self, key: impl Keylike) -> Result<(Key, M)>
where where
V: Value, M: Mixin,
{ {
// Checked translate isn't needed, we'll complain if we can't find the data.
let canonicalized_key = key.translate(&self)?; let canonicalized_key = key.translate(&self)?;
let raw = self.with(V::SPACE).get(canonicalized_key)?; let raw = self.with(M::SPACE).get(canonicalized_key)?;
let value = decode(raw.as_ref())?; let value = decode(raw.as_ref())?;
Ok((canonicalized_key, value)) Ok((canonicalized_key, value))
} }
/// Associate a new value with the key. /// Associate a new mixin value with the key.
/// ///
/// Returns `Error::Conflict` if a value of this type is already associated with the /// **Errors**
/// (canonicalized) key. ///
pub fn insert<V>(&self, key: impl Keylike, data: V) -> Result<()> /// - `Error::Conflict` if a mixin of this type is already associated with the vertex
/// - `Error::Undefined` if `key` is not in the registry.
pub fn insert<M>(&self, key: impl Keylike, data: M) -> Result<()>
where where
V: Value, M: Mixin,
{ {
let key = key.translate(&self)?; let key = key.checked_translate(&self)?;
let data = encode(data)?; let data = encode(data)?;
let ns = self.with(V::SPACE); let ns = self.with(M::SPACE);
// Check for conflicts. Fail if the key already exists, otherwise set the key // Check for conflicts. Fail if the key already exists, otherwise set the key
// to the given value. // to the given value.
if ns.has(key)? { if ns.has(key)? {
@ -80,28 +113,55 @@ impl Transaction<'_> {
} }
} }
/// Apply an update function to the value identified by the key. /// Apply an update function to the value identified by the key.
pub fn update<V>(&self, key: impl Keylike, f: impl FnOnce(Key, V) -> Result<V>) -> Result<()> ///
/// **Errors**
///
/// - `Error::Undefined` if the `key` is not registered
/// - `Error::Missing` if `key` does not exist in the keyspace associated with `M`
pub fn update<M>(&self, key: impl Keylike, f: impl FnOnce(Key, M) -> Result<M>) -> Result<()>
where where
V: Value, M: Mixin,
{ {
let (key, old) = self.lookup::<V>(key)?; let key = key.checked_translate(self)?;
let (key, old) = self.lookup::<M>(key)?;
let new = f(key, old).and_then(encode)?; let new = f(key, old).and_then(encode)?;
self.with(V::SPACE).set(key, new) self.with(M::SPACE).set(key, new)
} }
/// Remove a value from the database. Doesn't complain if the value does not exist. /// Remove the mixin from the vertex `key` refers to.
pub fn remove<V>(&self, key: impl Keylike) -> Result<Option<V>> ///
/// Doesn't complain if the value does not exist in the expected keyspace.
pub fn remove<M>(&self, key: impl Keylike) -> Result<Option<M>>
where where
V: Value, M: Mixin,
{ {
let canonical_key = key.translate(&self)?; // Checked translate isn't needed because we don't care if the key is bogus.
let ns = self.with(V::SPACE); let canonical_key = key.translate(self)?;
let ns = self.with(M::SPACE);
match ns.pop(canonical_key) { match ns.pop(canonical_key) {
Ok(Some(val)) => decode(&val).map(Some), Ok(Some(val)) => decode(&val).map(Some),
Ok(None) => Ok(None), Ok(None) => Ok(None),
Err(err) => Err(err), Err(err) => Err(err),
} }
} }
/// List all key-value pairs for mixins of type `M`.
pub fn list<M>(&self) -> impl Iterator<Item = Result<(Key, M)>> + '_
where
M: Mixin,
{
self.with(M::SPACE).list().map(|r| {
let (k, v) = r?;
let v = decode(v.as_ref())?;
let k = Key::from_slice(k.as_ref());
Ok((k, v))
})
}
}
/// Methods for interacting with [aliases][Alias], which are unique alternate keys.
impl Transaction<'_> {
/// Look up the key that the given alias maps to. /// Look up the key that the given alias maps to.
///
/// If the key was deleted, but the alias wasn't properly cleaned up,
pub fn lookup_alias<A>(&self, alias: A) -> Result<Key> pub fn lookup_alias<A>(&self, alias: A) -> Result<Key>
where where
A: Alias, A: Alias,
@ -110,6 +170,14 @@ impl Transaction<'_> {
let raw = self.with(l).get(alias.to_string())?; let raw = self.with(l).get(alias.to_string())?;
Ok(Key::from_slice(raw.as_ref())) Ok(Key::from_slice(raw.as_ref()))
} }
/// Given a key, figure out what the value of the alias is.
pub fn lookup_alias_rev<A>(&self, key: Key) -> Result<A>
where
A: Alias,
{
let raw = self.with(A::SPACE.1).get(key)?;
Ok(A::from(String::from_utf8_lossy(raw.as_ref()).into_owned()))
}
/// Create a new alias of type `A` for the given [`Key`]. /// Create a new alias of type `A` for the given [`Key`].
/// ///
/// If the alias already exists, this function returns `Conflict`. /// If the alias already exists, this function returns `Conflict`.
@ -140,6 +208,9 @@ impl Transaction<'_> {
} }
OK OK
} }
}
impl Transaction<'_> {
/// Find an arrow of type `A` with the given `tail` and `head`. /// Find an arrow of type `A` with the given `tail` and `head`.
pub fn lookup_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<Option<A>> pub fn lookup_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<Option<A>>
where where
@ -147,16 +218,26 @@ impl Transaction<'_> {
{ {
let (l, _) = A::SPACE; let (l, _) = A::SPACE;
match self.with(l).get(tail.fuse(head)) { match self.with(l).get(tail.fuse(head)) {
Ok(raw) => decode(raw.as_ref()), Ok(raw) => decode(raw.as_ref()).map(Some), // BUG: This is broken for unit structs apparently.
Err(Error::Missing) => Ok(None), Err(Error::Missing) => Ok(None),
Err(err) => Err(err), Err(err) => Err(err),
} }
} }
/// Create a new arrow of type `A` and associate the label with it. /// Create a new arrow of type `A` and associate the label with it.
///
/// **Errors**
///
/// - `Error::Undefined` if either key is not registered
pub fn insert_arrow<A>(&self, (tail, head): (Key, Key), label: A) -> Result<()> pub fn insert_arrow<A>(&self, (tail, head): (Key, Key), label: A) -> Result<()>
where where
A: Arrow, A: Arrow,
{ {
if !self.exists(tail)? {
return Err(Error::Undefined { key: tail });
}
if !self.exists(head)? {
return Err(Error::Undefined { key: head });
}
let (l, r) = A::SPACE; let (l, r) = A::SPACE;
let label = encode(label)?; let label = encode(label)?;
self.with(l).set(tail.fuse(head), &label)?; self.with(l).set(tail.fuse(head), &label)?;
@ -172,8 +253,40 @@ impl Transaction<'_> {
self.with(A::SPACE.1).del(head.fuse(tail))?; self.with(A::SPACE.1).del(head.fuse(tail))?;
OK OK
} }
/// Check whether an arrow exists.
pub fn arrow_exists<A>(&self, (tail, head): (Key, Key)) -> Result<bool>
where
A: Arrow,
{
self.with(A::SPACE.0).has(tail.fuse(head))
}
/// Get all arrows of type `A` "pointing at" `key`.
pub fn list_incoming<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
self.list_arrows_where(Direction::Incoming, key)
}
/// Get all arrows of type `A` "pointing away from" `key`.
pub fn list_outgoing<A>(&self, key: impl Keylike) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
self.list_arrows_where(Direction::Outgoing, key)
}
/// Get all arrows of type `A`.
pub fn list_arrows<A>(&self) -> impl Iterator<Item = Result<(Key, A, Key)>> + '_
where
A: Arrow,
{
self.with(A::SPACE.0).list().map(|r| {
let (k, v) = r?;
let (tail, head) = Key::split(k.as_ref());
decode(v.as_ref()).map(|label| (tail, label, head))
})
}
/// Select arrows with the given direction relative to the given key. /// Select arrows with the given direction relative to the given key.
pub fn list_arrows_with<A>( fn list_arrows_where<A>(
&self, &self,
direction: Direction, direction: Direction,
key: impl Keylike, key: impl Keylike,
@ -189,16 +302,23 @@ impl Transaction<'_> {
Direction::Incoming => A::SPACE.1, Direction::Incoming => A::SPACE.1,
}; };
let key = key.translate(&self).unwrap(); let key = key.translate(&self).unwrap();
#[cfg(test)]
eprintln!("scanning {} using prefix {key}", space.0);
self.with(space).scan(key).map(|r| { self.with(space).scan(key).map(|r| {
let (k, v) = r?; let (k, v) = r?;
// Because we're prefix scanning on the first half of the key, we only want to // Because we're prefix scanning on the first half of the key, we only want to
// get the second here. // get the second here.
let (_, other) = Key::split(k.as_ref()); let (this, other) = Key::split(k.as_ref());
#[cfg(test)]
eprintln!(" found {this}:{other}");
decode(v.as_ref()).map(|label| (other, label)) decode(v.as_ref()).map(|label| (other, label))
}) })
} }
}
impl Transaction<'_> {
/// Use a keyspace. /// Use a keyspace.
pub(crate) fn with(&self, name: impl AsRef<str>) -> Keyspace<'_> { fn with(&self, name: impl AsRef<str>) -> Keyspace<'_> {
Keyspace { Keyspace {
cf: self.spaces[name.as_ref()].clone(), cf: self.spaces[name.as_ref()].clone(),
tx: &self, tx: &self,
@ -207,7 +327,7 @@ impl Transaction<'_> {
} }
/// Provides the basic API for a keyspace/column family. /// Provides the basic API for a keyspace/column family.
pub(crate) struct Keyspace<'db> { struct Keyspace<'db> {
tx: &'db Transaction<'db>, tx: &'db Transaction<'db>,
cf: Arc<BoundColumnFamily<'db>>, cf: Arc<BoundColumnFamily<'db>>,
} }
@ -250,12 +370,29 @@ impl<'db> Keyspace<'db> {
/// Execute a prefix scan. /// Execute a prefix scan.
pub fn scan( pub fn scan(
&self, &self,
prefix: impl AsRef<[u8]>, prefix: impl AsRef<[u8]> + 'db,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
let t = prefix.as_ref().to_vec();
self.tx
.inner
.prefix_iterator_cf(&self.cf, prefix.as_ref())
// The prefix iterator may "overshoot". This makes it stop when it reaches
// the end of the range that has the prefix.
.take_while(move |r| match r {
Ok((ref k, _)) => k.starts_with(&t),
_ => true,
})
.map(|r| r.map_err(Error::Internal))
}
/// Show all items in the entire keyspace.
pub fn list(
&self,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db ) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{ {
self.tx self.tx
.inner .inner
.prefix_iterator_cf(&self.cf, prefix) .full_iterator_cf(&self.cf, IteratorMode::Start)
.map(|r| r.map_err(Error::Internal)) .map(|r| r.map_err(Error::Internal))
} }
} }
@ -272,3 +409,288 @@ where
.map_err(Error::Decoding) .map_err(Error::Decoding)
.map(|(v, _)| v) .map(|(v, _)| v)
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::Space;
#[derive(Encode, Decode)]
struct TestArrow;
impl Arrow for TestArrow {
const SPACE: (Space, Space) = (Space("test-arrow/l"), Space("test-arrow/r"));
}
const TEST_TAG: Tag = Tag(69);
macro_rules! keygen {
{ $($name:ident)* } => {
$(
let $name = Key::gen();
eprintln!(concat!(stringify!($name), "={}"), $name);
)*
}
}
fn with_test_arrow(f: impl Fn(Key, Key, &Transaction<'_>, usize) -> Result<()>) -> Result<()> {
Store::with_tmp(|db| {
// Run these tests 128 times because misuse of prefix iterator may cause weird,
// obscure bugs :3
//
// Also, because we don't wipe the store between test runs, we have more chances
// to discover weird bugs that we wouldn't catch if there was only a single run.
Ok(for n in 0..128 {
eprintln!("--- run {n} ---");
db.transaction(|tx| {
keygen!(target origin);
tx.define(target, TEST_TAG)?;
tx.define(origin, TEST_TAG)?;
tx.insert_arrow((origin, target), TestArrow)?;
let l: Vec<String> = tx
.with("test-arrow/l")
.list()
.map(|r| r.map(|(k, _)| Key::split(k.as_ref())))
.map(|r| r.map(|(a, b)| format!("({a}, {b})")))
.collect::<Result<_>>()?;
eprintln!("test-arrow/l = {l:#?}");
let r: Vec<String> = tx
.with("test-arrow/r")
.list()
.map(|r| r.map(|(k, _)| Key::split(k.as_ref())))
.map(|r| r.map(|(a, b)| format!("({a}, {b})")))
.collect::<Result<_>>()?;
eprintln!("test-arrow/r = {r:#?}");
f(origin, target, &tx, n)
})?;
eprintln!("--- end run {n} ---");
})
})
}
#[test]
fn target_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let ti = tx
.list_incoming::<TestArrow>(target)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
eprintln!("target.incoming = {ti:#?}");
assert!(ti.contains(&origin), "origin ∈ target.incoming");
assert!(!ti.contains(&target), "target ∉ target.incoming");
OK
})
}
#[test]
fn target_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let to = tx
.list_outgoing::<TestArrow>(target)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
eprintln!("target.outgoing = {to:#?}");
assert!(!to.contains(&target), "target ∉ target.outgoing");
assert!(!to.contains(&origin), "origin ∉ target.outgoing");
OK
})
}
#[test]
fn origin_incoming() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oi = tx
.list_incoming::<TestArrow>(origin)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
eprintln!("origin.incoming = {oi:#?}");
assert!(!oi.contains(&origin), "origin ∉ origin.incoming");
assert!(!oi.contains(&target), "target ∉ origin.incoming");
OK
})
}
#[test]
fn origin_outgoing() -> Result<()> {
with_test_arrow(|origin, target, tx, _| {
let oo = tx
.list_outgoing::<TestArrow>(origin)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
eprintln!("origin.outgoing = {oo:#?}");
assert!(oo.contains(&target), "target ∈ origin.outgoing");
assert!(!oo.contains(&origin), "origin ∉ origin.outgoing");
OK
})
}
#[test]
fn fanout() -> Result<()> {
let targets: [Key; 128] = std::array::from_fn(|_| Key::gen());
let origin = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.define(origin, TEST_TAG)?;
for t in targets {
tx.define(t, TEST_TAG)?;
tx.insert_arrow((origin, t), TestArrow)?;
}
// TODO: we keep doing this, we should make a function or something for it
let oo = tx
.list_outgoing::<TestArrow>(origin)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
for t in targets {
assert!(oo.contains(&t), "∀ t ∈ targets: t ∈ origin.outgoing");
let ti = tx
.list_incoming::<TestArrow>(t)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
assert!(
ti == vec! {origin},
"∀ t ∈ targets: t.incoming = {{origin}}"
);
}
OK
})
})
}
#[test]
fn fanin() -> Result<()> {
let origins: [Key; 128] = std::array::from_fn(|_| Key::gen());
let target = Key::gen();
Store::with_tmp(|db| {
db.transaction(|tx| {
tx.define(target, TEST_TAG)?;
for o in origins {
tx.define(o, TEST_TAG)?;
tx.insert_arrow((o, target), TestArrow)?;
}
let ti = tx
.list_incoming::<TestArrow>(target)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
for o in origins {
let oo = tx
.list_outgoing::<TestArrow>(o)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()?;
assert!(ti.contains(&o), "∀ o ∈ origins: o ∈ target.incoming");
assert!(
oo == vec! {target},
"∀ o ∈ origins: o.outgoing = {{target}}"
);
}
OK
})
})
}
#[test]
fn distinct_many_to_many() -> Result<()> {
let origins: [Key; 32] = std::array::from_fn(|_| Key::gen());
let targets: [Key; 32] = std::array::from_fn(|_| Key::gen());
Store::with_tmp(|db| {
db.transaction(|tx| {
for t in targets {
tx.define(t, TEST_TAG)?;
}
for o in origins {
tx.define(o, TEST_TAG)?;
for t in targets {
tx.insert_arrow((o, t), TestArrow)?;
}
}
let ti: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_incoming::<TestArrow>(t)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()
.map(|v| (t, v))
})
.collect::<Result<_>>()?;
// For each origin point, there must be a target that has it as "incoming".
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().any(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∃ t ∈ targets: o ∈ t.incoming"
);
// Each target has each origin as incoming.
assert!(
origins
.into_iter()
.all(|o| { targets.into_iter().all(|t| { ti[&t].contains(&o) }) }),
"∀ o ∈ origins: ∀ t ∈ targets: o ∈ t.incoming"
);
let to: HashMap<Key, Vec<Key>> = targets
.into_iter()
.map(|t| {
tx.list_outgoing::<TestArrow>(t)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()
.map(|v| (t, v))
})
.collect::<Result<_>>()?;
// Our arrows point only from origins to targets, and there's a bug if there
// exists a target such that its outgoing set is non-empty.
assert!(
!targets.into_iter().any(|t| !to[&t].is_empty()),
"∄ t ∈ targets: t.outgoing ≠ ∅"
);
let oo: HashMap<Key, Vec<Key>> = origins
.into_iter()
.map(|o| {
tx.list_outgoing::<TestArrow>(o)
.map(|r| r.map(|(k, _)| k))
.collect::<Result<Vec<_>>>()
.map(|v| (o, v))
})
.collect::<Result<_>>()?;
// Each origin has each target as outgoing.
assert!(
origins
.into_iter()
.all(|o| targets.into_iter().all(|t| oo[&o].contains(&t))),
"∀ o ∈ origins: ∀ t ∈ targets: t ∈ o.outgoing"
);
OK
})
})
}
}