refactor: convert this whole thing into async

This commit is contained in:
Jeremy Wall 2024-07-04 08:24:40 -05:00
parent d0faf6ffe1
commit 57953e1861
6 changed files with 227 additions and 167 deletions

View File

@@ -5,6 +5,12 @@ edition = "2021"
authors = ["Jeremy Wall <jeremy@marzhillstudios.com>"]
license = "Apache License 2.0"
[dependencies]
[dependencies.async-std]
version = "1.12.0"
features = [ "attributes" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies.ciborium]

View File

@@ -11,17 +11,20 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use super::Merkle;
use crate::hash::HashWriter;
use crate::node::Node;
use crate::store::{Result, Store};
use crate::store::{AsyncStore, Result};
use async_std::stream::Stream;
use std::collections::BTreeSet;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
/// A stream of the missing [nodes](Node) in a [Merkle DAG](Merkle) given a set of root nodes.
pub struct Missing<'dag, S, HW>
where
S: Store<HW>,
S: AsyncStore<HW>,
HW: HashWriter,
{
dag: &'dag Merkle<S, HW>,
@@ -30,7 +33,7 @@ where
impl<'dag, S, HW> Missing<'dag, S, HW>
where
S: Store<HW>,
S: AsyncStore<HW>,
HW: HashWriter,
{
/// Create an iterator for the missing [nodes](Node) given a set of root [nodes](Node).
@@ -39,8 +42,11 @@ where
}
/// Returns the next set of missing [nodes](Node) in the iterator.
pub fn next_nodes(&mut self) -> Result<Option<Vec<Node<HW>>>> {
let nodes = self.dag.find_next_non_descendant_nodes(&self.root_nodes)?;
pub async fn next_nodes(&mut self) -> Result<Option<Vec<Node<HW>>>> {
let nodes = self
.dag
.find_next_non_descendant_nodes(&self.root_nodes)
.await?;
self.root_nodes = BTreeSet::new();
for id in nodes.iter().map(|n| n.id().to_vec()) {
self.root_nodes.insert(id);
@@ -53,18 +59,25 @@ where
}
}
impl<'dag, S, HW> Iterator for Missing<'dag, S, HW>
impl<'dag, S, HW> Stream for Missing<'dag, S, HW>
where
S: Store<HW>,
S: AsyncStore<HW>,
HW: HashWriter,
{
type Item = Result<Vec<Node<HW>>>;
fn next(&mut self) -> Option<Self::Item> {
match self.next_nodes() {
Ok(Some(ns)) => Some(Ok(ns)),
Ok(None) => None,
Err(e) => Some(Err(e)),
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let next_nodes = self.next_nodes();
match std::pin::pin!(next_nodes).poll(cx) {
Poll::Ready(result) => Poll::Ready(match result {
Ok(Some(ns)) => Some(Ok(ns)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}),
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -18,7 +18,7 @@ use std::{collections::BTreeSet, marker::PhantomData};
use crate::{
hash::HashWriter,
node::Node,
store::{Result, Store, StoreError},
store::{AsyncStore, Result, StoreError},
};
mod iter;
@@ -50,7 +50,7 @@ pub enum NodeCompare {
pub struct Merkle<S, HW>
where
HW: HashWriter,
S: Store<HW>,
S: AsyncStore<HW>,
{
roots: BTreeSet<Vec<u8>>,
nodes: S,
@@ -60,7 +60,7 @@ where
impl<S, HW> Merkle<S, HW>
where
HW: HashWriter,
S: Store<HW>,
S: AsyncStore<HW>,
{
/// Construct a new DAG.
pub fn new(s: S) -> Self {
@@ -77,18 +77,18 @@ where
///
/// One result of not constructing and then adding [nodes](Node) is that we ensure that we always
/// satisfy the implementation rule in the Merkle-CRDT whitepaper.
pub fn add_node<'a, N: Into<Vec<u8>>>(
pub async fn add_node<'a, N: Into<Vec<u8>>>(
&'a mut self,
item: N,
dependency_ids: BTreeSet<Vec<u8>>,
) -> Result<Vec<u8>> {
let node = Node::<HW>::new(item.into(), dependency_ids.clone());
let id = node.id().to_vec();
if self.nodes.contains(id.as_slice())? {
if self.nodes.contains(id.as_slice()).await? {
// We've already added this node so there is nothing left to do.
return Ok(self
.nodes
.get(id.as_slice())
.get(id.as_slice()).await
.unwrap()
.unwrap()
.id()
@@ -96,7 +96,7 @@ where
}
let mut root_removals = Vec::new();
for dep_id in dependency_ids.iter() {
if !self.nodes.contains(dep_id)? {
if !self.nodes.contains(dep_id).await? {
return Err(StoreError::NoSuchDependents);
}
// If any of our dependencies is in the roots pointer list then
@@ -105,7 +105,7 @@ where
root_removals.push(dep_id);
}
}
self.nodes.store(node)?;
self.nodes.store(node).await?;
for removal in root_removals {
self.roots.remove(removal);
}
@@ -114,13 +114,13 @@ where
}
/// Check if we already have a copy of a [Node].
pub fn check_for_node(&self, id: &[u8]) -> Result<bool> {
return self.nodes.contains(id);
pub async fn check_for_node(&self, id: &[u8]) -> Result<bool> {
return self.nodes.contains(id).await;
}
/// Get a [Node] from the DAG by its hash identifier if it exists.
pub fn get_node_by_id(&self, id: &[u8]) -> Result<Option<Node<HW>>> {
self.nodes.get(id)
pub async fn get_node_by_id(&self, id: &[u8]) -> Result<Option<Node<HW>>> {
self.nodes.get(id).await
}
/// Get the set of root [Node] ids.
@@ -138,15 +138,15 @@ where
/// then returns [NodeCompare::After]. If both id's are equal then the returns
/// [NodeCompare::Equivalent]. If neither id are parts of the same subgraph then returns
/// [NodeCompare::Uncomparable].
pub fn compare(&self, left: &[u8], right: &[u8]) -> Result<NodeCompare> {
pub async fn compare(&self, left: &[u8], right: &[u8]) -> Result<NodeCompare> {
Ok(if left == right {
NodeCompare::Equivalent
} else {
// Is left node an ancestor of right node?
if self.search_graph(right, left)? {
if self.search_graph(right, left).await? {
NodeCompare::Before
// is right node an ancestor of left node?
} else if self.search_graph(left, right)? {
} else if self.search_graph(left, right).await? {
NodeCompare::After
} else {
NodeCompare::Uncomparable
@@ -166,7 +166,7 @@ where
}
/// Find the immediate next non descendant [nodes](Node) in this graph for the given `search_nodes`.
pub fn find_next_non_descendant_nodes(
pub async fn find_next_non_descendant_nodes(
&self,
search_nodes: &BTreeSet<Vec<u8>>,
) -> Result<Vec<Node<HW>>> {
@@ -174,7 +174,7 @@ where
let mut ids = BTreeSet::new();
while !stack.is_empty() {
let node_id = stack.pop().unwrap();
let node = self.get_node_by_id(node_id.as_slice())?.unwrap();
let node = self.get_node_by_id(node_id.as_slice()).await?.unwrap();
let deps = node.dependency_ids();
if deps.len() == 0 {
// This is a leaf node which means it's the beginning of a sub graph
@@ -193,16 +193,16 @@ where
}
let mut result = Vec::new();
for id in ids {
result.push(self.get_node_by_id(id.as_slice())?.unwrap());
result.push(self.get_node_by_id(id.as_slice()).await?.unwrap());
}
Ok(result)
}
fn search_graph(&self, root_id: &[u8], search_id: &[u8]) -> Result<bool> {
async fn search_graph(&self, root_id: &[u8], search_id: &[u8]) -> Result<bool> {
if root_id == search_id {
return Ok(true);
}
let root_node = match self.get_node_by_id(root_id)? {
let root_node = match self.get_node_by_id(root_id).await? {
Some(n) => n,
None => {
return Ok(false);
@@ -216,7 +216,7 @@ where
if search_id == dep {
return Ok(true);
}
stack.push(match self.get_node_by_id(dep)? {
stack.push(match self.get_node_by_id(dep).await? {
Some(n) => n,
None => panic!("Invalid DAG STATE encountered"),
})
@@ -229,7 +229,7 @@ where
impl<S, HW> Default for Merkle<S, HW>
where
HW: HashWriter,
S: Store<HW> + Default,
S: AsyncStore<HW> + Default,
{
fn default() -> Self {
Self {

View File

@@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_std::task::block_on;
use std::collections::{hash_map::DefaultHasher, BTreeMap, BTreeSet};
use proptest::prelude::*;
@@ -62,7 +63,10 @@ fn complex_dag_strategy(
for (idx, p) in chunk.iter().enumerate() {
let dep_idx = idx % dep_set_len;
let dep_set = dep_sets[dep_idx].clone();
id_stack.push(dag.add_node(p.clone(), dep_set).unwrap().clone());
// NOTE(zaphar): We must block here: proptest strategies are synchronous, so the async add_node call has to be driven to completion inline.
block_on( async {
id_stack.push(dag.add_node(p.clone(), dep_set).await.unwrap().clone());
});
}
}
Just(dag)
@@ -72,63 +76,67 @@ fn complex_dag_strategy(
proptest! {
#[test]
fn test_dag_add_node_properties((nodes, parent_idxs) in simple_edge_strategy(100)) {
// TODO implement the tests now
let mut dag = TestDag::new(BTreeMap::new());
let parent_count = parent_idxs.len();
let mut dependents = BTreeMap::new();
let mut node_set = BTreeSet::new();
for (idx, n) in nodes.iter().cloned().enumerate() {
if !parent_idxs.contains(&idx) {
let node_id = dag.add_node(n.as_bytes(), BTreeSet::new()).unwrap();
node_set.insert(node_id.clone());
let parent = idx % parent_count;
if dependents.contains_key(&parent) {
dependents.get_mut(&parent).map(|v: &mut BTreeSet<Vec<u8>>| v.insert(node_id));
} else {
dependents.insert(parent, BTreeSet::from([node_id]));
block_on(async {
// TODO implement the tests now
let mut dag = TestDag::new(BTreeMap::new());
let parent_count = parent_idxs.len();
let mut dependents = BTreeMap::new();
let mut node_set = BTreeSet::new();
for (idx, n) in nodes.iter().cloned().enumerate() {
if !parent_idxs.contains(&idx) {
let node_id = dag.add_node(n.as_bytes(), BTreeSet::new()).await.unwrap();
node_set.insert(node_id.clone());
let parent = idx % parent_count;
if dependents.contains_key(&parent) {
dependents.get_mut(&parent).map(|v: &mut BTreeSet<Vec<u8>>| v.insert(node_id));
} else {
dependents.insert(parent, BTreeSet::from([node_id]));
}
}
}
}
for (pidx, dep_ids) in dependents {
let node_id = dag.add_node(nodes[pidx].clone(), dep_ids).unwrap();
node_set.insert(node_id.clone());
}
assert!(dag.get_roots().len() <= parent_count);
assert!(dag.get_nodes().len() == node_set.len());
for (pidx, dep_ids) in dependents {
let node_id = dag.add_node(nodes[pidx].clone(), dep_ids).await.unwrap();
node_set.insert(node_id.clone());
}
assert!(dag.get_roots().len() <= parent_count);
assert!(dag.get_nodes().len() == node_set.len());
})
}
}
proptest! {
#[test]
fn test_complex_dag_node_properties(dag in complex_dag_strategy(100, 10, 3)) {
// TODO(jwall): We can assert much more about the Merkle if we get more clever in what we return.
let nodes = dag.get_nodes();
assert!(nodes.len() <= 100);
block_on(async {
// TODO(jwall): We can assert much more about the Merkle if we get more clever in what we return.
let nodes = dag.get_nodes();
assert!(nodes.len() <= 100);
let roots = dag.get_roots();
assert!(roots.len() < dag.get_nodes().len());
let roots = dag.get_roots();
assert!(roots.len() < dag.get_nodes().len());
for node_id in nodes.keys() {
let mut is_descendant = false;
if roots.contains(node_id) {
continue;
for node_id in nodes.keys() {
let mut is_descendant = false;
if roots.contains(node_id) {
continue;
}
for root in roots.iter() {
if let NodeCompare::After = dag.compare(root, node_id).await.unwrap() {
// success
is_descendant = true;
}
}
assert!(is_descendant);
}
for root in roots.iter() {
if let NodeCompare::After = dag.compare(root, node_id).unwrap() {
// success
is_descendant = true;
// Check that every root node is uncomparable.
for left_root in roots.iter() {
for right_root in roots.iter() {
if left_root != right_root {
assert_eq!(dag.compare(left_root, right_root).await.unwrap(), NodeCompare::Uncomparable);
}
}
}
assert!(is_descendant);
}
// Check that every root node is uncomparable.
for left_root in roots.iter() {
for right_root in roots.iter() {
if left_root != right_root {
assert_eq!(dag.compare(left_root, right_root).unwrap(), NodeCompare::Uncomparable);
}
}
}
});
}
}
@@ -137,17 +145,18 @@ proptest! {
#[test]
fn test_node_serde_strategy(dag in complex_dag_strategy(100, 10, 3)) {
use ciborium::{de::from_reader, ser::into_writer};
let nodes = dag.get_nodes();
for (_, node) in nodes {
let node = node.clone();
let mut buf: Vec<u8> = Vec::new();
into_writer(&node, &mut buf).unwrap();
let node_de: Node<DefaultHasher> = from_reader(buf.as_slice()).unwrap();
assert_eq!(node.id(), node_de.id());
assert_eq!(node.item_id(), node_de.item_id());
assert_eq!(node.item(), node_de.item());
assert_eq!(node.dependency_ids(), node_de.dependency_ids());
}
block_on(async {
let nodes = dag.get_nodes();
for (_, node) in nodes {
let node = node.clone();
let mut buf: Vec<u8> = Vec::new();
into_writer(&node, &mut buf).unwrap();
let node_de: Node<DefaultHasher> = from_reader(buf.as_slice()).unwrap();
assert_eq!(node.id(), node_de.id());
assert_eq!(node.item_id(), node_de.item_id());
assert_eq!(node.item(), node_de.item());
assert_eq!(node.dependency_ids(), node_de.dependency_ids());
}
});
}
}

View File

@@ -25,6 +25,38 @@ pub enum StoreError {
NoSuchDependents,
}
#[allow(async_fn_in_trait)]
/// Async trait representing the asynchronous backing storage interface for a [Merkle DAG](crate::dag::Merkle).
pub trait AsyncStore<HW>
where
HW: HashWriter,
{
/// Checks if the [AsyncStore] contains a [Node] with this id.
async fn contains(&self, id: &[u8]) -> Result<bool>;
/// Fetches a node from the [AsyncStore] by id if it exists.
async fn get(&self, id: &[u8]) -> Result<Option<Node<HW>>>;
/// Stores a given [Node].
async fn store(&mut self, node: Node<HW>) -> Result<()>;
}
impl<HW, S> AsyncStore<HW> for S
where
HW: HashWriter,
S: Store<HW>,
{
async fn contains(&self, id: &[u8]) -> Result<bool> {
std::future::ready(self.contains(id)).await
}
async fn get(&self, id: &[u8]) -> Result<Option<Node<HW>>> {
std::future::ready(self.get(id)).await
}
async fn store(&mut self, node: Node<HW>) -> Result<()> {
std::future::ready(self.store(node)).await
}
}
/// Trait representing the backing storage interface for a [Merkle DAG](crate::dag::Merkle).
pub trait Store<HW>
where

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, BTreeSet};
use async_std;
use crate::prelude::*;
type TestDag<'a> = Merkle<
@@ -21,66 +21,66 @@ type TestDag<'a> = Merkle<
std::collections::hash_map::DefaultHasher,
>;
#[test]
fn test_root_pointer_hygiene() {
#[async_std::test]
async fn test_root_pointer_hygiene() {
let mut dag = TestDag::new(BTreeMap::new());
let quax_node_id = dag.add_node("quax", BTreeSet::new()).unwrap();
let quax_node_id = dag.add_node("quax", BTreeSet::new()).await.unwrap();
assert_eq!(
quax_node_id,
*dag.get_node_by_id(&quax_node_id).unwrap().unwrap().id()
*dag.get_node_by_id(&quax_node_id).await.unwrap().unwrap().id()
);
assert!(dag.get_roots().contains(&quax_node_id));
let mut dep_set = BTreeSet::new();
dep_set.insert(quax_node_id.clone());
let quux_node_id = dag.add_node("quux", dep_set).unwrap();
let quux_node_id = dag.add_node("quux", dep_set).await.unwrap();
assert!(!dag.get_roots().contains(&quax_node_id));
assert!(dag.get_roots().contains(&quux_node_id));
assert_eq!(
quux_node_id,
*dag.get_node_by_id(&quux_node_id).unwrap().unwrap().id()
*dag.get_node_by_id(&quux_node_id).await.unwrap().unwrap().id()
);
}
#[test]
fn test_insert_no_such_dependents_error() {
#[async_std::test]
async fn test_insert_no_such_dependents_error() {
let missing_dependent =
Node::<DefaultHasher>::new("missing".as_bytes().to_vec(), BTreeSet::new());
let mut dag = TestDag::new(BTreeMap::new());
let mut dep_set = BTreeSet::new();
dep_set.insert(missing_dependent.id().to_vec());
assert!(dag.add_node("foo", dep_set).is_err());
assert!(dag.add_node("foo", dep_set).await.is_err());
assert!(dag.get_roots().is_empty());
assert!(dag.get_nodes().is_empty());
}
#[test]
fn test_adding_nodes_is_idempotent() {
#[async_std::test]
async fn test_adding_nodes_is_idempotent() {
let mut dag = TestDag::new(BTreeMap::new());
let quax_node_id = dag.add_node("quax", BTreeSet::new()).unwrap();
let quax_node_id = dag.add_node("quax", BTreeSet::new()).await.unwrap();
assert_eq!(
quax_node_id,
*dag.get_node_by_id(&quax_node_id).unwrap().unwrap().id()
*dag.get_node_by_id(&quax_node_id).await.unwrap().unwrap().id()
);
assert!(dag.get_roots().contains(&quax_node_id));
let root_size = dag.get_roots().len();
let nodes_size = dag.get_nodes().len();
dag.add_node("quax", BTreeSet::new()).unwrap();
dag.add_node("quax", BTreeSet::new()).await.unwrap();
assert_eq!(root_size, dag.get_roots().len());
assert_eq!(nodes_size, dag.get_nodes().len());
}
#[test]
fn test_adding_nodes_is_idempotent_regardless_of_dep_order() {
#[async_std::test]
async fn test_adding_nodes_is_idempotent_regardless_of_dep_order() {
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let qualm_node_id = dag.add_node("qualm", BTreeSet::new()).unwrap();
let quell_node_id = dag.add_node("quell", BTreeSet::new()).unwrap();
let quake_node_id = dag.add_node("quake", BTreeSet::new()).await.unwrap();
let qualm_node_id = dag.add_node("qualm", BTreeSet::new()).await.unwrap();
let quell_node_id = dag.add_node("quell", BTreeSet::new()).await.unwrap();
let dep_ids = BTreeSet::from([
quake_node_id.clone(),
qualm_node_id.clone(),
quell_node_id.clone(),
]);
dag.add_node("foo", dep_ids).unwrap();
dag.add_node("foo", dep_ids).await.unwrap();
let root_size = dag.get_roots().len();
let nodes_size = dag.get_nodes().len();
@@ -89,7 +89,7 @@ fn test_adding_nodes_is_idempotent_regardless_of_dep_order() {
quake_node_id.clone(),
qualm_node_id.clone(),
]);
dag.add_node("foo", dep_ids).unwrap();
dag.add_node("foo", dep_ids).await.unwrap();
assert_eq!(root_size, dag.get_roots().len());
assert_eq!(nodes_size, dag.get_nodes().len());
@@ -98,90 +98,90 @@ fn test_adding_nodes_is_idempotent_regardless_of_dep_order() {
quell_node_id.clone(),
quake_node_id.clone(),
]);
dag.add_node("foo", dep_ids).unwrap();
dag.add_node("foo", dep_ids).await.unwrap();
assert_eq!(root_size, dag.get_roots().len());
assert_eq!(nodes_size, dag.get_nodes().len());
}
#[test]
fn test_node_comparison_equivalent() {
#[async_std::test]
async fn test_node_comparison_equivalent() {
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let quake_node_id = dag.add_node("quake", BTreeSet::new()).await.unwrap();
assert_eq!(
dag.compare(&quake_node_id, &quake_node_id).unwrap(),
dag.compare(&quake_node_id, &quake_node_id).await.unwrap(),
NodeCompare::Equivalent
);
}
#[test]
fn test_node_comparison_before() {
#[async_std::test]
async fn test_node_comparison_before() {
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let quake_node_id = dag.add_node("quake", BTreeSet::new()).await.unwrap();
let qualm_node_id = dag
.add_node("qualm", BTreeSet::from([quake_node_id.clone()]))
.add_node("qualm", BTreeSet::from([quake_node_id.clone()])).await
.unwrap();
let quell_node_id = dag
.add_node("quell", BTreeSet::from([qualm_node_id.clone()]))
.add_node("quell", BTreeSet::from([qualm_node_id.clone()])).await
.unwrap();
assert_eq!(
dag.compare(&quake_node_id, &qualm_node_id).unwrap(),
dag.compare(&quake_node_id, &qualm_node_id).await.unwrap(),
NodeCompare::Before
);
assert_eq!(
dag.compare(&quake_node_id, &quell_node_id).unwrap(),
dag.compare(&quake_node_id, &quell_node_id).await.unwrap(),
NodeCompare::Before
);
}
#[test]
fn test_node_comparison_after() {
#[async_std::test]
async fn test_node_comparison_after() {
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let quake_node_id = dag.add_node("quake", BTreeSet::new()).await.unwrap();
let qualm_node_id = dag
.add_node("qualm", BTreeSet::from([quake_node_id.clone()]))
.add_node("qualm", BTreeSet::from([quake_node_id.clone()])).await
.unwrap();
let quell_node_id = dag
.add_node("quell", BTreeSet::from([qualm_node_id.clone()]))
.add_node("quell", BTreeSet::from([qualm_node_id.clone()])).await
.unwrap();
assert_eq!(
dag.compare(&qualm_node_id, &quake_node_id).unwrap(),
dag.compare(&qualm_node_id, &quake_node_id).await.unwrap(),
NodeCompare::After
);
assert_eq!(
dag.compare(&quell_node_id, &quake_node_id).unwrap(),
dag.compare(&quell_node_id, &quake_node_id).await.unwrap(),
NodeCompare::After
);
}
#[test]
fn test_node_comparison_no_shared_graph() {
#[async_std::test]
async fn test_node_comparison_no_shared_graph() {
let mut dag = TestDag::new(BTreeMap::new());
let quake_node_id = dag.add_node("quake", BTreeSet::new()).unwrap();
let qualm_node_id = dag.add_node("qualm", BTreeSet::new()).unwrap();
let quell_node_id = dag.add_node("quell", BTreeSet::new()).unwrap();
let quake_node_id = dag.add_node("quake", BTreeSet::new()).await.unwrap();
let qualm_node_id = dag.add_node("qualm", BTreeSet::new()).await.unwrap();
let quell_node_id = dag.add_node("quell", BTreeSet::new()).await.unwrap();
assert_eq!(
dag.compare(&qualm_node_id, &quake_node_id).unwrap(),
dag.compare(&qualm_node_id, &quake_node_id).await.unwrap(),
NodeCompare::Uncomparable
);
assert_eq!(
dag.compare(&quell_node_id, &quake_node_id).unwrap(),
dag.compare(&quell_node_id, &quake_node_id).await.unwrap(),
NodeCompare::Uncomparable
);
assert_eq!(
dag.compare(&quell_node_id, &qualm_node_id).unwrap(),
dag.compare(&quell_node_id, &qualm_node_id).await.unwrap(),
NodeCompare::Uncomparable
);
}
#[test]
fn test_find_next_missing_nodes_disjoint_graphs_no_deps() {
#[async_std::test]
async fn test_find_next_missing_nodes_disjoint_graphs_no_deps() {
let mut dag1 = TestDag::new(BTreeMap::new());
let mut dag2 = TestDag::new(BTreeMap::new());
let quake_node_id = dag1.add_node("quake", BTreeSet::new()).unwrap();
let qualm_node_id = dag1.add_node("qualm", BTreeSet::new()).unwrap();
dag2.add_node("quell", BTreeSet::new()).unwrap();
let quake_node_id = dag1.add_node("quake", BTreeSet::new()).await.unwrap();
let qualm_node_id = dag1.add_node("qualm", BTreeSet::new()).await.unwrap();
dag2.add_node("quell", BTreeSet::new()).await.unwrap();
let missing_nodes = dag1
.find_next_non_descendant_nodes(dag2.get_roots())
.find_next_non_descendant_nodes(dag2.get_roots()).await
.unwrap();
assert_eq!(missing_nodes.len(), 2);
let mut found_quake = false;
@@ -198,19 +198,19 @@ fn test_find_next_missing_nodes_disjoint_graphs_no_deps() {
assert!(found_qualm);
}
#[test]
fn test_find_next_missing_nodes_sub_graphs_one_degree_off() {
#[async_std::test]
async fn test_find_next_missing_nodes_sub_graphs_one_degree_off() {
let mut dag1 = TestDag::new(BTreeMap::new());
let mut dag2 = TestDag::new(BTreeMap::new());
dag1.add_node("quake", BTreeSet::new()).unwrap();
let quake_node_id = dag2.add_node("quake", BTreeSet::new()).unwrap();
dag1.add_node("quake", BTreeSet::new()).await.unwrap();
let quake_node_id = dag2.add_node("quake", BTreeSet::new()).await.unwrap();
let mut deps = BTreeSet::new();
deps.insert(quake_node_id);
let qualm_node_id = dag1.add_node("qualm", deps).unwrap();
let qualm_node_id = dag1.add_node("qualm", deps).await.unwrap();
let missing_nodes = dag1
.find_next_non_descendant_nodes(dag2.get_roots())
.find_next_non_descendant_nodes(dag2.get_roots()).await
.unwrap();
assert_eq!(missing_nodes.len(), 1);
let mut found_qualm = false;
@@ -222,24 +222,24 @@ fn test_find_next_missing_nodes_sub_graphs_one_degree_off() {
assert!(found_qualm);
}
#[test]
fn test_find_next_missing_nodes_sub_graphs_two_degree_off() {
#[async_std::test]
async fn test_find_next_missing_nodes_sub_graphs_two_degree_off() {
let mut dag1 = TestDag::new(BTreeMap::new());
let mut dag2 = TestDag::new(BTreeMap::new());
dag1.add_node("quake", BTreeSet::new()).unwrap();
let quake_node_id = dag2.add_node("quake", BTreeSet::new()).unwrap();
dag1.add_node("quake", BTreeSet::new()).await.unwrap();
let quake_node_id = dag2.add_node("quake", BTreeSet::new()).await.unwrap();
let mut deps = BTreeSet::new();
deps.insert(quake_node_id.clone());
let qualm_node_id = dag1.add_node("qualm", deps).unwrap();
let qualm_node_id = dag1.add_node("qualm", deps).await.unwrap();
deps = BTreeSet::new();
deps.insert(quake_node_id.clone());
deps.insert(qualm_node_id.clone());
let quell_node_id = dag1.add_node("quell", deps).unwrap();
let quell_node_id = dag1.add_node("quell", deps).await.unwrap();
let missing_nodes = dag1
.find_next_non_descendant_nodes(dag2.get_roots())
.find_next_non_descendant_nodes(dag2.get_roots()).await
.unwrap();
assert_eq!(missing_nodes.len(), 2);
let mut found_qualm = false;
@@ -264,20 +264,20 @@ mod cbor_serialization_tests {
use std::collections::BTreeMap;
use std::collections::{hash_map::DefaultHasher, BTreeSet};
#[test]
fn test_node_deserializaton() {
#[async_std::test]
async fn test_node_deserializaton() {
let mut dag = TestDag::new(BTreeMap::new());
let simple_node_id = dag.add_node("simple", BTreeSet::new()).unwrap();
let simple_node_id = dag.add_node("simple", BTreeSet::new()).await.unwrap();
let mut dep_set = BTreeSet::new();
dep_set.insert(simple_node_id.clone());
let root_node_id = dag.add_node("root", dep_set).unwrap();
let root_node_id = dag.add_node("root", dep_set).await.unwrap();
let simple_node_to_serialize = dag
.get_node_by_id(simple_node_id.as_slice())
.get_node_by_id(simple_node_id.as_slice()).await
.unwrap()
.unwrap();
let root_node_to_serialize = dag
.get_node_by_id(root_node_id.as_slice())
.get_node_by_id(root_node_id.as_slice()).await
.unwrap()
.unwrap();