Initial implementation of basic data store

This commit is contained in:
Riley Apeldoorn 2024-04-19 00:12:21 +02:00
parent b5aacb5896
commit 7c589922e6
7 changed files with 2455 additions and 96 deletions

2018
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1710765496,
"narHash": "sha256-p7ryWEeQfMwTB6E0wIUd5V2cFTgq+DRRBz2hYGnJZyA=",
"lastModified": 1713449828,
"narHash": "sha256-TArZx2a6bd0/H8b3V7DB/3/ewOYRBD9i8QvsrselSuc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e367f7a1fb93137af22a3908f00b9a35e2d286a7",
"rev": "0a59cfb77c09f6dd449bd25c2882ad8bb1e203eb",
"type": "github"
},
"original": {
@ -55,11 +55,11 @@
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1710727870,
"narHash": "sha256-Ulsx+t4SnRmjMJx4eF2Li+3rBGYhZp0XNShVjIheCfg=",
"lastModified": 1713406362,
"narHash": "sha256-85f70DM03RWqzahxXChSWcbnUYAKYBDBvyQ4P+kXVBk=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "a1b17cacfa7a6ed18f553a195a047f4e73e95da9",
"rev": "05a4f1b28ee59f589c8a0ad877741de3c3bf894d",
"type": "github"
},
"original": {

View file

@ -8,14 +8,23 @@
pkgs = import nixpkgs { system = "x86_64-linux"; overlays = [(import oxalica)]; };
in{
devShell."x86_64-linux" = pkgs.mkShell {
buildInputs = with pkgs; [
(rust-bin.selectLatestNightlyWith (toolchain: toolchain.default.override {
extensions = [ "rust-src" "rustfmt" "rust-analyzer" ];
}))
buildInputs = with pkgs;
let rust = (rust-bin.selectLatestNightlyWith (toolchain: toolchain.default.override {
extensions = [ "rust-src" "rustfmt" "rust-analyzer" ];
}));
in [
rust
openssl
pkg-config
evcxr
llvmPackages_16.clang
llvmPackages_16.libclang
];
shellHook = ''
export LIBCLANG_PATH="${pkgs.llvmPackages_16.libclang.lib}/lib";
export ROCKSDB_LIB_DIR="${pkgs.rocksdb}/lib";
'';
};
};
}

View file

@ -7,3 +7,6 @@ path = "src/lib.rs"
[dependencies]
ulid = "*"
rocksdb = "*"
derive_more = "*"
bincode = "2.0.0-rc.3"

View file

@ -1,27 +1,56 @@
pub const NODE: usize = 16;
pub const EDGE: usize = NODE * 2;
use crate::{Alias, Result, Transaction};
#[derive(Clone, Copy)]
pub struct Key<const SIZE: usize = NODE>([u8; SIZE]);
impl<const S: usize> AsRef<[u8]> for Key<S> {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
/// A unique identifier for vertices in the database.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub struct Key([u8; 16]);
impl Key {
pub fn join(self, other: Key) -> Key<EDGE> {
unsafe { std::mem::transmute((self, other)) }
pub(crate) fn from_slice(buf: &[u8]) -> Key {
let mut key = [0; 16];
key.copy_from_slice(&buf);
Key(key)
}
/// Join two keys together.
pub(crate) fn fuse(self, other: Key) -> [u8; 32] {
let mut buf = [0; 32];
buf[..16].copy_from_slice(&self.0);
buf[16..].copy_from_slice(&other.0);
buf
}
pub(crate) fn split(buf: &[u8]) -> (Key, Key) {
let tail = Key::from_slice(&buf[..16]);
let head = Key::from_slice(&buf[16..]);
(tail, head)
}
}
impl Key<EDGE> {
pub fn split(self) -> (Key, Key) {
unsafe { std::mem::transmute_copy(&self.0) }
}
pub fn swap(self) -> Key<EDGE> {
let (l, r) = self.split();
r.join(l)
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
/// Anything that can be used to reference a vertex, both "normal" [keys](Key)
/// and [aliases](Alias).
///
/// In general, using a key directly is going to be more efficient than using
/// an alias, because it incurs less lookups.
pub trait Keylike {
/// Translate the thing to a [`Key`].
fn translate(self, tx: &Transaction<'_>) -> Result<Key>;
}
impl Keylike for Key {
fn translate(self, _: &Transaction<'_>) -> Result<Key> {
Ok(self)
}
}
impl<A> Keylike for A
where
A: Alias,
{
fn translate(self, tx: &Transaction<'_>) -> Result<Key> {
tx.lookup_alias(self)
}
}

View file

@ -1,24 +1,114 @@
pub struct Store {}
//! The data store abstractions used by the ActivityPuppy project.
//!
//! Persistence in a puppy server is handled by this component, which implements a directed graph
//! inspired datastore on top of the [rocksdb] key-value store.
//!
//! The workflow for manipulating stuff in the store is to open a [`Store`], and then to call
//! its [`transaction`](Store::transaction) method. This method takes a function that, given
//! a [`Transaction`], returns a result with some value. The `Transaction` object contains some
//! useful CRUD methods. Returning an `Ok` commits the transaction and returning `Err` rolls it
//! back.
pub use key::Key;
use std::sync::Arc;
use derive_more::From;
use rocksdb::MultiThreaded;
type Backend = rocksdb::TransactionDB<MultiThreaded>;
mod key;
mod transaction;
pub mod transaction;
pub use key::{Key, Keylike};
pub use transaction::Transaction;
pub use {alias::Alias, arrow::Arrow, value::Value};
/// An (optionally labeled) directed edge between two keys.
pub trait Arrow {
/// Additional data associated with this edge.
type Label;
/// The handle to the data store.
///
/// This type can be cloned freely.
#[derive(Clone)]
pub struct Store {
inner: Arc<Backend>,
}
/// A simple value associated with a key.
pub trait Value {}
/// An isolated keyspace.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Space(&'static str);
/// Aliases are unique mappings/indices that can be used in place of a key.
///
/// They typically take the form of a newtype to add semantic meaning.
pub trait Alias {}
impl AsRef<str> for Space {
fn as_ref(&self) -> &str {
&self.0
}
}
pub mod value {
//! Modules of information.
use bincode::{Decode, Encode};
use crate::Space;
/// A simple piece of data associated with a vertex.
pub trait Value: Encode + Decode {
const SPACE: Space;
}
#[derive(Encode, Decode)]
pub struct Profile {}
impl Value for Profile {
const SPACE: Space = Space("profile");
}
}
pub mod arrow {
//! Relations between nodes.
use bincode::{Decode, Encode};
use crate::Space;
/// A directed edge between two vertices.
pub trait Arrow: Encode + Decode {
const SPACE: (Space, Space);
}
/// Which way an arrow is pointing when viewed from a particular vertex.
pub enum Direction {
Incoming,
Outgoing,
}
}
pub mod alias {
//! Alternative keys.
use derive_more::{Display, From};
use crate::Space;
/// An alternative unique key for a vertex.
pub trait Alias: ToString + From<String> {
const SPACE: (Space, Space);
}
#[derive(Display, From)]
pub struct Username(pub String);
impl Alias for Username {
const SPACE: (Space, Space) = (Space("username/l"), Space("username/r"));
}
}
/// Results from this component.
type Result<T, E = ()> = std::result::Result<T, E>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors from the data store.
#[derive(From, Debug)]
pub enum Error {
Missing,
Conflict,
Internal(rocksdb::Error),
Encoding(bincode::error::EncodeError),
Decoding(bincode::error::DecodeError),
}

View file

@ -1,69 +1,279 @@
use crate::{Alias, Arrow, Key, Result, Value};
use std::{collections::HashMap, sync::Arc};
pub struct Transaction;
use bincode::{Decode, Encode};
use rocksdb::BoundColumnFamily;
/// Operations on [values][Value].
impl Transaction {
pub fn lookup_value<V>(&self, key: Key) -> Result<(Key, V)>
where
V: Value,
{
todo!()
}
pub fn insert_value<V>(&self, key: Key) -> Result<()>
where
V: Value,
{
todo!()
}
pub fn update_value<V>(&self, key: Key) -> Result<()>
where
V: Value,
{
todo!()
}
pub fn delete_value<V>(&self, key: Key) -> Result<()>
where
V: Value,
{
todo!()
use crate::{arrow::Direction, Alias, Arrow, Backend, Error, Key, Keylike, Result, Store, Value};
const OK: Result<()> = Ok(());
/// Master list of all column family names in use.
const SPACES: &[&'static str] = &[
"registry",
"username/l",
"username/r",
"follows/l",
"follows/r",
"profile",
];
impl Store {
/// Initiate a transaction.
///
/// If the result is an error, the transaction is rolled back, and otherwise the transaction
/// is committed.
pub fn transaction<T>(&self, f: impl FnOnce(&Transaction<'_>) -> Result<T>) -> Result<T> {
// Load all the column family handles, because they can't be accessed through the
// `rocksdb::Transaction` struct, only the `TransactionDB`.
let spaces = SPACES
.into_iter()
.map(|name| (*name, self.inner.cf_handle(name).unwrap()))
.collect();
let tx = Transaction {
inner: self.inner.transaction(),
spaces,
};
let result = f(&tx);
if result.is_err() {
tx.inner.rollback()?;
} else {
tx.inner.commit()?;
}
result
}
}
/// Operations on [arrows][Arrow].
impl Transaction {
pub fn create_arrow<A>(&self, key: (Key, Key), label: A::Label) -> Result<()>
where
A: Arrow,
{
todo!()
}
pub fn delete_arrow<A>(&self, key: (Key, Key)) -> Result<()>
where
A: Arrow,
{
todo!()
}
pub fn arrow_exists<A>(&self, key: (Key, Key)) -> Result<bool>
where
A: Arrow,
{
todo!()
}
/// A database transaction, in which either each action succeeds, or everything fails
/// together.
///
/// The transaction struct is the interface for quering and manipulating persisted content.
pub struct Transaction<'db> {
inner: rocksdb::Transaction<'db, Backend>,
spaces: HashMap<&'static str, Arc<BoundColumnFamily<'db>>>,
}
/// Operations on [aliases][Alias].
impl Transaction {
pub fn create_alias<A>(&self, key: Key, alias: A) -> Result<()>
impl Transaction<'_> {
/// Register a new key.
pub fn define(&self, key: Key) -> Result<()> {
self.with("registry").set(key, b"")?;
OK
}
/// Query the store for a value associated with the vertex `key` identifies.
///
/// Using a [`Key`] is more efficient than using an alias.
pub fn lookup<V>(&self, key: impl Keylike) -> Result<(Key, V)>
where
V: Value,
{
let canonicalized_key = key.translate(&self)?;
let raw = self.with(V::SPACE).get(canonicalized_key)?;
let value = decode(raw.as_ref())?;
Ok((canonicalized_key, value))
}
/// Associate a new value with the key.
///
/// Returns `Error::Conflict` if a value of this type is already associated with the
/// (canonicalized) key.
pub fn insert<V>(&self, key: impl Keylike, data: V) -> Result<()>
where
V: Value,
{
let key = key.translate(&self)?;
let data = encode(data)?;
let ns = self.with(V::SPACE);
// Check for conflicts. Fail if the key already exists, otherwise set the key
// to the given value.
if ns.has(key)? {
Err(Error::Conflict)
} else {
ns.set(key, data)
}
}
/// Apply an update function to the value identified by the key.
pub fn update<V>(&self, key: impl Keylike, f: impl FnOnce(Key, V) -> Result<V>) -> Result<()>
where
V: Value,
{
let (key, old) = self.lookup::<V>(key)?;
let new = f(key, old).and_then(encode)?;
self.with(V::SPACE).set(key, new)
}
/// Remove a value from the database. Doesn't complain if the value does not exist.
pub fn remove<V>(&self, key: impl Keylike) -> Result<Option<V>>
where
V: Value,
{
let canonical_key = key.translate(&self)?;
let ns = self.with(V::SPACE);
match ns.pop(canonical_key) {
Ok(Some(val)) => decode(&val).map(Some),
Ok(None) => Ok(None),
Err(err) => Err(err),
}
}
/// Look up the key that the given alias maps to.
pub fn lookup_alias<A>(&self, alias: A) -> Result<Key>
where
A: Alias,
{
todo!()
let (l, _) = A::SPACE;
let raw = self.with(l).get(alias.to_string())?;
Ok(Key::from_slice(raw.as_ref()))
}
pub fn delete_alias<A>(&self, alias: A)
/// Create a new alias of type `A` for the given [`Key`].
pub fn insert_alias<A>(&self, key: Key, alias: A) -> Result<()>
where
A: Alias,
{
todo!()
let (l, r) = A::SPACE;
let alias = alias.to_string();
self.with(l).set(&alias, key)?;
self.with(r).set(key, &alias)?;
OK
}
/// Delete the alias of type `A` that points to `key`.
pub fn remove_alias<A>(&self, key: Key) -> Result<()>
where
A: Alias,
{
let (l, r) = A::SPACE;
// First, pop the reverse mapping, which will give us the encoded
// key for the normal mapping. If it doesn't exist, don't delete
// the normal mapping.
if let Some(alias) = self.with(r).pop(key)? {
self.with(l).pop(alias)?;
}
OK
}
/// Find an arrow of type `A` with the given `tail` and `head`.
pub fn lookup_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<Option<A>>
where
A: Arrow,
{
let (l, _) = A::SPACE;
match self.with(l).get(tail.fuse(head)) {
Ok(raw) => decode(raw.as_ref()),
Err(Error::Missing) => Ok(None),
Err(err) => Err(err),
}
}
/// Create a new arrow of type `A` and associate the label with it.
pub fn insert_arrow<A>(&self, (tail, head): (Key, Key), label: A) -> Result<()>
where
A: Arrow,
{
let (l, r) = A::SPACE;
let label = encode(label)?;
self.with(l).set(tail.fuse(head), &label)?;
self.with(r).set(head.fuse(tail), &label)?;
OK
}
/// Delete an arrow from the data store.
pub fn remove_arrow<A>(&self, (tail, head): (Key, Key)) -> Result<()>
where
A: Arrow,
{
self.with(A::SPACE.0).del(tail.fuse(head))?;
self.with(A::SPACE.1).del(head.fuse(tail))?;
OK
}
/// Select arrows with the given direction relative to the given key.
pub fn list_arrows_with<A>(
&self,
direction: Direction,
key: impl Keylike,
) -> impl Iterator<Item = Result<(Key, A)>> + '_
where
A: Arrow,
{
// Keys in space 0 are arranged with the tail at the start, and the ones in space 1
// are arranged with the head at the start. This allows us to efficiently prefix scan
// regardless of the direction, at the cost of increased space usage.
let space = match direction {
Direction::Outgoing => A::SPACE.0,
Direction::Incoming => A::SPACE.1,
};
let key = key.translate(&self).unwrap();
self.with(space).scan(key).map(|r| {
let (k, v) = r?;
// Because we're prefix scanning on the first half of the key, we only want to
// get the second here.
let (_, other) = Key::split(k.as_ref());
decode(v.as_ref()).map(|label| (other, label))
})
}
/// Use a keyspace.
pub(crate) fn with(&self, name: impl AsRef<str>) -> Keyspace<'_> {
Keyspace {
cf: self.spaces[name.as_ref()].clone(),
tx: &self,
}
}
}
/// Provides the basic API for a keyspace/column family.
pub(crate) struct Keyspace<'db> {
tx: &'db Transaction<'db>,
cf: Arc<BoundColumnFamily<'db>>,
}
impl<'db> Keyspace<'db> {
/// Retrieve a value from the database. Returns `Missing` if the key does not exist.
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<impl AsRef<[u8]> + 'db> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.and_then(|opt| opt.ok_or(Error::Missing))
}
/// Set the value at `key` to `val`.
pub fn set(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
self.tx
.inner
.put_cf(&self.cf, key, val)
.map_err(Error::Internal)
}
/// Delete the key-value pair identified by `key`.
pub fn del(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.tx.inner.delete_cf(&self.cf, &key)?;
OK
}
/// Remove the key and associated value from the keyspace, and return its previous value.
pub fn pop(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
let old = self.tx.inner.get_for_update_cf(&self.cf, &key, true)?;
self.del(key)?;
Ok(old)
}
/// Check whether the key exists in the keyspace.
pub fn has(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.tx
.inner
.get_pinned_cf(&self.cf, key)
.map_err(Error::Internal)
.map(|opt| opt.is_some())
}
/// Execute a prefix scan.
pub fn scan(
&self,
prefix: impl AsRef<[u8]>,
) -> impl Iterator<Item = Result<(impl AsRef<[u8]> + 'static, impl AsRef<[u8]> + 'static)>> + 'db
{
self.tx
.inner
.prefix_iterator_cf(&self.cf, prefix)
.map(|r| r.map_err(Error::Internal))
}
}
fn encode(data: impl Encode) -> Result<Vec<u8>> {
bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding)
}
fn decode<T>(data: &[u8]) -> Result<T>
where
T: Decode,
{
bincode::decode_from_slice(data, bincode::config::standard())
.map_err(Error::Decoding)
.map(|(v, _)| v)
}