use std::ops::RangeBounds; use bincode::{Decode, Encode}; use chrono::{DateTime, Utc}; use super::{ types::{DataType, MixinSpec}, Batch, Store, Transaction, }; use crate::{util::IterExt as _, Error, Key, Result}; /// Mixins are the simplest pieces of data in the store. pub trait Mixin: DataType + Encode + Decode {} /// Derive a [`Mixin`] implementation. /// /// In addition to deriving `Mixin`, you will need to derive or implement [`Encode`] /// and [`Decode`]. pub use r#macro::Mixin; impl Store { /// Get the value! pub fn get_mixin(&self, node: Key) -> Result> where M: Mixin, { op::get_mixin(self, node) } /// Check if `node` has a mixin `M`. pub fn has_mixin(&self, node: Key) -> Result where M: Mixin, { op::has_mixin::(self, node) } /// Get all `M`s where the key's timestamp is within the `range`. pub fn range( &self, range: impl RangeBounds>, ) -> impl Iterator> + '_ where M: Mixin, { op::get_range(self, range) } /// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results. pub fn join_on( &self, iter: impl IntoIterator>, ) -> Result)>> where M: Mixin, { op::join_on(self, iter) } } impl Transaction<'_> { /// Apply an update function to the mixin `M` of `node`. /// /// # Errors /// /// - [`Error::Missing`]: if `node` does not have a mixin of this type. /// /// [`Error::Missing`]: crate::Error::Missing pub fn update(&self, node: Key, update: impl FnOnce(M) -> M) -> Result<()> where M: Mixin, { op::update(self, node, update) } /// Get the mixin of the specified type associated with `node`. pub fn get_mixin(&self, node: Key) -> Result> where M: Mixin, { op::get_mixin(self, node) } /// Add a mixin to `node`. /// /// # Errors /// /// - [`Error::Conflict`]: if `node` already has a mixin of type `M`. /// /// [`Error::Conflict`]: crate::Error::Missing pub fn add_mixin(&self, node: Key, mixin: M) -> Result<()> where M: Mixin, { if op::has_mixin::(self, node)? { return Err(Error::Conflict); } else { op::add_mixin::(self, node, mixin) } } /// Check whether `node` has an `M` defined for it. pub fn has_mixin(&self, node: Key) -> Result where M: Mixin, { op::has_mixin::(self, node) } /// Get all `M`s where the key's timestamp is within the `range`. pub fn range( &self, range: impl RangeBounds>, ) -> impl Iterator> + '_ where M: Mixin, { op::get_range(self, range) } /// Think "LEFT JOIN". In goes an iterator over keys, out come all the associated results. pub fn join_on( &self, f: impl Fn(T) -> Key, iter: impl IntoIterator>, ) -> Result)>> where M: Mixin, { op::join_on(self, iter.into_iter().map_ok(f)) } } impl Batch { /// Add a mixin to the `node`. /// /// **Note**: unlike [`Transaction::add_mixin`], this will *not* return an error if the key already has a mixin /// of this type. This *should* not cause inconsistency. pub fn put_mixin(&mut self, node: Key, mixin: M) where M: Mixin, { op::add_mixin(self, node, mixin).unwrap() } } mod op { use std::ops::{Bound, RangeBounds}; use chrono::{DateTime, TimeDelta, Utc}; use either::Either; use super::Mixin; use crate::{internal::*, util::IterExt as _, Error, Key, Result}; pub fn update( cx: &(impl Query + Write), node: Key, update: impl FnOnce(M) -> M, ) -> Result<()> where M: Mixin, { // TODO: implement in terms of a merge operator instead of separate query and write ops. // this would let us remove the `Query` bound, which would in turn let us update from within // a batch. // // See https://github.com/facebook/rocksdb/wiki/Merge-Operator // // It looks like rocksdb allows you to specify a merge operator per column family.[^1] // This means we can construct our column families with a merge operator that knows how to encode and decode mixins. // // [^1]: https://github.com/facebook/rocksdb/blob/9d37408f9af15c7a1ae42f9b94d06b27d98a011a/include/rocksdb/options.h#L128 let tree = cx.open(M::SPEC.keyspace); match tree.get(node.as_ref())? { None => Err(Error::Missing), Some(buf) => { let new = decode(buf).map(update).and_then(encode)?; tree.set(node, new) } } } pub fn get_mixin(cx: &impl Query, node: Key) -> Result> { cx.open(M::SPEC.keyspace).get(node)?.map(decode).transpose() } pub fn add_mixin(cx: &impl Write, node: Key, mixin: M) -> Result<()> { cx.open(M::SPEC.keyspace).set(node, encode(mixin)?) } pub fn has_mixin(cx: &impl Query, node: Key) -> Result { cx.open(M::SPEC.keyspace).has(node) } pub fn get_range( cx: &impl Query, range: impl RangeBounds>, ) -> impl Iterator> + '_ { // TODO: Test this thoroughly const MS: TimeDelta = TimeDelta::milliseconds(1); let iter = match (range.start_bound(), range.end_bound()) { (Bound::Unbounded, Bound::Unbounded) => Either::Left(cx.open(M::SPEC.keyspace).list()), (min, max) => { let lower = match min { Bound::Unbounded => [u8::MIN; 16], Bound::Included(inc) => Key::range(*inc).0, Bound::Excluded(exc) => Key::range(*exc + MS).0, }; let upper = match max { Bound::Unbounded => [u8::MAX; 16], Bound::Included(inc) => Key::range(*inc).1, Bound::Excluded(exc) => Key::range(*exc - MS).1, }; Either::Right(cx.open(M::SPEC.keyspace).range(lower, upper)) } }; iter.bind_results(|(k, v)| { let key = Key::from_slice(k.as_ref()); let val = decode(v)?; Ok((key, val)) }) } pub fn join_on( cx: &impl Query, iter: impl IntoIterator>, ) -> Result)>> where M: Mixin, { let keys: Vec = iter.into_iter().try_collect()?; cx.open(M::SPEC.keyspace) .join(keys.iter()) .into_iter() .zip(keys) .map(|(opt, key)| { let Some(buf) = opt? else { return Ok((key, None)); }; let val = decode(buf)?; Ok((key, Some(val))) }) .try_collect() } pub(super) fn encode(data: impl bincode::Encode) -> Result> { bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding) } pub(super) fn decode(data: impl AsRef<[u8]>) -> Result where T: bincode::Decode, { bincode::decode_from_slice(data.as_ref(), bincode::config::standard()) .map_err(Error::Decoding) .map(|(v, _)| v) } }