RocksDb store implementation

This commit is contained in:
Jeremy Wall 2022-08-24 14:41:14 -04:00
parent 45486b5cc7
commit 0e3e252570
7 changed files with 122 additions and 15 deletions

View File

@ -27,8 +27,13 @@ optional = true
version = "0.10.4"
optional = true
[dependencies.rocksdb]
version = "0.19.0"
optional = true
[features]
default = []
cbor = ["dep:ciborium"]
blake2 = ["dep:blake2"]
rusty-leveldb = ["dep:rusty-leveldb", "blake2", "cbor"]
rocksdb = ["dep:rocksdb", "blake2", "cbor"]

View File

@ -61,8 +61,12 @@ where
S: Store<HW>,
{
/// Construct a new empty DAG backed by the given store. A DAG is also `Default` when its store is.
pub fn new() -> Self {
Self::default()
pub fn new(s: S) -> Self {
Self {
nodes: s,
roots: Default::default(),
_phantom_node: PhantomData,
}
}
/// Add a new payload with a required set of dependency_ids. This method will construct a new node
@ -179,7 +183,7 @@ where
impl<S, HW> Default for Merkle<S, HW>
where
HW: HashWriter,
S: Store<HW>,
S: Store<HW> + Default,
{
fn default() -> Self {
Self {

View File

@ -21,6 +21,8 @@ pub mod hash;
pub mod leveldb;
pub mod node;
pub mod prelude;
#[cfg(feature = "rocksdb")]
pub mod rocksdb;
pub mod store;
#[cfg(test)]

View File

@ -40,7 +40,7 @@ fn complex_dag_strategy(
) -> impl Strategy<Value = TestDag> {
prop::collection::vec(".*", depth..nodes_count).prop_flat_map(move |payloads| {
let nodes_len = payloads.len();
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
// partition the payloads into depth pieces
let mut id_stack: Vec<Vec<u8>> = Vec::new();
for chunk in payloads.chunks(nodes_len / depth) {
@ -73,7 +73,7 @@ proptest! {
#[test]
fn test_dag_add_node_properties((nodes, parent_idxs) in simple_edge_strategy(100)) {
// TODO implement the tests now
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let parent_count = parent_idxs.len();
let mut dependents = BTreeMap::new();
let mut node_set = BTreeSet::new();

95
src/rocksdb/mod.rs Normal file
View File

@ -0,0 +1,95 @@
// Copyright 2022 Jeremy Wall (Jeremy@marzhilsltudios.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Module implementing a rocksdb store interface for a MerkleDag.
//! Requires the `rocksdb` feature to be enabled.
use std::path::Path;
use crate::blake2::*;
use crate::{
node::Node,
store::{Result as StoreResult, Store, StoreError},
};
use ciborium;
use rocksdb::{DBWithThreadMode, MultiThreaded, Options, SingleThreaded, ThreadMode};
/// Result type for rocksdb store construction; wraps the backend's `rocksdb::Error`.
pub type Result<T> = std::result::Result<T, rocksdb::Error>;
/// A MerkleDag `Store` backed by a rocksdb database, generic over
/// rocksdb's threading mode (`SingleThreaded` or `MultiThreaded`).
pub struct RocksStore<TM>
where
    TM: ThreadMode,
{
    // Handle to the underlying rocksdb database.
    store: DBWithThreadMode<TM>,
}
/// Type alias for a `RocksStore<SingleThreaded>`.
pub type SingleThreadedRocksStore = RocksStore<SingleThreaded>;
/// Type alias for a `RocksStore<MultiThreaded>`.
pub type MultiThreadedRocksStore = RocksStore<MultiThreaded>;
/// A Rocksdb `Store` implementation generic over the single and multithreaded
/// versions.
impl<TM> RocksStore<TM>
where
    TM: ThreadMode,
{
    /// Open the rocksdb database at `path` with default options and wrap it
    /// as a store.
    ///
    /// # Errors
    /// Returns the underlying `rocksdb::Error` if the database cannot be opened.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let db = DBWithThreadMode::<TM>::open(&Options::default(), path)?;
        Ok(Self { store: db })
    }
}
impl<TM> Store<Blake2b512> for RocksStore<TM>
where
    TM: ThreadMode,
{
    /// Report whether a node with this `id` exists in the database.
    fn contains(&self, id: &[u8]) -> StoreResult<bool> {
        Ok(self
            .store
            .get(id)
            .map_err(|e| StoreError::StoreFailure(format!("{:?}", e)))?
            .is_some())
    }

    /// Fetch the node stored under `id`, if any, deserializing it from CBOR.
    fn get(&self, id: &[u8]) -> StoreResult<Option<Node<Blake2b512>>> {
        Ok(
            match self
                .store
                .get(id)
                .map_err(|e| StoreError::StoreFailure(format!("{:?}", e)))?
            {
                Some(bs) => ciborium::de::from_reader(bs.as_slice()).map_err(|e| {
                    StoreError::StoreFailure(format!("Invalid serialization {:?}", e))
                })?,
                None => None,
            },
        )
    }

    /// Serialize `node` to CBOR and persist it keyed by its id.
    fn store(&mut self, node: Node<Blake2b512>) -> StoreResult<()> {
        let mut buf = Vec::new();
        // Propagate serialization failure as a StoreError instead of
        // panicking: a library `Store` impl must not unwrap a recoverable
        // error (matches the error mapping used in `get`).
        ciborium::ser::into_writer(&node, &mut buf)
            .map_err(|e| StoreError::StoreFailure(format!("Invalid serialization {:?}", e)))?;
        self.store.put(node.id(), &buf)?;
        Ok(())
    }
}
/// Convert the backend error into the generic `StoreError` so rocksdb
/// operations can use `?` inside `Store` methods.
impl From<rocksdb::Error> for StoreError {
    fn from(err: rocksdb::Error) -> Self {
        // `to_string()` over `format!("{}", ..)` — same output, idiomatic form.
        StoreError::StoreFailure(err.to_string())
    }
}

View File

@ -26,7 +26,7 @@ pub enum StoreError {
}
/// Trait representing the backing storage interface for a `DAG`.
pub trait Store<HW>: Default
pub trait Store<HW>
where
HW: HashWriter,
{

View File

@ -23,7 +23,7 @@ type TestDag<'a> = Merkle<
#[test]
fn test_root_pointer_hygiene() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let quax_node_id = dag.add_node("quax", BTreeSet::new()).unwrap();
assert_eq!(
quax_node_id,
@ -45,7 +45,7 @@ fn test_root_pointer_hygiene() {
fn test_insert_no_such_dependents_error() {
let missing_dependent =
Node::<DefaultHasher>::new("missing".as_bytes().to_vec(), BTreeSet::new());
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let mut dep_set = BTreeSet::new();
dep_set.insert(missing_dependent.id().to_vec());
assert!(dag.add_node("foo", dep_set).is_err());
@ -55,7 +55,7 @@ fn test_insert_no_such_dependents_error() {
#[test]
fn test_adding_nodes_is_idempotent() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let quax_node_id = dag.add_node("quax", BTreeSet::new()).unwrap();
assert_eq!(
quax_node_id,
@ -71,7 +71,7 @@ fn test_adding_nodes_is_idempotent() {
#[test]
fn test_adding_nodes_is_idempotent_regardless_of_dep_order() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let qualm_node_id = dag.add_node("qualm", BTreeSet::new()).unwrap();
let quell_node_id = dag.add_node("quell", BTreeSet::new()).unwrap();
@ -105,7 +105,7 @@ fn test_adding_nodes_is_idempotent_regardless_of_dep_order() {
#[test]
fn test_node_comparison_equivalent() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
assert_eq!(
dag.compare(&quake_node_id, &quake_node_id).unwrap(),
@ -115,7 +115,7 @@ fn test_node_comparison_equivalent() {
#[test]
fn test_node_comparison_before() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let qualm_node_id = dag
.add_node("qualm", BTreeSet::from([quake_node_id.clone()]))
@ -135,7 +135,7 @@ fn test_node_comparison_before() {
#[test]
fn test_node_comparison_after() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let qualm_node_id = dag
.add_node("qualm", BTreeSet::from([quake_node_id.clone()]))
@ -155,7 +155,7 @@ fn test_node_comparison_after() {
#[test]
fn test_node_comparison_no_shared_graph() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let qualm_node_id = dag.add_node("qualm", BTreeSet::new()).unwrap();
let quell_node_id = dag.add_node("quell", BTreeSet::new()).unwrap();
@ -178,11 +178,12 @@ mod cbor_serialization_tests {
use super::TestDag;
use crate::prelude::*;
use ciborium::{de::from_reader, ser::into_writer};
use std::collections::BTreeMap;
use std::collections::{hash_map::DefaultHasher, BTreeSet};
#[test]
fn test_node_deserializaton() {
let mut dag = TestDag::new();
let mut dag = TestDag::new(BTreeMap::new());
let simple_node_id = dag.add_node("simple", BTreeSet::new()).unwrap();
let mut dep_set = BTreeSet::new();
dep_set.insert(simple_node_id.clone());