From dcfa8bd313d336203743ed667562359e469b3aa9 Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Thu, 3 Jul 2025 16:58:56 -0500 Subject: [PATCH] wip: evolving the storage layer --- Cargo.lock | 37 +- offline-web-storage/Cargo.toml | 3 +- offline-web-storage/src/integration_tests.rs | 175 +++++ offline-web-storage/src/lib.rs | 675 +++++++----------- .../tests/integration_tests.rs | 430 ----------- 5 files changed, 467 insertions(+), 853 deletions(-) create mode 100644 offline-web-storage/src/integration_tests.rs delete mode 100644 offline-web-storage/tests/integration_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 2430caf..d13c163 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -468,7 +468,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1154,6 +1154,7 @@ dependencies = [ "serde", "serde_json", "sqlx", + "thiserror 2.0.12", "tokio", "uuid", ] @@ -1420,7 +1421,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1684,7 +1685,7 @@ dependencies = [ "sha2", "smallvec", "sqlformat", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-stream", "tracing", @@ -1770,7 +1771,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 1.0.69", "tracing", "uuid", "whoami", @@ -1810,7 +1811,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 1.0.69", "tracing", "uuid", "whoami", @@ -1913,7 +1914,7 @@ dependencies = [ "getrandom 0.3.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1922,7 +1923,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl 2.0.12", ] [[package]] @@ -1936,6 +1946,17 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "tinystr" version = "0.8.1" @@ -2087,7 +2108,7 @@ dependencies = [ "log", "rand 0.8.5", "sha1", - "thiserror", + "thiserror 1.0.69", "utf-8", ] diff --git a/offline-web-storage/Cargo.toml b/offline-web-storage/Cargo.toml index 832f400..f46aa93 100644 --- a/offline-web-storage/Cargo.toml +++ b/offline-web-storage/Cargo.toml @@ -13,5 +13,6 @@ anyhow = "1.0" uuid = { version = "1.0", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } blake2 = "0.10" +thiserror = "2.0.12" -[dev-dependencies] \ No newline at end of file +[dev-dependencies] diff --git a/offline-web-storage/src/integration_tests.rs b/offline-web-storage/src/integration_tests.rs new file mode 100644 index 0000000..dce679c --- /dev/null +++ b/offline-web-storage/src/integration_tests.rs @@ -0,0 +1,175 @@ +use std::sync::Arc; + +use offline_web_model::Reference; +use super::{ReferenceStore, SqliteReferenceStore, StoreError}; + +async fn create_test_store() -> SqliteReferenceStore { + SqliteReferenceStore::new("sqlite::memory:").await.unwrap() +} + +#[tokio::test] +async fn test_store_and_retrieve_reference() { + let store = create_test_store().await; + + // Create a test reference + let reference = Reference::new( + Some("test_content_address".to_string()), + "test_reference".to_string(), + ); + + // Store the reference + store.store_reference(&reference).await.unwrap(); + + // Retrieve the reference by ID + let retrieved = store.get_reference(&reference.id).await.unwrap(); + + // Verify the retrieved reference matches the original + assert_eq!(retrieved.id, reference.id); + assert_eq!(retrieved.content_address, reference.content_address); + assert_eq!(retrieved.name, reference.name); + assert_eq!(retrieved.dependents.len(), reference.dependents.len()); +} + +#[tokio::test] +async fn test_store_and_retrieve_content() { + let store = create_test_store().await; + + let content = b"Hello, World!"; + let content_address = "test_content_address"; + + // Store content + store.store_content(content_address, content).await.unwrap(); + + // Create a reference pointing to this content + let reference = Reference::new( + Some(content_address.to_string()), + "test_reference".to_string(), + ); + + // Retrieve content using the reference + let retrieved_content = store.get_content_for_reference(reference).await.unwrap(); + + // Verify the content matches + assert_eq!(retrieved_content, String::from_utf8(content.to_vec()).unwrap()); +} + +#[tokio::test] +async fn test_reference_with_dependents() { + let store = create_test_store().await; + + // Create a leaf reference (no dependents) + let leaf_ref = Reference::new( + Some("leaf_content_address".to_string()), + "leaf_reference".to_string(), + ); + + // Store the leaf reference + store.store_reference(&leaf_ref).await.unwrap(); + + // Create a parent reference that depends on the leaf + let parent_ref = Reference::new( + Some("parent_content_address".to_string()), + "parent_reference".to_string(), + ).add_dep(Arc::new(leaf_ref.clone())); + + // Store the parent reference + store.store_reference(&parent_ref).await.unwrap(); + + // Retrieve the parent reference + let retrieved_parent = store.get_reference(&parent_ref.id).await.unwrap(); + + // Verify the parent has the correct dependent + assert_eq!(retrieved_parent.dependents.len(), 1); + assert_eq!(retrieved_parent.dependents[0].name, leaf_ref.name); +} + +#[tokio::test] +async fn test_get_graph() { + let store = create_test_store().await; + + // Create a hierarchy of references + let leaf1 = Reference::new( + Some("leaf1_content".to_string()), + "leaf1".to_string(), + ); + + let leaf2 = Reference::new( + Some("leaf2_content".to_string()), + "leaf2".to_string(), + ); + + let parent = Reference::new( + Some("parent_content".to_string()), + "parent".to_string(), + ) + .add_dep(Arc::new(leaf1.clone())) + .add_dep(Arc::new(leaf2.clone())); + + let root = Reference::new( + Some("root_content".to_string()), + "root".to_string(), + ).add_dep(Arc::new(parent.clone())); + + // Store all references + store.store_reference(&leaf1).await.unwrap(); + store.store_reference(&leaf2).await.unwrap(); + store.store_reference(&parent).await.unwrap(); + store.store_reference(&root).await.unwrap(); + + // Get the graph starting from root + let graph = store.get_graph("root").await.unwrap(); + + // Verify we got all references in the graph + assert_eq!(graph.len(), 4); + + // Verify we have all the expected references + let names: Vec<_> = graph.iter().map(|r| &r.name).collect(); + assert!(names.contains(&&"root".to_string())); + assert!(names.contains(&&"parent".to_string())); + assert!(names.contains(&&"leaf1".to_string())); + assert!(names.contains(&&"leaf2".to_string())); +} + +#[tokio::test] +async fn test_nonexistent_reference() { + let store = create_test_store().await; + + // Try to retrieve a reference that doesn't exist + let result = store.get_reference("nonexistent_id").await; + + // Should return NoSuchReference error + assert!(matches!(result, Err(StoreError::NoSuchReference))); +} + +#[tokio::test] +async fn test_nonexistent_content() { + let store = create_test_store().await; + + // Create a reference with a content address that doesn't exist + let reference = Reference::new( + Some("nonexistent_content_address".to_string()), + "test_reference".to_string(), + ); + + // Try to retrieve content + let result = store.get_content_for_reference(reference).await; + + // Should return NoSuchContentAddress error + assert!(matches!(result, Err(StoreError::NoSuchContentAddress))); +} + +#[tokio::test] +async fn test_reference_without_content_address() { + let store = create_test_store().await; + + // Create a reference without a content address + let reference = Reference::new(None, "test_reference".to_string()); + + // Try to retrieve content + let result = store.get_content_for_reference(reference).await; + + // Should return NoSuchContentAddress error + assert!(matches!(result, Err(StoreError::NoSuchContentAddress))); +} + + diff --git a/offline-web-storage/src/lib.rs b/offline-web-storage/src/lib.rs index caf4a5b..fade3d4 100644 --- a/offline-web-storage/src/lib.rs +++ b/offline-web-storage/src/lib.rs @@ -1,514 +1,361 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; -use anyhow::Result; -use blake2::{Blake2b512, Digest}; +use thiserror::Error; use offline_web_model::Reference; use sqlx::{Pool, Row, Sqlite, SqlitePool}; -pub struct ReferenceStore { +pub struct SqliteReferenceStore { pool: Pool, } -impl ReferenceStore { - pub async fn new(database_url: &str) -> Result { - let pool = SqlitePool::connect(database_url).await?; - let store = Self { pool }; - store.initialize_schema().await?; - Ok(store) - } +#[derive(Error, Debug)] +pub enum StoreError { + #[error("No such reference")] + NoSuchReference, + #[error("No such content address")] + NoSuchContentAddress, + #[error("Unknown Storage Error: {0:?}")] + StorageError(Box), +} - async fn initialize_schema(&self) -> Result<()> { +#[allow(async_fn_in_trait)] +pub trait ReferenceStore { + async fn get_reference(&self, id: &str) -> Result; + + async fn get_content_for_reference(&self, reference: Reference) -> Result; + + async fn get_graph(&self, root_name: &str) -> Result, StoreError>; +} + +impl SqliteReferenceStore { + pub async fn new(database_url: &str) -> Result { + let pool = SqlitePool::connect(database_url) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Create tables if they don't exist sqlx::query( r#" - CREATE TABLE IF NOT EXISTS refs ( + CREATE TABLE IF NOT EXISTS ref_entries ( id TEXT PRIMARY KEY, content_address TEXT, - name TEXT NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + name TEXT NOT NULL UNIQUE ) "#, ) - .execute(&self.pool) - .await?; + .execute(&pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; sqlx::query( r#" - CREATE TABLE IF NOT EXISTS reference_dependencies ( + CREATE TABLE IF NOT EXISTS ref_dependencies ( parent_id TEXT NOT NULL, dependent_id TEXT NOT NULL, - PRIMARY KEY (parent_id, dependent_id) + PRIMARY KEY (parent_id, dependent_id), + FOREIGN KEY (parent_id) REFERENCES ref_entries(id), + FOREIGN KEY (dependent_id) REFERENCES ref_entries(id) ) "#, ) - .execute(&self.pool) - .await?; - - sqlx::query("CREATE INDEX IF NOT EXISTS idx_refs_name ON refs(name)") - .execute(&self.pool) - .await?; - - sqlx::query("CREATE INDEX IF NOT EXISTS idx_refs_content_address ON refs(content_address)") - .execute(&self.pool) - .await?; - - sqlx::query("CREATE INDEX IF NOT EXISTS idx_dependencies_parent ON reference_dependencies(parent_id)") - .execute(&self.pool) - .await?; - - sqlx::query("CREATE INDEX IF NOT EXISTS idx_dependencies_dependent ON reference_dependencies(dependent_id)") - .execute(&self.pool) - .await?; + .execute(&pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; sqlx::query( r#" - CREATE TABLE IF NOT EXISTS content_objects ( + CREATE TABLE IF NOT EXISTS content_store ( content_address TEXT PRIMARY KEY, - content_data BLOB NOT NULL, - content_type TEXT, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP + content BLOB NOT NULL ) "#, ) - .execute(&self.pool) - .await?; + .execute(&pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; - sqlx::query( - "CREATE INDEX IF NOT EXISTS idx_content_created ON content_objects(created_at)", - ) - .execute(&self.pool) - .await?; - - Ok(()) + Ok(Self { pool }) } - pub async fn store_reference(&self, reference: &Reference) -> Result<()> { - let mut tx = self.pool.begin().await?; + pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> { + // Use a transaction to ensure atomicity + let mut tx = self.pool.begin().await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Insert or update the reference sqlx::query( r#" - INSERT OR REPLACE INTO refs (id, content_address, name, updated_at) - VALUES (?, ?, ?, CURRENT_TIMESTAMP) + INSERT OR REPLACE INTO ref_entries (id, content_address, name) + VALUES (?, ?, ?) "#, ) .bind(&reference.id) .bind(&reference.content_address) .bind(&reference.name) .execute(&mut *tx) - .await?; + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; - // Clear existing dependencies for this reference - sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?") - .bind(&reference.id) - .execute(&mut *tx) - .await?; + // Delete existing dependencies for this reference + sqlx::query( + r#" + DELETE FROM ref_dependencies + WHERE parent_id = ? + "#, + ) + .bind(&reference.id) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Insert new dependencies for dependent in &reference.dependents { sqlx::query( - "INSERT INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)", + r#" + INSERT INTO ref_dependencies (parent_id, dependent_id) + VALUES (?, ?) + "#, ) .bind(&reference.id) .bind(&dependent.id) .execute(&mut *tx) - .await?; + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; } - tx.commit().await?; + // Commit the transaction + tx.commit().await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + Ok(()) } - pub async fn get_reference(&self, id: &str) -> Result> { - // Get the reference record - let row = sqlx::query("SELECT id, content_address, name FROM refs WHERE id = ?") - .bind(id) - .fetch_optional(&self.pool) - .await?; + pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> { + sqlx::query( + r#" + INSERT OR REPLACE INTO content_store (content_address, content) + VALUES (?, ?) + "#, + ) + .bind(content_address) + .bind(content) + .execute(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; - let Some(row) = row else { - return Ok(None); - }; + Ok(()) + } +} - let reference_id: String = row.get("id"); - let content_address: Option = row.get("content_address"); - let name: String = row.get("name"); +impl ReferenceStore for SqliteReferenceStore { + async fn get_reference(&self, id: &str) -> Result { + // First, get the basic reference information + let row = sqlx::query( + r#" + SELECT id, content_address, name + FROM ref_entries + WHERE id = ? + "#, + ) + .bind(id) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; - // Get dependencies - let dependents = self.get_dependents(&reference_id).await?; + match row { + Some(row) => { + let id: String = row.get("id"); + let content_address: Option = row.get("content_address"); + let name: String = row.get("name"); - Ok(Some(Reference { - id: reference_id, - content_address, - name, - dependents, - })) + // Get the dependents by recursively fetching them + let dependents = self.get_dependents(&id).await?; + + Ok(Reference { + id, + content_address, + name, + dependents, + }) + } + None => Err(StoreError::NoSuchReference), + } } - async fn get_dependents(&self, parent_id: &str) -> Result>> { - // Get all dependents at once using a recursive CTE (Common Table Expression) + async fn get_content_for_reference(&self, reference: Reference) -> Result { + if let Some(content_address) = &reference.content_address { + let row = sqlx::query( + r#" + SELECT content + FROM content_store + WHERE content_address = ? + "#, + ) + .bind(content_address) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + match row { + Some(row) => { + let content: Vec = row.get("content"); + String::from_utf8(content) + .map_err(|e| StoreError::StorageError(Box::new(e))) + } + None => Err(StoreError::NoSuchContentAddress), + } + } else { + Err(StoreError::NoSuchContentAddress) + } + } + + async fn get_graph(&self, root_name: &str) -> Result, StoreError> { + let mut visited = std::collections::HashSet::new(); + let mut result = Vec::new(); + let mut queue = std::collections::VecDeque::new(); + + // Start with the root name + queue.push_back(root_name.to_string()); + + while let Some(current_name) = queue.pop_front() { + if visited.contains(¤t_name) { + continue; + } + + visited.insert(current_name.clone()); + + // Get the reference by name + let row = sqlx::query( + r#" + SELECT id, content_address, name + FROM ref_entries + WHERE name = ? + "#, + ) + .bind(¤t_name) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + if let Some(row) = row { + let id: String = row.get("id"); + let content_address: Option = row.get("content_address"); + let name: String = row.get("name"); + + // Get dependents for this reference + let dependents = self.get_dependents(&id).await?; + + let reference = Reference { + id, + content_address, + name, + dependents: dependents.clone(), + }; + + result.push(reference); + + // Add all dependent names to the queue for processing + for dependent in dependents { + if !visited.contains(&dependent.name) { + queue.push_back(dependent.name.clone()); + } + } + } + } + + Ok(result) + } +} + +impl SqliteReferenceStore { + async fn get_dependents(&self, parent_id: &str) -> Result>, StoreError> { + // Use a CTE (Common Table Expression) to get the entire dependency tree in one query let rows = sqlx::query( r#" - WITH RECURSIVE dependent_tree AS ( - -- Base case: direct dependents - SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as level - FROM refs r - JOIN reference_dependencies rd ON r.id = rd.dependent_id + WITH RECURSIVE dependency_tree AS ( + -- Base case: direct dependents of the parent + SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth + FROM ref_entries r + JOIN ref_dependencies rd ON r.id = rd.dependent_id WHERE rd.parent_id = ? UNION ALL -- Recursive case: dependents of dependents - SELECT r.id, r.content_address, r.name, rd.parent_id, dt.level + 1 - FROM refs r - JOIN reference_dependencies rd ON r.id = rd.dependent_id - JOIN dependent_tree dt ON rd.parent_id = dt.id + SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1 + FROM ref_entries r + JOIN ref_dependencies rd ON r.id = rd.dependent_id + JOIN dependency_tree dt ON rd.parent_id = dt.id + WHERE dt.depth < 100 -- Prevent infinite recursion ) - SELECT id, content_address, name, parent_id, level - FROM dependent_tree - ORDER BY level, name + SELECT id, content_address, name, parent_id, depth + FROM dependency_tree + ORDER BY depth, name "#, ) .bind(parent_id) .fetch_all(&self.pool) - .await?; + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; - // Build the tree structure from the flattened results - let mut refs_map: std::collections::HashMap> = std::collections::HashMap::new(); + // Build the dependency tree iteratively + let mut reference_map: std::collections::HashMap = std::collections::HashMap::new(); let mut children_map: std::collections::HashMap> = std::collections::HashMap::new(); - // First pass: create all references and track relationships + // First pass: create all references and build the children map for row in &rows { let id: String = row.get("id"); let content_address: Option = row.get("content_address"); let name: String = row.get("name"); let parent_id: String = row.get("parent_id"); - // Create reference without dependents first - let reference = Arc::new(Reference { + let reference = Reference { id: id.clone(), content_address, name, - dependents: Vec::new(), - }); + dependents: Vec::new(), // Will be filled in second pass + }; - refs_map.insert(id.clone(), reference); + reference_map.insert(id.clone(), reference); children_map.entry(parent_id).or_default().push(id); } - // Second pass: build the tree by adding dependents to each reference - fn build_dependents( - ref_id: &str, - refs_map: &std::collections::HashMap>, - children_map: &std::collections::HashMap>, - ) -> Vec> { - if let Some(child_ids) = children_map.get(ref_id) { - let mut dependents = Vec::new(); - for child_id in child_ids { - if let Some(child_ref) = refs_map.get(child_id) { - let nested_dependents = build_dependents(child_id, refs_map, children_map); - let updated_child = Arc::new(Reference { - id: child_ref.id.clone(), - content_address: child_ref.content_address.clone(), - name: child_ref.name.clone(), - dependents: nested_dependents, - }); - dependents.push(updated_child); + // Second pass: build the dependency tree from bottom up (highest depth first) + let mut depth_groups: std::collections::BTreeMap> = std::collections::BTreeMap::new(); + for row in &rows { + let id: String = row.get("id"); + let depth: i32 = row.get("depth"); + depth_groups.entry(depth).or_default().push(id); + } + + // Process from highest depth to lowest (leaves to roots) + for (_depth, ids) in depth_groups.iter().rev() { + for id in ids { + if let Some(children) = children_map.get(id).cloned() { + let child_references: Vec> = children + .iter() + .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) + .collect(); + + if let Some(reference) = reference_map.get_mut(id) { + reference.dependents = child_references; } } - dependents.sort_by(|a, b| a.name.cmp(&b.name)); - dependents - } else { - Vec::new() } } - Ok(build_dependents(parent_id, &refs_map, &children_map)) + // Return the direct children of the parent + let empty_vec = Vec::new(); + let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec); + let result = direct_children + .iter() + .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) + .collect(); + + Ok(result) } - pub async fn get_references_by_name(&self, name: &str) -> Result> { - let rows = sqlx::query("SELECT id FROM refs WHERE name = ?") - .bind(name) - .fetch_all(&self.pool) - .await?; - - let mut references = Vec::new(); - for row in rows { - let id: String = row.get("id"); - if let Some(reference) = self.get_reference(&id).await? { - references.push(reference); - } - } - - Ok(references) - } - - pub async fn get_references_by_content_address( - &self, - content_address: &str, - ) -> Result> { - let rows = sqlx::query("SELECT id FROM refs WHERE content_address = ?") - .bind(content_address) - .fetch_all(&self.pool) - .await?; - - let mut references = Vec::new(); - for row in rows { - let id: String = row.get("id"); - if let Some(reference) = self.get_reference(&id).await? { - references.push(reference); - } - } - - Ok(references) - } - - pub async fn delete_reference(&self, id: &str) -> Result { - let result = sqlx::query("DELETE FROM refs WHERE id = ?") - .bind(id) - .execute(&self.pool) - .await?; - - Ok(result.rows_affected() > 0) - } - - pub async fn list_all_references(&self) -> Result> { - let rows = sqlx::query("SELECT id FROM refs ORDER BY name") - .fetch_all(&self.pool) - .await?; - - let mut references = Vec::new(); - for row in rows { - let id: String = row.get("id"); - if let Some(reference) = self.get_reference(&id).await? { - references.push(reference); - } - } - - Ok(references) - } - - pub async fn update_reference_graph( - &self, - updated_references: &HashMap>, - ) -> Result<()> { - let mut tx = self.pool.begin().await?; - - for reference in updated_references.values() { - // Update the reference - sqlx::query( - r#" - INSERT OR REPLACE INTO refs (id, content_address, name, updated_at) - VALUES (?, ?, ?, CURRENT_TIMESTAMP) - "#, - ) - .bind(&reference.id) - .bind(&reference.content_address) - .bind(&reference.name) - .execute(&mut *tx) - .await?; - - // Clear existing dependencies - sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?") - .bind(&reference.id) - .execute(&mut *tx) - .await?; - - // Insert new dependencies - for dependent in &reference.dependents { - sqlx::query( - "INSERT INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)", - ) - .bind(&reference.id) - .bind(&dependent.id) - .execute(&mut *tx) - .await?; - } - } - - tx.commit().await?; - Ok(()) - } - - pub fn calculate_content_address(content: &[u8]) -> String { - let mut hasher = Blake2b512::new(); - hasher.update(content); - format!("{:x}", hasher.finalize()) - } - - pub async fn store_content( - &self, - content: &[u8], - content_type: Option, - ) -> Result { - let content_address = Self::calculate_content_address(content); - - // Check if content already exists (deduplication) - let exists = sqlx::query("SELECT 1 FROM content_objects WHERE content_address = ?") - .bind(&content_address) - .fetch_optional(&self.pool) - .await? - .is_some(); - - if !exists { - sqlx::query( - r#" - INSERT INTO content_objects (content_address, content_data, content_type) - VALUES (?, ?, ?) - "#, - ) - .bind(&content_address) - .bind(content) - .bind(&content_type) - .execute(&self.pool) - .await?; - } - - Ok(content_address) - } - - pub async fn get_content(&self, content_address: &str) -> Result>> { - let row = sqlx::query("SELECT content_data FROM content_objects WHERE content_address = ?") - .bind(content_address) - .fetch_optional(&self.pool) - .await?; - - Ok(row.map(|row| row.get::, _>("content_data"))) - } - - pub async fn get_content_info(&self, content_address: &str) -> Result> { - let row = sqlx::query( - "SELECT content_type, created_at FROM content_objects WHERE content_address = ?", - ) - .bind(content_address) - .fetch_optional(&self.pool) - .await?; - - Ok(row.map(|row| ContentInfo { - content_address: content_address.to_string(), - content_type: row.get("content_type"), - size: 0, // Size not stored in database - created_at: row.get("created_at"), - })) - } - - pub async fn content_exists(&self, content_address: &str) -> Result { - let exists = sqlx::query("SELECT 1 FROM content_objects WHERE content_address = ?") - .bind(content_address) - .fetch_optional(&self.pool) - .await? - .is_some(); - - Ok(exists) - } - - pub async fn store_reference_with_content( - &self, - name: String, - content: &[u8], - content_type: Option, - ) -> Result { - // Store the content and get its address - let content_address = self.store_content(content, content_type).await?; - - // Create the reference - let reference = Reference::new(Some(content_address), name); - - // Store the reference - self.store_reference(&reference).await?; - - Ok(reference) - } - - pub async fn get_reference_with_content( - &self, - id: &str, - ) -> Result>)>> { - let reference = self.get_reference(id).await?; - - if let Some(ref reference) = reference { - if let Some(ref content_address) = reference.content_address { - let content = self.get_content(content_address).await?; - return Ok(Some((reference.clone(), content))); - } - } - - Ok(reference.map(|r| (r, None))) - } - - pub async fn delete_content(&self, content_address: &str) -> Result { - let result = sqlx::query("DELETE FROM content_objects WHERE content_address = ?") - .bind(content_address) - .execute(&self.pool) - .await?; - - Ok(result.rows_affected() > 0) - } - - pub async fn list_unreferenced_content(&self) -> Result> { - let rows = sqlx::query( - r#" - SELECT co.content_address - FROM content_objects co - LEFT JOIN refs r ON co.content_address = r.content_address - WHERE r.content_address IS NULL - "#, - ) - .fetch_all(&self.pool) - .await?; - - Ok(rows - .into_iter() - .map(|row| row.get("content_address")) - .collect()) - } - - pub async fn cleanup_unreferenced_content(&self) -> Result { - let result = sqlx::query( - r#" - DELETE FROM content_objects - WHERE content_address IN ( - SELECT co.content_address - FROM content_objects co - LEFT JOIN refs r ON co.content_address = r.content_address - WHERE r.content_address IS NULL - ) - "#, - ) - .execute(&self.pool) - .await?; - - Ok(result.rows_affected() as usize) - } - - pub async fn get_storage_stats(&self) -> Result { - let content_count = sqlx::query("SELECT COUNT(*) as count FROM content_objects") - .fetch_one(&self.pool) - .await?; - - let reference_count = sqlx::query("SELECT COUNT(*) as count FROM refs") - .fetch_one(&self.pool) - .await?; - - Ok(StorageStats { - content_object_count: content_count.get::("count") as usize, - total_content_size: 0, // Size not tracked - reference_count: reference_count.get::("count") as usize, - }) - } } -#[derive(Debug, Clone)] -pub struct ContentInfo { - pub content_address: String, - pub content_type: Option, - pub size: usize, - pub created_at: String, -} - -#[derive(Debug, Clone)] -pub struct StorageStats { - pub content_object_count: usize, - pub total_content_size: usize, - pub reference_count: usize, -} +#[cfg(test)] +mod integration_tests; diff --git a/offline-web-storage/tests/integration_tests.rs b/offline-web-storage/tests/integration_tests.rs deleted file mode 100644 index fc04457..0000000 --- a/offline-web-storage/tests/integration_tests.rs +++ /dev/null @@ -1,430 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use offline_web_model::Reference; -use offline_web_storage::ReferenceStore; - -async fn create_test_store() -> ReferenceStore { - ReferenceStore::new("sqlite::memory:").await.unwrap() -} - -#[tokio::test] -async fn test_store_and_retrieve_reference() { - let store = create_test_store().await; - - let reference = Reference::new(Some("abc123".to_string()), "test.txt".to_string()); - - // Store the reference - store.store_reference(&reference).await.unwrap(); - - // Retrieve it - let retrieved = store.get_reference(&reference.id).await.unwrap(); - assert!(retrieved.is_some()); - - let retrieved = retrieved.unwrap(); - assert_eq!(retrieved.id, reference.id); - assert_eq!(retrieved.content_address, reference.content_address); - assert_eq!(retrieved.name, reference.name); - assert_eq!(retrieved.dependents.len(), 0); -} - -#[tokio::test] -async fn test_store_reference_with_dependents() { - let store = create_test_store().await; - - let dep1 = Arc::new(Reference::new( - Some("dep1".to_string()), - "dep1.txt".to_string(), - )); - let dep2 = Arc::new(Reference::new( - Some("dep2".to_string()), - "dep2.txt".to_string(), - )); - - // Store dependencies first - store.store_reference(&dep1).await.unwrap(); - store.store_reference(&dep2).await.unwrap(); - - let mut parent = Reference::new(Some("parent".to_string()), "parent.txt".to_string()); - parent = parent.add_dep(dep1.clone()); - parent = parent.add_dep(dep2.clone()); - - // Store parent with dependencies - store.store_reference(&parent).await.unwrap(); - - // Retrieve and verify - let retrieved = store.get_reference(&parent.id).await.unwrap().unwrap(); - assert_eq!(retrieved.dependents.len(), 2); - - let dep_names: Vec<_> = retrieved.dependents.iter().map(|d| &d.name).collect(); - assert!(dep_names.contains(&&"dep1.txt".to_string())); - assert!(dep_names.contains(&&"dep2.txt".to_string())); -} - -#[tokio::test] -async fn test_get_references_by_name() { - let store = create_test_store().await; - - let ref1 = Reference::new(Some("abc1".to_string()), "test.txt".to_string()); - let ref2 = Reference::new(Some("abc2".to_string()), "test.txt".to_string()); - let ref3 = Reference::new(Some("abc3".to_string()), "other.txt".to_string()); - - store.store_reference(&ref1).await.unwrap(); - store.store_reference(&ref2).await.unwrap(); - store.store_reference(&ref3).await.unwrap(); - - let results = store.get_references_by_name("test.txt").await.unwrap(); - assert_eq!(results.len(), 2); - - let results = store.get_references_by_name("other.txt").await.unwrap(); - assert_eq!(results.len(), 1); - - let results = store - .get_references_by_name("nonexistent.txt") - .await - .unwrap(); - assert_eq!(results.len(), 0); -} - -#[tokio::test] -async fn test_get_references_by_content_address() { - let store = create_test_store().await; - - let ref1 = Reference::new(Some("same_content".to_string()), "file1.txt".to_string()); - let ref2 = Reference::new(Some("same_content".to_string()), "file2.txt".to_string()); - let ref3 = Reference::new( - Some("different_content".to_string()), - "file3.txt".to_string(), - ); - - store.store_reference(&ref1).await.unwrap(); - store.store_reference(&ref2).await.unwrap(); - store.store_reference(&ref3).await.unwrap(); - - let results = store - .get_references_by_content_address("same_content") - .await - .unwrap(); - assert_eq!(results.len(), 2); - - let results = store - .get_references_by_content_address("different_content") - .await - .unwrap(); - assert_eq!(results.len(), 1); -} - -#[tokio::test] -async fn test_delete_reference() { - let store = create_test_store().await; - - let reference = Reference::new(Some("abc123".to_string()), "test.txt".to_string()); - store.store_reference(&reference).await.unwrap(); - - // Verify it exists - let retrieved = store.get_reference(&reference.id).await.unwrap(); - assert!(retrieved.is_some()); - - // Delete it - let deleted = store.delete_reference(&reference.id).await.unwrap(); - assert!(deleted); - - // Verify it's gone - let retrieved = store.get_reference(&reference.id).await.unwrap(); - assert!(retrieved.is_none()); - - // Try to delete again - let deleted = store.delete_reference(&reference.id).await.unwrap(); - assert!(!deleted); -} - -#[tokio::test] -async fn test_list_all_references() { - let store = create_test_store().await; - - let ref1 = Reference::new(Some("abc1".to_string()), "a.txt".to_string()); - let ref2 = Reference::new(Some("abc2".to_string()), "b.txt".to_string()); - let ref3 = Reference::new(Some("abc3".to_string()), "c.txt".to_string()); - - store.store_reference(&ref1).await.unwrap(); - store.store_reference(&ref2).await.unwrap(); - store.store_reference(&ref3).await.unwrap(); - - let all_refs = store.list_all_references().await.unwrap(); - assert_eq!(all_refs.len(), 3); - - // Should be sorted by name - assert_eq!(all_refs[0].name, "a.txt"); - assert_eq!(all_refs[1].name, "b.txt"); - assert_eq!(all_refs[2].name, "c.txt"); -} - -#[tokio::test] -async fn test_update_reference_graph() { - let store = create_test_store().await; - - let ref1 = Arc::new(Reference::new( - Some("abc1".to_string()), - "file1.txt".to_string(), - )); - let ref2 = Arc::new(Reference::new( - Some("abc2".to_string()), - "file2.txt".to_string(), - )); - - let mut updated_refs = HashMap::new(); - updated_refs.insert(ref1.id.clone(), ref1.clone()); - updated_refs.insert(ref2.id.clone(), ref2.clone()); - - store.update_reference_graph(&updated_refs).await.unwrap(); - - // Verify both references were stored - let retrieved1 = store.get_reference(&ref1.id).await.unwrap(); - let retrieved2 = store.get_reference(&ref2.id).await.unwrap(); - - assert!(retrieved1.is_some()); - assert!(retrieved2.is_some()); - assert_eq!(retrieved1.unwrap().name, "file1.txt"); - assert_eq!(retrieved2.unwrap().name, "file2.txt"); -} - -#[tokio::test] -async fn test_store_and_retrieve_content() { - let store = create_test_store().await; - - let content = b"Hello, world!"; - let content_type = Some("text/plain".to_string()); - - // Store content - let content_address = store - .store_content(content, content_type.clone()) - .await - .unwrap(); - - // Retrieve content - let retrieved_content = store.get_content(&content_address).await.unwrap(); - assert!(retrieved_content.is_some()); - assert_eq!(retrieved_content.unwrap(), content); - - // Check content info - let content_info = store.get_content_info(&content_address).await.unwrap(); - assert!(content_info.is_some()); - let info = content_info.unwrap(); - assert_eq!(info.content_address, content_address); - assert_eq!(info.content_type, content_type); - - // Check content exists - assert!(store.content_exists(&content_address).await.unwrap()); -} - -#[tokio::test] -async fn test_content_deduplication() { - let store = create_test_store().await; - - let content = b"Duplicate content"; - - // Store same content twice - let addr1 = store.store_content(content, None).await.unwrap(); - let addr2 = store.store_content(content, None).await.unwrap(); - - // Should get same address - assert_eq!(addr1, addr2); - - // Should only have one copy in storage - let stats = store.get_storage_stats().await.unwrap(); - assert_eq!(stats.content_object_count, 1); -} - -#[tokio::test] -async fn test_store_reference_with_content() { - let store = create_test_store().await; - - let content = b"File content here"; - let name = "test_file.txt".to_string(); - let content_type = Some("text/plain".to_string()); - - // Store reference with content - let reference = store - .store_reference_with_content(name.clone(), content, content_type) - .await - .unwrap(); - - assert_eq!(reference.name, name); - assert!(reference.content_address.is_some()); - - // Retrieve reference with content - let (retrieved_ref, retrieved_content) = store - .get_reference_with_content(&reference.id) - .await - .unwrap() - .unwrap(); - - assert_eq!(retrieved_ref.id, reference.id); - assert_eq!(retrieved_ref.name, name); - assert!(retrieved_content.is_some()); - assert_eq!(retrieved_content.unwrap(), content); -} - -#[tokio::test] -async fn test_calculate_content_address() { - let content1 = b"Same content"; - let content2 = b"Same content"; - let content3 = b"Different content"; - - let addr1 = ReferenceStore::calculate_content_address(content1); - let addr2 = ReferenceStore::calculate_content_address(content2); - let addr3 = ReferenceStore::calculate_content_address(content3); - - assert_eq!(addr1, addr2); - assert_ne!(addr1, addr3); - - // Should be a valid hex string - assert!(addr1.chars().all(|c| c.is_ascii_hexdigit())); - assert_eq!(addr1.len(), 128); // Blake2b512 produces 64 bytes = 128 hex chars -} - -#[tokio::test] -async fn test_delete_content() { - let store = create_test_store().await; - - let content = b"Content to delete"; - let content_address = store.store_content(content, None).await.unwrap(); - - // Verify content exists - assert!(store.content_exists(&content_address).await.unwrap()); - - // Delete content - let deleted = store.delete_content(&content_address).await.unwrap(); - assert!(deleted); - - // Verify content is gone - assert!(!store.content_exists(&content_address).await.unwrap()); - let retrieved = store.get_content(&content_address).await.unwrap(); - assert!(retrieved.is_none()); - - // Try to delete again - let deleted_again = store.delete_content(&content_address).await.unwrap(); - assert!(!deleted_again); -} - -#[tokio::test] -async fn test_cleanup_unreferenced_content() { - let store = create_test_store().await; - - // Store some content with references - let content1 = b"Referenced content"; - let ref1 = store - .store_reference_with_content("file1.txt".to_string(), content1, None) - .await - .unwrap(); - - // Store content without references - let content2 = b"Unreferenced content 1"; - let content3 = b"Unreferenced content 2"; - let _addr2 = store.store_content(content2, None).await.unwrap(); - let _addr3 = store.store_content(content3, None).await.unwrap(); - - // Initial stats - let stats = store.get_storage_stats().await.unwrap(); - assert_eq!(stats.content_object_count, 3); - assert_eq!(stats.reference_count, 1); - - // List unreferenced content - let unreferenced = store.list_unreferenced_content().await.unwrap(); - assert_eq!(unreferenced.len(), 2); - - // Cleanup unreferenced content - let cleaned_up = store.cleanup_unreferenced_content().await.unwrap(); - assert_eq!(cleaned_up, 2); - - // Check final stats - let final_stats = store.get_storage_stats().await.unwrap(); - assert_eq!(final_stats.content_object_count, 1); - assert_eq!(final_stats.reference_count, 1); - - // Referenced content should still exist - let (retrieved_ref, retrieved_content) = store - .get_reference_with_content(&ref1.id) - .await - .unwrap() - .unwrap(); - assert_eq!(retrieved_ref.id, ref1.id); - assert_eq!(retrieved_content.unwrap(), content1); -} - -#[tokio::test] -async fn test_storage_stats() { - let store = create_test_store().await; - - // Initial stats should be empty - let stats = store.get_storage_stats().await.unwrap(); - assert_eq!(stats.content_object_count, 0); - assert_eq!(stats.total_content_size, 0); - assert_eq!(stats.reference_count, 0); - - // Add some content and references - let content1 = b"First file"; - let content2 = b"Second file content"; - - let _ref1 = store - .store_reference_with_content("file1.txt".to_string(), content1, None) - .await - .unwrap(); - let _ref2 = store - .store_reference_with_content("file2.txt".to_string(), content2, None) - .await - .unwrap(); - - // Check updated stats - let final_stats = store.get_storage_stats().await.unwrap(); - assert_eq!(final_stats.content_object_count, 2); - assert_eq!(final_stats.reference_count, 2); -} - -#[tokio::test] -async fn test_reference_with_content_and_dependencies() { - let store = create_test_store().await; - - // Create dependencies with content - let dep1_content = b"Dependency 1 content"; - let dep2_content = b"Dependency 2 content"; - - let dep1 = store - .store_reference_with_content("dep1.txt".to_string(), dep1_content, None) - .await - .unwrap(); - let dep2 = store - .store_reference_with_content("dep2.txt".to_string(), dep2_content, None) - .await - .unwrap(); - - // Create parent with content and dependencies - let parent_content = b"Parent content"; - let parent = store - .store_reference_with_content("parent.txt".to_string(), parent_content, None) - .await - .unwrap(); - - // Add dependencies to parent - let parent_with_deps = parent.add_dep(Arc::new(dep1)).add_dep(Arc::new(dep2)); - store.store_reference(&parent_with_deps).await.unwrap(); - - // Retrieve parent with content - let (retrieved_parent, retrieved_content) = store - .get_reference_with_content(&parent_with_deps.id) - .await - .unwrap() - .unwrap(); - - assert_eq!(retrieved_parent.dependents.len(), 2); - assert_eq!(retrieved_content.unwrap(), parent_content); - - // Check that dependencies also have their content - for dep in &retrieved_parent.dependents { - let (_, dep_content) = store - .get_reference_with_content(&dep.id) - .await - .unwrap() - .unwrap(); - assert!(dep_content.is_some()); - } -}