diff --git a/Cargo.lock b/Cargo.lock index 4004297..89d6af6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -888,6 +888,12 @@ dependencies = [ [[package]] name = "macro" version = "0.0.0" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.60", +] [[package]] name = "matchit" @@ -1134,6 +1140,8 @@ dependencies = [ name = "puppy" version = "0.0.0" dependencies = [ + "bincode", + "chrono", "fetch", "store", ] @@ -1494,6 +1502,7 @@ dependencies = [ "bincode", "chrono", "derive_more", + "either", "macro", "rocksdb", "tempfile", diff --git a/bin/pupctl/src/main.rs b/bin/pupctl/src/main.rs index d97dc1a..95c19c1 100644 --- a/bin/pupctl/src/main.rs +++ b/bin/pupctl/src/main.rs @@ -1,31 +1,26 @@ use puppy::{ - store::{ - self, - alias::Username, - arrow::{FollowRequested, Follows}, - mixin::Profile, - Error, - }, + model::{schema, Bite, FollowRequest, Follows, Profile, Username}, + store::{self, Error}, tl::Post, - Bite, Key, Store, + Key, Store, }; fn main() -> store::Result<()> { // Store::nuke(".state")?; - let db = Store::open(".state")?; + let db = Store::open(".state", schema())?; println!("creating actors"); let riley = get_or_create_actor(&db, "riley")?; let linen = get_or_create_actor(&db, "linen")?; - if false { + if true { println!("creating posts"); puppy::create_post(&db, riley, "@linen <3")?; puppy::create_post(&db, linen, "@riley <3")?; } - if false { + if true { println!("making riley follow linen"); - if !db.exists::((riley, linen))? { + if !db.exists::(riley, linen)? { println!("follow relation does not exist yet"); - if !db.exists::((riley, linen))? { + if !db.exists::(riley, linen)? { println!("no pending follow request; creating"); puppy::fr::create(&db, riley, linen)?; } else { @@ -36,44 +31,46 @@ fn main() -> store::Result<()> { println!("riley already follows linen"); } } - println!("Posts on the instance:"); + println!("\nPosts on the instance:"); for Post { id, content, author, } in puppy::tl::fetch_all(&db)? { - let (_, Profile { account_name, .. }) = db.lookup(author)?; + let Profile { account_name, .. } = db.get_mixin(author)?.unwrap(); let content = content.content.unwrap(); println!("- {id} by @{account_name} ({author}):\n{content}",) } - println!("Linen's followers:"); + println!("\nLinen's followers:"); for id in puppy::fr::followers_of(&db, linen)? { - let (_, Profile { account_name, .. }) = db.lookup(id)?; + let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); println!("- @{account_name} ({id})"); } - println!("Riley's following:"); + println!("\nRiley's following:"); for id in puppy::fr::following_of(&db, riley)? { - let (_, Profile { account_name, .. }) = db.lookup(id)?; + let Profile { account_name, .. } = db.get_mixin(id)?.unwrap(); println!("- @{account_name} ({id})"); } - println!("Biting riley"); - puppy::bite_actor(&db, linen, riley).unwrap(); - for Bite { id, biter, .. } in puppy::bites_on(&db, riley).unwrap() { - let (_, Profile { account_name, .. }) = db.lookup(biter).unwrap(); - println!("riley was bitten by @{account_name} at {}", id.timestamp()); + if false { + println!("Biting riley"); + puppy::bite_actor(&db, linen, riley).unwrap(); + for Bite { id, biter, .. } in puppy::bites_on(&db, riley).unwrap() { + let Profile { account_name, .. } = db.get_mixin(biter)?.unwrap(); + println!("riley was bitten by @{account_name} at {}", id.timestamp()); + } } store::OK } fn get_or_create_actor(db: &Store, username: &str) -> Result { - let user = db.translate::(username); + let user = db.lookup(Username(username.to_string())); match user { - Ok(key) => { + Ok(Some(key)) => { println!("found '{username}' ({key})"); Ok(key) } - Err(Error::Missing) => { + Ok(None) => { println!("'{username}' doesn't exist yet, creating"); let r = puppy::create_actor(&db, username); if let Ok(ref key) = r { diff --git a/lib/macro/Cargo.toml b/lib/macro/Cargo.toml index c562cd5..35fdf09 100644 --- a/lib/macro/Cargo.toml +++ b/lib/macro/Cargo.toml @@ -5,3 +5,9 @@ edition = "2021" [lib] path = "src/lib.rs" proc-macro = true + +[dependencies] +syn = { version = '2', features = ['full'] } +quote = '*' +proc-macro2 = '*' +heck = '*' diff --git a/lib/macro/src/arrow.rs b/lib/macro/src/arrow.rs new file mode 100644 index 0000000..359e020 --- /dev/null +++ b/lib/macro/src/arrow.rs @@ -0,0 +1,142 @@ +use heck::AsKebabCase; +use proc_macro::TokenStream; +use quote::{quote, ToTokens}; +use syn::{parse_macro_input, Data, DeriveInput, Field, Ident}; + +pub fn arrow(item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as DeriveInput); + let Data::Struct(structure) = input.data else { + panic!("Only structs are supported as arrows") + }; + match structure.fields { + syn::Fields::Named(fields) => from_named(&input.ident, fields), + syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => { + let first = f.unnamed.first().unwrap(); + from_newtype(&input.ident, first) + } + _ => panic!( + "Only newtype structs and structs with named fields can have a derived arrow impl" + ), + } +} + +fn from_named(name: &Ident, fields: syn::FieldsNamed) -> TokenStream { + let (origin, target, identity) = extract_idents(fields); + match identity { + Some(id) => make_multi_arrow(name, origin, target, id), + None => make_basic_arrow(name, origin, target), + } +} + +fn make_basic_arrow(name: &Ident, origin: Ident, target: Ident) -> TokenStream { + let spec = gen_spec(name); + TokenStream::from(quote! { + #spec + impl store::arrow::Arrow for #name {} + impl From for #name { + fn from(v: store::arrow::Basic) -> #name { + #name { + #origin: v.origin, + #target: v.target, + } + } + } + impl From<#name> for store::arrow::Basic { + fn from(v: #name) -> store::arrow::Basic { + store::arrow::Basic { + origin: v.#origin, + target: v.#target, + } + } + } + }) +} + +fn make_multi_arrow(name: &Ident, origin: Ident, target: Ident, id: Ident) -> TokenStream { + let spec = gen_spec(name); + TokenStream::from(quote! { + #spec + impl store::arrow::Arrow for #name { + type Kind = store::arrow::Multi; + } + impl From for #name { + fn from(v: store::arrow::Multi) -> #name { + #name { + #id: v.identity, + #origin: v.origin, + #target: v.target, + } + } + } + impl From<#name> for store::arrow::Multi { + fn from(v: #name) -> store::arrow::Multi { + store::arrow::Multi { + identity: v.#id, + origin: v.#origin, + target: v.#target, + } + } + } + }) +} + +fn extract_idents(fields: syn::FieldsNamed) -> (Ident, Ident, Option) { + let origin = extract_ident("origin", &fields).unwrap(); + let target = extract_ident("target", &fields).unwrap(); + let id = extract_ident("identity", &fields); + (origin, target, id) +} + +fn extract_ident(name: &str, fields: &syn::FieldsNamed) -> Option { + // Prefer marked fields and default to correctly named fields. + fields + .named + .iter() + .find(|field| { + field + .attrs + .iter() + .filter_map(|attr| attr.meta.path().get_ident()) + .any(|id| id == name) + }) + .and_then(|f| f.ident.clone()) + .or_else(|| { + fields + .named + .iter() + .filter_map(|f| f.ident.clone()) + .find(|id| id == name) + }) +} + +fn gen_spec(name: &Ident) -> impl ToTokens { + let prefix = AsKebabCase(name.to_string()); + let by_origin = format!("{prefix}/by-origin"); + let by_target = format!("{prefix}/by-target"); + quote! { + impl store::types::Value for #name { + type Type = store::types::ArrowSpec; + const SPEC: Self::Type = store::types::ArrowSpec { + by_origin: store::types::Namespace(#by_origin), + by_target: store::types::Namespace(#by_target), + }; + } + } +} + +fn from_newtype(name: &Ident, field: &Field) -> TokenStream { + let spec = gen_spec(name); + let typ = &field.ty; + TokenStream::from(quote! { + #spec + impl store::arrow::Arrow for #name { + type Kind = #typ; + } + impl From<#typ> for #name { + fn from(v: #typ) -> #name { #name(v) } + } + impl From<#name> for #typ { + fn from(v: #name) -> #typ { v.0 } + } + }) +} diff --git a/lib/macro/src/lib.rs b/lib/macro/src/lib.rs index 4d26183..8566309 100644 --- a/lib/macro/src/lib.rs +++ b/lib/macro/src/lib.rs @@ -1,16 +1,73 @@ use proc_macro::TokenStream; +mod arrow; + #[proc_macro_derive(Arrow, attributes(origin, target, identity))] pub fn arrow(item: TokenStream) -> TokenStream { - TokenStream::new() + arrow::arrow(item) } #[proc_macro_derive(Alias)] pub fn alias(item: TokenStream) -> TokenStream { - TokenStream::new() + let input = syn::parse_macro_input!(item as syn::DeriveInput); + let syn::Data::Struct(structure) = input.data else { + panic!("Only structs are supported as aliases") + }; + match structure.fields { + syn::Fields::Unnamed(f) if f.unnamed.len() == 1 => { + let first = f.unnamed.first().unwrap(); + make_alias_impl(&input.ident, first) + } + _ => panic!("Only string newtype structs are allowed as aliases"), + } +} + +fn make_alias_impl(name: &syn::Ident, field: &syn::Field) -> TokenStream { + let typ = &field.ty; + let prefix = heck::AsKebabCase(name.to_string()); + let keyspace = format!("{prefix}/keyspace"); + let reversed = format!("{prefix}/reversed"); + let spec = quote::quote! { + impl store::types::Value for #name { + type Type = store::types::AliasSpec; + const SPEC: Self::Type = store::types::AliasSpec { + keyspace: store::types::Namespace(#keyspace), + reversed: store::types::Namespace(#reversed), + }; + } + }; + + TokenStream::from(quote::quote! { + #spec + impl store::Alias for #name {} + impl AsRef for #name { + fn as_ref(&self) -> &str { self.0.as_ref() } + } + impl From<#typ> for #name { + fn from(v: #typ) -> #name { #name(v) } + } + }) } #[proc_macro_derive(Mixin)] pub fn mixin(item: TokenStream) -> TokenStream { - TokenStream::new() + let input = syn::parse_macro_input!(item as syn::DeriveInput); + + let name = input.ident; + let prefix = heck::AsKebabCase(name.to_string()); + let keyspace = format!("{prefix}/main"); + + let spec = quote::quote! { + impl store::types::Value for #name { + type Type = store::types::MixinSpec; + const SPEC: Self::Type = store::types::MixinSpec { + keyspace: store::types::Namespace(#keyspace), + }; + } + }; + + TokenStream::from(quote::quote! { + #spec + impl store::Mixin for #name {} + }) } diff --git a/lib/puppy/Cargo.toml b/lib/puppy/Cargo.toml index 613c157..da89aae 100644 --- a/lib/puppy/Cargo.toml +++ b/lib/puppy/Cargo.toml @@ -8,3 +8,5 @@ path = "src/lib.rs" [dependencies] store = { path = "../store" } fetch = { path = "../fetch" } +bincode = "2.0.0-rc.3" +chrono = "*" diff --git a/lib/puppy/src/lib.rs b/lib/puppy/src/lib.rs index d565063..ed8f6b6 100644 --- a/lib/puppy/src/lib.rs +++ b/lib/puppy/src/lib.rs @@ -1,10 +1,13 @@ #![feature(iterator_try_collect)] +use model::{AuthorOf, Bite, Content, Profile, Username}; +use store::util::{key, IterExt as _}; pub use store::{self, Key, Store}; pub mod model { + use bincode::{Decode, Encode}; use store::{types::Schema, Key}; - #[derive(store::Mixin)] + #[derive(store::Mixin, Encode, Decode)] pub struct Profile { pub post_count: usize, pub account_name: String, @@ -13,13 +16,13 @@ pub mod model { pub about_fields: Vec<(String, String)>, } - #[derive(store::Mixin)] + #[derive(store::Mixin, Encode, Decode)] pub struct Content { pub content: Option, pub summary: Option, } - #[derive(store::Arrow, Clone, Copy)] + #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct AuthorOf { #[origin] pub author: Key, @@ -27,7 +30,7 @@ pub mod model { pub object: Key, } - #[derive(store::Arrow, Clone, Copy)] + #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct Follows { #[origin] pub follower: Key, @@ -35,7 +38,7 @@ pub mod model { pub followed: Key, } - #[derive(store::Arrow, Clone, Copy)] + #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct Bite { #[identity] pub id: Key, @@ -45,7 +48,7 @@ pub mod model { pub victim: Key, } - #[derive(store::Arrow, Clone, Copy)] + #[derive(store::Arrow, Debug, PartialEq, Eq, Clone, Copy)] pub struct FollowRequest { #[identity] pub id: Key, @@ -109,7 +112,11 @@ pub fn create_actor(db: &Store, username: impl ToString) -> store::Result { pub fn list_posts_by_author(db: &Store, author: Key) -> store::Result> { db.run(|tx| { tx.outgoing::(author) - .bind_results(|arr| tx.get_mixin::(arr.object)) + .map_ok(|a| a.object) + .filter_bind_results(|post| { + let thing = tx.get_mixin(post)?; + Ok(thing.map(key(post))) + }) .collect() }) } @@ -129,7 +136,9 @@ pub fn bites_on(db: &Store, victim: Key) -> store::Result> { pub mod tl { //! Timelines - use store::{arrow::AuthorOf, mixin::Content, util::IterExt as _, Error, Key, Result, Store}; + use store::{util::IterExt as _, Error, Key, Result, Store}; + + use crate::model::{AuthorOf, Content}; pub struct Post { pub id: Key, @@ -139,9 +148,10 @@ pub mod tl { pub fn fetch_all(db: &Store) -> Result> { db.run(|tx| { - let iter = tx.list::(); + let iter = tx.range::(..); iter.bind_results(|(id, content)| { - let author = tx.incoming::(id).next_or(Error::Missing)?; + let AuthorOf { author, .. } = + tx.incoming::(id).next_or(Error::Missing)?; Ok(Post { id, author, @@ -185,31 +195,35 @@ pub mod fr { pub fn reject(db: &Store, requester: Key, target: Key) -> store::Result<()> { db.run(|tx| { - tx.remove_arrow::((requester, target))?; + tx.delete_all::(requester, target)?; OK }) } - pub fn list_pending(db: &Store, target: Key) -> store::Result> { - db.transaction(|tx| tx.list_incoming::(target).keys().collect()) + pub fn list_pending(db: &Store, target: Key) -> store::Result> { + db.incoming::(target).collect() } pub fn following_of(db: &Store, actor: Key) -> store::Result> { - db.transaction(|tx| tx.list_outgoing::(actor).keys().collect()) + db.outgoing::(actor) + .map_ok(|a| a.followed) + .collect() } pub fn followers_of(db: &Store, actor: Key) -> store::Result> { - db.transaction(|tx| tx.list_incoming::(actor).keys().collect()) + db.incoming::(actor) + .map_ok(|a| a.follower) + .collect() } #[cfg(test)] mod tests { - use store::{ - arrow::{FollowRequested, Follows}, - Key, Store, OK, - }; + use store::{Key, Store, OK}; - use crate::create_actor; + use crate::{ + create_actor, + model::{schema, FollowRequest, Follows}, + }; fn make_test_actors(db: &Store) -> store::Result<(Key, Key)> { let alice = create_actor(&db, "alice")?; @@ -220,18 +234,21 @@ pub mod fr { #[test] fn create_fr() -> store::Result<()> { - Store::with_tmp(|db| { + Store::test(schema(), |db| { let (alice, bob) = make_test_actors(&db)?; super::create(&db, alice, bob)?; assert!( - db.exists::((alice, bob))?, + db.exists::(alice, bob)?, "(alice -> bob) ∈ follow-requested" ); assert!( - !db.exists::((alice, bob))?, + !db.exists::(alice, bob)?, "(alice -> bob) ∉ follows" ); - let pending_for_bob = super::list_pending(&db, bob)?; + let pending_for_bob = super::list_pending(&db, bob)? + .into_iter() + .map(|fr| fr.origin) + .collect::>(); assert_eq!(pending_for_bob, vec![alice], "bob.pending = {{alice}}"); OK }) @@ -239,17 +256,17 @@ pub mod fr { #[test] fn accept_fr() -> store::Result<()> { - Store::with_tmp(|db| { + Store::test(schema(), |db| { let (alice, bob) = make_test_actors(&db)?; super::create(&db, alice, bob)?; super::accept(&db, alice, bob)?; assert!( - db.exists::((alice, bob))?, + db.exists::(alice, bob)?, "(alice -> bob) ∈ follows" ); assert!( - !db.exists::((bob, alice))?, + !db.exists::(bob, alice)?, "(bob -> alice) ∉ follows" ); @@ -265,7 +282,7 @@ pub mod fr { #[test] fn listing_follow_relations() -> store::Result<()> { - Store::with_tmp(|db| { + Store::test(schema(), |db| { let (alice, bob) = make_test_actors(&db)?; super::create(&db, alice, bob)?; super::accept(&db, alice, bob)?; diff --git a/lib/store/Cargo.toml b/lib/store/Cargo.toml index 08cb2f6..431004b 100644 --- a/lib/store/Cargo.toml +++ b/lib/store/Cargo.toml @@ -13,3 +13,4 @@ bincode = "2.0.0-rc.3" chrono = "*" tempfile = "*" macro = { path = "../macro" } +either = "*" diff --git a/lib/store/src/arrow.rs b/lib/store/src/arrow.rs index 42d5bc0..1c5105a 100644 --- a/lib/store/src/arrow.rs +++ b/lib/store/src/arrow.rs @@ -93,20 +93,18 @@ impl Store { op::exists::(self, origin, target) } /// Get all arrows of type `A` that point at `target`. - pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator> + 'a + pub fn incoming<'a, A>(&'a self, target: Key) -> impl Iterator> + 'a where - A::Kind: 'a, - A: Arrow, + A: Arrow + 'a, { - op::incoming::(self, target) + op::incoming::(self, target).map_ok(A::from) } /// Get all arrows of type `A` that point away from `origin`. - pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator> + 'a + pub fn outgoing<'a, A>(&'a self, origin: Key) -> impl Iterator> + 'a where - A::Kind: 'a, - A: Arrow, + A: Arrow + 'a, { - op::outgoing::(self, origin) + op::outgoing::(self, origin).map_ok(A::from) } } diff --git a/lib/store/src/internal.rs b/lib/store/src/internal.rs index 8d562e2..5763e46 100644 --- a/lib/store/src/internal.rs +++ b/lib/store/src/internal.rs @@ -49,6 +49,26 @@ where .full_iterator(&self.cf, IteratorMode::Start) .map_err(Error::Internal) } + /// Execute a range scan + pub fn range( + &self, + lower: [u8; N], + upper: [u8; N], + ) -> impl Iterator, Box<[u8]>)>> + 'db { + self.context + .full_iterator(&self.cf, IteratorMode::Start) + .skip_while(move |r| match r { + Ok((ref k, _)) => k.as_ref() < &lower, + _ => false, + }) + // The prefix iterator may "overshoot". This makes it stop when it reaches + // the end of the range that has the prefix. + .take_while(move |r| match r { + Ok((ref k, _)) => k.as_ref() < &upper, + _ => true, + }) + .map_err(Error::Internal) + } } impl Keyspace<'_, C> diff --git a/lib/store/src/key.rs b/lib/store/src/key.rs index 7cccb51..a4a2499 100644 --- a/lib/store/src/key.rs +++ b/lib/store/src/key.rs @@ -44,6 +44,11 @@ impl Key { let head = Key::from_slice(&buf[16..]); (tail, head) } + pub(crate) fn range(ts: DateTime) -> ([u8; 16], [u8; 16]) { + let min = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MIN).to_bytes(); + let max = Ulid::from_parts(ts.timestamp_millis() as u64, u128::MAX).to_bytes(); + (min, max) + } fn to_ulid(self) -> Ulid { Ulid::from_bytes(self.0) } diff --git a/lib/store/src/lib.rs b/lib/store/src/lib.rs index 004dd56..f8af4cf 100644 --- a/lib/store/src/lib.rs +++ b/lib/store/src/lib.rs @@ -1,5 +1,4 @@ #![feature(iterator_try_collect, associated_type_defaults)] -#![feature(marker_trait_attr)] //! Data persistence for the ActivityPuppy social media server built on top of [rocksdb]. //! //! # Overview @@ -158,4 +157,4 @@ pub enum Error { Decoding(bincode::error::DecodeError), } -pub type Backend = rocksdb::TransactionDB; +type Backend = rocksdb::TransactionDB; diff --git a/lib/store/src/mixin.rs b/lib/store/src/mixin.rs index 2f746c8..23943a7 100644 --- a/lib/store/src/mixin.rs +++ b/lib/store/src/mixin.rs @@ -1,4 +1,11 @@ +use std::{ + fmt::Pointer, + ops::{Bound, RangeBounds}, +}; + use bincode::{Decode, Encode}; +use chrono::{DateTime, TimeDelta, Utc}; +use either::Either; /// Derive a [`Mixin`] implementation. pub use r#macro::Mixin; @@ -6,7 +13,7 @@ use super::{ types::{MixinSpec, Value}, Batch, Store, Transaction, }; -use crate::{Error, Key, Result}; +use crate::{internal::Query, util::IterExt, Error, Key, Result}; /// Mixins are the simplest pieces of data in the store. pub trait Mixin: Value + Encode + Decode {} @@ -73,6 +80,40 @@ impl Transaction<'_> { { op::has_mixin::(self, node) } + /// Get all `M`s where the key's timestamp is within the `range`. + pub fn range( + &self, + range: impl RangeBounds>, + ) -> impl Iterator> + '_ + where + M: Mixin, + { + use crate::internal::Context as _; + const MS: TimeDelta = TimeDelta::milliseconds(1); + let iter = match (range.start_bound(), range.end_bound()) { + (Bound::Unbounded, Bound::Unbounded) => { + Either::Left(self.open(M::SPEC.keyspace).list()) + } + (min, max) => { + let lower = match min { + Bound::Unbounded => [u8::MIN; 16], + Bound::Included(inc) => Key::range(*inc).0, + Bound::Excluded(exc) => Key::range(*exc + MS).0, + }; + let upper = match max { + Bound::Unbounded => [u8::MAX; 16], + Bound::Included(inc) => Key::range(*inc).1, + Bound::Excluded(exc) => Key::range(*exc - MS).1, + }; + Either::Right(self.open(M::SPEC.keyspace).range(lower, upper)) + } + }; + iter.bind_results(|(k, v)| { + let key = Key::from_slice(k.as_ref()); + let val = op::decode(v)?; + Ok((key, val)) + }) + } } impl Batch { @@ -136,7 +177,7 @@ mod op { bincode::encode_to_vec(data, bincode::config::standard()).map_err(Error::Encoding) } - fn decode(data: impl AsRef<[u8]>) -> Result + pub(super) fn decode(data: impl AsRef<[u8]>) -> Result where T: bincode::Decode, { diff --git a/lib/store/src/util.rs b/lib/store/src/util.rs index e082a37..8fac2e0 100644 --- a/lib/store/src/util.rs +++ b/lib/store/src/util.rs @@ -46,6 +46,20 @@ pub trait IterExt: Iterator + Sized { { self.next().ok_or(e)? } + /// `filter_map` meets `and_then`. + fn filter_bind_results<'a, I, O, E>( + self, + mut f: impl FnMut(I) -> Result, E> + 'a, + ) -> impl Iterator> + 'a + where + Self: Iterator> + 'a, + { + self.filter_map(move |r| r.and_then(|x| f(x)).transpose()) + } } impl IterExt for I where I: Iterator {} + +pub fn key(key: K) -> impl FnOnce(V) -> (K, V) { + move |val| (key, val) +}