Compare commits
2 Commits
3fbeba535c
...
a6e501f3e5
Author | SHA1 | Date | |
---|---|---|---|
a6e501f3e5 | |||
dcfa8bd313 |
37
Cargo.lock
generated
37
Cargo.lock
generated
@ -468,7 +468,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1154,6 +1154,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
@ -1420,7 +1421,7 @@ dependencies = [
|
||||
"errno",
|
||||
"libc",
|
||||
"linux-raw-sys",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1684,7 +1685,7 @@ dependencies = [
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"sqlformat",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
@ -1770,7 +1771,7 @@ dependencies = [
|
||||
"smallvec",
|
||||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"whoami",
|
||||
@ -1810,7 +1811,7 @@ dependencies = [
|
||||
"smallvec",
|
||||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"whoami",
|
||||
@ -1913,7 +1914,7 @@ dependencies = [
|
||||
"getrandom 0.3.2",
|
||||
"once_cell",
|
||||
"rustix",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1922,7 +1923,16 @@ version = "1.0.69"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
"thiserror-impl 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "2.0.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
|
||||
dependencies = [
|
||||
"thiserror-impl 2.0.12",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1936,6 +1946,17 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "2.0.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinystr"
|
||||
version = "0.8.1"
|
||||
@ -2087,7 +2108,7 @@ dependencies = [
|
||||
"log",
|
||||
"rand 0.8.5",
|
||||
"sha1",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"utf-8",
|
||||
]
|
||||
|
||||
|
@ -13,5 +13,6 @@ anyhow = "1.0"
|
||||
uuid = { version = "1.0", features = ["v4"] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
blake2 = "0.10"
|
||||
thiserror = "2.0.12"
|
||||
|
||||
[dev-dependencies]
|
193
offline-web-storage/src/integration_tests.rs
Normal file
193
offline-web-storage/src/integration_tests.rs
Normal file
@ -0,0 +1,193 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use offline_web_model::Reference;
|
||||
use super::{ReferenceStore, SqliteReferenceStore, StoreError};
|
||||
|
||||
async fn create_test_store() -> SqliteReferenceStore {
|
||||
SqliteReferenceStore::new("sqlite::memory:").await.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_store_and_retrieve_reference() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Create a test reference
|
||||
let reference = Reference::new(
|
||||
Some("test_content_address".to_string()),
|
||||
"test_reference".to_string(),
|
||||
);
|
||||
|
||||
// Store the reference
|
||||
store.store_reference(&reference).await.unwrap();
|
||||
|
||||
// Retrieve the reference by ID
|
||||
let retrieved = store.get_reference(&reference.id).await.unwrap();
|
||||
|
||||
// Verify the retrieved reference matches the original
|
||||
assert_eq!(retrieved.id, reference.id);
|
||||
assert_eq!(retrieved.content_address, reference.content_address);
|
||||
assert_eq!(retrieved.name, reference.name);
|
||||
assert_eq!(retrieved.dependents.len(), reference.dependents.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_store_and_retrieve_content() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let content = b"Hello, World!";
|
||||
let content_address = "test_content_address";
|
||||
|
||||
// Store content
|
||||
store.store_content(content_address, content).await.unwrap();
|
||||
|
||||
// Create a reference pointing to this content
|
||||
let reference = Reference::new(
|
||||
Some(content_address.to_string()),
|
||||
"test_reference".to_string(),
|
||||
);
|
||||
|
||||
// Retrieve content using the reference
|
||||
let retrieved_content = store.get_content_for_reference(reference).await.unwrap();
|
||||
|
||||
// Verify the content matches
|
||||
assert_eq!(retrieved_content, String::from_utf8(content.to_vec()).unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reference_with_dependents() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Create a leaf reference (no dependents)
|
||||
let leaf_ref = Reference::new(
|
||||
Some("leaf_content_address".to_string()),
|
||||
"leaf_reference".to_string(),
|
||||
);
|
||||
|
||||
// Store the leaf reference
|
||||
store.store_reference(&leaf_ref).await.unwrap();
|
||||
|
||||
// Create a parent reference that depends on the leaf
|
||||
let parent_ref = Reference::new(
|
||||
Some("parent_content_address".to_string()),
|
||||
"parent_reference".to_string(),
|
||||
).add_dep(Arc::new(leaf_ref.clone()));
|
||||
|
||||
// Store the parent reference
|
||||
store.store_reference(&parent_ref).await.unwrap();
|
||||
|
||||
// Retrieve the parent reference
|
||||
let retrieved_parent = store.get_reference(&parent_ref.id).await.unwrap();
|
||||
|
||||
// Verify the parent has the correct dependent
|
||||
assert_eq!(retrieved_parent.dependents.len(), 1);
|
||||
assert_eq!(retrieved_parent.dependents[0].name, leaf_ref.name);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_graph() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Create a hierarchy of references
|
||||
let leaf1 = Reference::new(
|
||||
Some("leaf1_content".to_string()),
|
||||
"leaf1".to_string(),
|
||||
);
|
||||
|
||||
let leaf2 = Reference::new(
|
||||
Some("leaf2_content".to_string()),
|
||||
"leaf2".to_string(),
|
||||
);
|
||||
|
||||
let parent = Reference::new(
|
||||
Some("parent_content".to_string()),
|
||||
"parent".to_string(),
|
||||
)
|
||||
.add_dep(Arc::new(leaf1.clone()))
|
||||
.add_dep(Arc::new(leaf2.clone()));
|
||||
|
||||
let root = Reference::new(
|
||||
Some("root_content".to_string()),
|
||||
"root".to_string(),
|
||||
).add_dep(Arc::new(parent.clone()));
|
||||
|
||||
// Store all references
|
||||
store.store_reference(&leaf1).await.unwrap();
|
||||
store.store_reference(&leaf2).await.unwrap();
|
||||
store.store_reference(&parent).await.unwrap();
|
||||
store.store_reference(&root).await.unwrap();
|
||||
|
||||
// Get the graph starting from root
|
||||
let graph = store.get_graph("root").await.unwrap();
|
||||
|
||||
// Verify we got all references in the graph
|
||||
assert_eq!(graph.len(), 4);
|
||||
|
||||
// Verify we have all the expected references
|
||||
let names: Vec<_> = graph.iter().map(|r| &r.name).collect();
|
||||
assert!(names.contains(&&"root".to_string()));
|
||||
assert!(names.contains(&&"parent".to_string()));
|
||||
assert!(names.contains(&&"leaf1".to_string()));
|
||||
assert!(names.contains(&&"leaf2".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_nonexistent_reference() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Try to retrieve a reference that doesn't exist
|
||||
let result = store.get_reference("nonexistent_id").await;
|
||||
|
||||
// Should return NoSuchReference error
|
||||
assert!(matches!(result, Err(StoreError::NoSuchReference)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_nonexistent_content() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Create a reference with a content address that doesn't exist
|
||||
let reference = Reference::new(
|
||||
Some("nonexistent_content_address".to_string()),
|
||||
"test_reference".to_string(),
|
||||
);
|
||||
|
||||
// Try to retrieve content
|
||||
let result = store.get_content_for_reference(reference).await;
|
||||
|
||||
// Should return NoSuchContentAddress error
|
||||
assert!(matches!(result, Err(StoreError::NoSuchContentAddress)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reference_without_content_address() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Create a reference without a content address
|
||||
let reference = Reference::new(None, "test_reference".to_string());
|
||||
|
||||
// Try to retrieve content
|
||||
let result = store.get_content_for_reference(reference).await;
|
||||
|
||||
// Should return NoSuchContentAddress error
|
||||
assert!(matches!(result, Err(StoreError::NoSuchContentAddress)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schema_version_management() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Verify the schema version is correctly set
|
||||
let version = store.get_current_schema_version().await.unwrap();
|
||||
assert_eq!(version, 1, "Schema version should be 1");
|
||||
|
||||
// Verify we can still perform basic operations
|
||||
let reference = Reference::new(
|
||||
Some("test_content".to_string()),
|
||||
"test_schema_version".to_string(),
|
||||
);
|
||||
|
||||
store.store_reference(&reference).await.unwrap();
|
||||
let retrieved = store.get_reference(&reference.id).await.unwrap();
|
||||
assert_eq!(retrieved.name, reference.name);
|
||||
}
|
||||
|
@ -1,514 +1,467 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
use anyhow::Result;
|
||||
use blake2::{Blake2b512, Digest};
|
||||
use thiserror::Error;
|
||||
use offline_web_model::Reference;
|
||||
use sqlx::{Pool, Row, Sqlite, SqlitePool};
|
||||
|
||||
pub struct ReferenceStore {
|
||||
// Schema version constants
|
||||
const CURRENT_SCHEMA_VERSION: i32 = 1;
|
||||
const INITIAL_SCHEMA_VERSION: i32 = 0;
|
||||
|
||||
pub struct SqliteReferenceStore {
|
||||
pool: Pool<Sqlite>,
|
||||
}
|
||||
|
||||
impl ReferenceStore {
|
||||
pub async fn new(database_url: &str) -> Result<Self> {
|
||||
let pool = SqlitePool::connect(database_url).await?;
|
||||
#[derive(Error, Debug)]
|
||||
pub enum StoreError {
|
||||
#[error("No such reference")]
|
||||
NoSuchReference,
|
||||
#[error("No such content address")]
|
||||
NoSuchContentAddress,
|
||||
#[error("Unknown Storage Error: {0:?}")]
|
||||
StorageError(Box<dyn std::error::Error>),
|
||||
}
|
||||
|
||||
#[allow(async_fn_in_trait)]
|
||||
pub trait ReferenceStore {
|
||||
async fn get_reference(&self, id: &str) -> Result<Reference, StoreError>;
|
||||
|
||||
async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError>;
|
||||
|
||||
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError>;
|
||||
}
|
||||
|
||||
impl SqliteReferenceStore {
|
||||
pub async fn new(database_url: &str) -> Result<Self, StoreError> {
|
||||
let pool = SqlitePool::connect(database_url)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
let store = Self { pool };
|
||||
store.initialize_schema().await?;
|
||||
|
||||
// Check current schema version and migrate if necessary
|
||||
let current_version = store.get_current_schema_version().await?;
|
||||
if current_version != CURRENT_SCHEMA_VERSION {
|
||||
store.migrate_schema(current_version, CURRENT_SCHEMA_VERSION).await?;
|
||||
}
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
async fn initialize_schema(&self) -> Result<()> {
|
||||
async fn get_current_schema_version(&self) -> Result<i32, StoreError> {
|
||||
// First, ensure the schema_version table exists
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS refs (
|
||||
id TEXT PRIMARY KEY,
|
||||
content_address TEXT,
|
||||
name TEXT NOT NULL,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
version INTEGER PRIMARY KEY,
|
||||
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
description TEXT
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
sqlx::query(
|
||||
// Get the current version
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS reference_dependencies (
|
||||
parent_id TEXT NOT NULL,
|
||||
dependent_id TEXT NOT NULL,
|
||||
PRIMARY KEY (parent_id, dependent_id)
|
||||
)
|
||||
SELECT version FROM schema_version ORDER BY version DESC LIMIT 1
|
||||
"#,
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_refs_name ON refs(name)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
match row {
|
||||
Some(row) => {
|
||||
let version: i32 = row.get("version");
|
||||
Ok(version)
|
||||
}
|
||||
None => {
|
||||
// No version found, this is a fresh database
|
||||
Ok(INITIAL_SCHEMA_VERSION)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_refs_content_address ON refs(content_address)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
async fn migrate_schema(&self, from_version: i32, to_version: i32) -> Result<(), StoreError> {
|
||||
if from_version == to_version {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_dependencies_parent ON reference_dependencies(parent_id)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
if from_version > to_version {
|
||||
return Err(StoreError::StorageError(
|
||||
"Downward migrations not currently supported".into()
|
||||
));
|
||||
}
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_dependencies_dependent ON reference_dependencies(dependent_id)")
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
// Use a transaction for the entire migration process
|
||||
let mut tx = self.pool.begin().await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS content_objects (
|
||||
content_address TEXT PRIMARY KEY,
|
||||
content_data BLOB NOT NULL,
|
||||
content_type TEXT,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
// Apply migrations step by step
|
||||
let mut current_version = from_version;
|
||||
while current_version < to_version {
|
||||
match current_version {
|
||||
0 => {
|
||||
// Migration from version 0 to 1: Initial schema setup
|
||||
self.migrate_to_v1(&mut tx).await?;
|
||||
current_version = 1;
|
||||
}
|
||||
_ => {
|
||||
return Err(StoreError::StorageError(
|
||||
format!("Unknown migration path from version {}", current_version).into()
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sqlx::query(
|
||||
"CREATE INDEX IF NOT EXISTS idx_content_created ON content_objects(created_at)",
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
// Commit all migrations
|
||||
tx.commit().await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn store_reference(&self, reference: &Reference) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
async fn migrate_to_v1(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> Result<(), StoreError> {
|
||||
// Create the main application tables
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS ref_entries (
|
||||
id TEXT PRIMARY KEY,
|
||||
content_address TEXT,
|
||||
name TEXT NOT NULL UNIQUE
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&mut **tx)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS ref_dependencies (
|
||||
parent_id TEXT NOT NULL,
|
||||
dependent_id TEXT NOT NULL,
|
||||
PRIMARY KEY (parent_id, dependent_id),
|
||||
FOREIGN KEY (parent_id) REFERENCES ref_entries(id),
|
||||
FOREIGN KEY (dependent_id) REFERENCES ref_entries(id)
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&mut **tx)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS content_store (
|
||||
content_address TEXT PRIMARY KEY,
|
||||
content BLOB NOT NULL
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&mut **tx)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
// Record the schema version
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT OR REPLACE INTO schema_version (version, description)
|
||||
VALUES (?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(1)
|
||||
.bind("Initial schema with ref_entries, ref_dependencies, and content_store tables")
|
||||
.execute(&mut **tx)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> {
|
||||
// Use a transaction to ensure atomicity
|
||||
let mut tx = self.pool.begin().await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
// Insert or update the reference
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT OR REPLACE INTO refs (id, content_address, name, updated_at)
|
||||
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
||||
INSERT OR REPLACE INTO ref_entries (id, content_address, name)
|
||||
VALUES (?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(&reference.id)
|
||||
.bind(&reference.content_address)
|
||||
.bind(&reference.name)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
// Clear existing dependencies for this reference
|
||||
sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?")
|
||||
.bind(&reference.id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
// Delete existing dependencies for this reference
|
||||
sqlx::query(
|
||||
r#"
|
||||
DELETE FROM ref_dependencies
|
||||
WHERE parent_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(&reference.id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
// Insert new dependencies
|
||||
for dependent in &reference.dependents {
|
||||
sqlx::query(
|
||||
"INSERT INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)",
|
||||
r#"
|
||||
INSERT INTO ref_dependencies (parent_id, dependent_id)
|
||||
VALUES (?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(&reference.id)
|
||||
.bind(&dependent.id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
// Commit the transaction
|
||||
tx.commit().await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_reference(&self, id: &str) -> Result<Option<Reference>> {
|
||||
// Get the reference record
|
||||
let row = sqlx::query("SELECT id, content_address, name FROM refs WHERE id = ?")
|
||||
.bind(id)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT OR REPLACE INTO content_store (content_address, content)
|
||||
VALUES (?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(content_address)
|
||||
.bind(content)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
let Some(row) = row else {
|
||||
return Ok(None);
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let reference_id: String = row.get("id");
|
||||
let content_address: Option<String> = row.get("content_address");
|
||||
let name: String = row.get("name");
|
||||
impl ReferenceStore for SqliteReferenceStore {
|
||||
async fn get_reference(&self, id: &str) -> Result<Reference, StoreError> {
|
||||
// First, get the basic reference information
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT id, content_address, name
|
||||
FROM ref_entries
|
||||
WHERE id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
// Get dependencies
|
||||
let dependents = self.get_dependents(&reference_id).await?;
|
||||
match row {
|
||||
Some(row) => {
|
||||
let id: String = row.get("id");
|
||||
let content_address: Option<String> = row.get("content_address");
|
||||
let name: String = row.get("name");
|
||||
|
||||
Ok(Some(Reference {
|
||||
id: reference_id,
|
||||
content_address,
|
||||
name,
|
||||
dependents,
|
||||
}))
|
||||
// Get the dependents by recursively fetching them
|
||||
let dependents = self.get_dependents(&id).await?;
|
||||
|
||||
Ok(Reference {
|
||||
id,
|
||||
content_address,
|
||||
name,
|
||||
dependents,
|
||||
})
|
||||
}
|
||||
None => Err(StoreError::NoSuchReference),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>> {
|
||||
// Get all dependents at once using a recursive CTE (Common Table Expression)
|
||||
async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError> {
|
||||
if let Some(content_address) = &reference.content_address {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT content
|
||||
FROM content_store
|
||||
WHERE content_address = ?
|
||||
"#,
|
||||
)
|
||||
.bind(content_address)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
match row {
|
||||
Some(row) => {
|
||||
let content: Vec<u8> = row.get("content");
|
||||
String::from_utf8(content)
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))
|
||||
}
|
||||
None => Err(StoreError::NoSuchContentAddress),
|
||||
}
|
||||
} else {
|
||||
Err(StoreError::NoSuchContentAddress)
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError> {
|
||||
let mut visited = std::collections::HashSet::new();
|
||||
let mut result = Vec::new();
|
||||
let mut queue = std::collections::VecDeque::new();
|
||||
|
||||
// Start with the root name
|
||||
queue.push_back(root_name.to_string());
|
||||
|
||||
while let Some(current_name) = queue.pop_front() {
|
||||
if visited.contains(¤t_name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
visited.insert(current_name.clone());
|
||||
|
||||
// Get the reference by name
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT id, content_address, name
|
||||
FROM ref_entries
|
||||
WHERE name = ?
|
||||
"#,
|
||||
)
|
||||
.bind(¤t_name)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
if let Some(row) = row {
|
||||
let id: String = row.get("id");
|
||||
let content_address: Option<String> = row.get("content_address");
|
||||
let name: String = row.get("name");
|
||||
|
||||
// Get dependents for this reference
|
||||
let dependents = self.get_dependents(&id).await?;
|
||||
|
||||
let reference = Reference {
|
||||
id,
|
||||
content_address,
|
||||
name,
|
||||
dependents: dependents.clone(),
|
||||
};
|
||||
|
||||
result.push(reference);
|
||||
|
||||
// Add all dependent names to the queue for processing
|
||||
for dependent in dependents {
|
||||
if !visited.contains(&dependent.name) {
|
||||
queue.push_back(dependent.name.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
impl SqliteReferenceStore {
|
||||
async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>, StoreError> {
|
||||
// Use a CTE (Common Table Expression) to get the entire dependency tree in one query
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
WITH RECURSIVE dependent_tree AS (
|
||||
-- Base case: direct dependents
|
||||
SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as level
|
||||
FROM refs r
|
||||
JOIN reference_dependencies rd ON r.id = rd.dependent_id
|
||||
WITH RECURSIVE dependency_tree AS (
|
||||
-- Base case: direct dependents of the parent
|
||||
SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth
|
||||
FROM ref_entries r
|
||||
JOIN ref_dependencies rd ON r.id = rd.dependent_id
|
||||
WHERE rd.parent_id = ?
|
||||
|
||||
UNION ALL
|
||||
|
||||
-- Recursive case: dependents of dependents
|
||||
SELECT r.id, r.content_address, r.name, rd.parent_id, dt.level + 1
|
||||
FROM refs r
|
||||
JOIN reference_dependencies rd ON r.id = rd.dependent_id
|
||||
JOIN dependent_tree dt ON rd.parent_id = dt.id
|
||||
SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1
|
||||
FROM ref_entries r
|
||||
JOIN ref_dependencies rd ON r.id = rd.dependent_id
|
||||
JOIN dependency_tree dt ON rd.parent_id = dt.id
|
||||
WHERE dt.depth < 100 -- Prevent infinite recursion
|
||||
)
|
||||
SELECT id, content_address, name, parent_id, level
|
||||
FROM dependent_tree
|
||||
ORDER BY level, name
|
||||
SELECT id, content_address, name, parent_id, depth
|
||||
FROM dependency_tree
|
||||
ORDER BY depth, name
|
||||
"#,
|
||||
)
|
||||
.bind(parent_id)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||
|
||||
// Build the tree structure from the flattened results
|
||||
let mut refs_map: std::collections::HashMap<String, Arc<Reference>> = std::collections::HashMap::new();
|
||||
let mut children_map: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
|
||||
// Build the dependency tree iteratively
|
||||
let mut reference_map: HashMap<String, Reference> = HashMap::new();
|
||||
let mut children_map: HashMap<String, Vec<String>> = HashMap::new();
|
||||
|
||||
// First pass: create all references and track relationships
|
||||
// First pass: create all references and build the children map
|
||||
for row in &rows {
|
||||
let id: String = row.get("id");
|
||||
let content_address: Option<String> = row.get("content_address");
|
||||
let name: String = row.get("name");
|
||||
let parent_id: String = row.get("parent_id");
|
||||
|
||||
// Create reference without dependents first
|
||||
let reference = Arc::new(Reference {
|
||||
let reference = Reference {
|
||||
id: id.clone(),
|
||||
content_address,
|
||||
name,
|
||||
dependents: Vec::new(),
|
||||
});
|
||||
dependents: Vec::new(), // Will be filled in second pass
|
||||
};
|
||||
|
||||
refs_map.insert(id.clone(), reference);
|
||||
reference_map.insert(id.clone(), reference);
|
||||
children_map.entry(parent_id).or_default().push(id);
|
||||
}
|
||||
|
||||
// Second pass: build the tree by adding dependents to each reference
|
||||
fn build_dependents(
|
||||
ref_id: &str,
|
||||
refs_map: &std::collections::HashMap<String, Arc<Reference>>,
|
||||
children_map: &std::collections::HashMap<String, Vec<String>>,
|
||||
) -> Vec<Arc<Reference>> {
|
||||
if let Some(child_ids) = children_map.get(ref_id) {
|
||||
let mut dependents = Vec::new();
|
||||
for child_id in child_ids {
|
||||
if let Some(child_ref) = refs_map.get(child_id) {
|
||||
let nested_dependents = build_dependents(child_id, refs_map, children_map);
|
||||
let updated_child = Arc::new(Reference {
|
||||
id: child_ref.id.clone(),
|
||||
content_address: child_ref.content_address.clone(),
|
||||
name: child_ref.name.clone(),
|
||||
dependents: nested_dependents,
|
||||
});
|
||||
dependents.push(updated_child);
|
||||
// Second pass: build the dependency tree from bottom up (highest depth first)
|
||||
let mut depth_groups: BTreeMap<i32, Vec<String>> = BTreeMap::new();
|
||||
for row in &rows {
|
||||
let id: String = row.get("id");
|
||||
let depth: i32 = row.get("depth");
|
||||
depth_groups.entry(depth).or_default().push(id);
|
||||
}
|
||||
|
||||
// Process from highest depth to lowest (leaves to roots)
|
||||
for (_depth, ids) in depth_groups.iter().rev() {
|
||||
for id in ids {
|
||||
if let Some(children) = children_map.get(id).cloned() {
|
||||
let child_references: Vec<Arc<Reference>> = children
|
||||
.iter()
|
||||
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
|
||||
.collect();
|
||||
|
||||
if let Some(reference) = reference_map.get_mut(id) {
|
||||
reference.dependents = child_references;
|
||||
}
|
||||
}
|
||||
dependents.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
dependents
|
||||
} else {
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
|
||||
Ok(build_dependents(parent_id, &refs_map, &children_map))
|
||||
// Return the direct children of the parent
|
||||
let empty_vec = Vec::new();
|
||||
let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec);
|
||||
let result = direct_children
|
||||
.iter()
|
||||
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
|
||||
.collect();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn get_references_by_name(&self, name: &str) -> Result<Vec<Reference>> {
|
||||
let rows = sqlx::query("SELECT id FROM refs WHERE name = ?")
|
||||
.bind(name)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let mut references = Vec::new();
|
||||
for row in rows {
|
||||
let id: String = row.get("id");
|
||||
if let Some(reference) = self.get_reference(&id).await? {
|
||||
references.push(reference);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(references)
|
||||
}
|
||||
|
||||
pub async fn get_references_by_content_address(
|
||||
&self,
|
||||
content_address: &str,
|
||||
) -> Result<Vec<Reference>> {
|
||||
let rows = sqlx::query("SELECT id FROM refs WHERE content_address = ?")
|
||||
.bind(content_address)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let mut references = Vec::new();
|
||||
for row in rows {
|
||||
let id: String = row.get("id");
|
||||
if let Some(reference) = self.get_reference(&id).await? {
|
||||
references.push(reference);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(references)
|
||||
}
|
||||
|
||||
pub async fn delete_reference(&self, id: &str) -> Result<bool> {
|
||||
let result = sqlx::query("DELETE FROM refs WHERE id = ?")
|
||||
.bind(id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn list_all_references(&self) -> Result<Vec<Reference>> {
|
||||
let rows = sqlx::query("SELECT id FROM refs ORDER BY name")
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let mut references = Vec::new();
|
||||
for row in rows {
|
||||
let id: String = row.get("id");
|
||||
if let Some(reference) = self.get_reference(&id).await? {
|
||||
references.push(reference);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(references)
|
||||
}
|
||||
|
||||
pub async fn update_reference_graph(
|
||||
&self,
|
||||
updated_references: &HashMap<String, Arc<Reference>>,
|
||||
) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
for reference in updated_references.values() {
|
||||
// Update the reference
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT OR REPLACE INTO refs (id, content_address, name, updated_at)
|
||||
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
||||
"#,
|
||||
)
|
||||
.bind(&reference.id)
|
||||
.bind(&reference.content_address)
|
||||
.bind(&reference.name)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
// Clear existing dependencies
|
||||
sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?")
|
||||
.bind(&reference.id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
// Insert new dependencies
|
||||
for dependent in &reference.dependents {
|
||||
sqlx::query(
|
||||
"INSERT INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)",
|
||||
)
|
||||
.bind(&reference.id)
|
||||
.bind(&dependent.id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn calculate_content_address(content: &[u8]) -> String {
|
||||
let mut hasher = Blake2b512::new();
|
||||
hasher.update(content);
|
||||
format!("{:x}", hasher.finalize())
|
||||
}
|
||||
|
||||
pub async fn store_content(
|
||||
&self,
|
||||
content: &[u8],
|
||||
content_type: Option<String>,
|
||||
) -> Result<String> {
|
||||
let content_address = Self::calculate_content_address(content);
|
||||
|
||||
// Check if content already exists (deduplication)
|
||||
let exists = sqlx::query("SELECT 1 FROM content_objects WHERE content_address = ?")
|
||||
.bind(&content_address)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?
|
||||
.is_some();
|
||||
|
||||
if !exists {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO content_objects (content_address, content_data, content_type)
|
||||
VALUES (?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(&content_address)
|
||||
.bind(content)
|
||||
.bind(&content_type)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(content_address)
|
||||
}
|
||||
|
||||
pub async fn get_content(&self, content_address: &str) -> Result<Option<Vec<u8>>> {
|
||||
let row = sqlx::query("SELECT content_data FROM content_objects WHERE content_address = ?")
|
||||
.bind(content_address)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(row.map(|row| row.get::<Vec<u8>, _>("content_data")))
|
||||
}
|
||||
|
||||
pub async fn get_content_info(&self, content_address: &str) -> Result<Option<ContentInfo>> {
|
||||
let row = sqlx::query(
|
||||
"SELECT content_type, created_at FROM content_objects WHERE content_address = ?",
|
||||
)
|
||||
.bind(content_address)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(row.map(|row| ContentInfo {
|
||||
content_address: content_address.to_string(),
|
||||
content_type: row.get("content_type"),
|
||||
size: 0, // Size not stored in database
|
||||
created_at: row.get("created_at"),
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn content_exists(&self, content_address: &str) -> Result<bool> {
|
||||
let exists = sqlx::query("SELECT 1 FROM content_objects WHERE content_address = ?")
|
||||
.bind(content_address)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?
|
||||
.is_some();
|
||||
|
||||
Ok(exists)
|
||||
}
|
||||
|
||||
pub async fn store_reference_with_content(
|
||||
&self,
|
||||
name: String,
|
||||
content: &[u8],
|
||||
content_type: Option<String>,
|
||||
) -> Result<Reference> {
|
||||
// Store the content and get its address
|
||||
let content_address = self.store_content(content, content_type).await?;
|
||||
|
||||
// Create the reference
|
||||
let reference = Reference::new(Some(content_address), name);
|
||||
|
||||
// Store the reference
|
||||
self.store_reference(&reference).await?;
|
||||
|
||||
Ok(reference)
|
||||
}
|
||||
|
||||
pub async fn get_reference_with_content(
|
||||
&self,
|
||||
id: &str,
|
||||
) -> Result<Option<(Reference, Option<Vec<u8>>)>> {
|
||||
let reference = self.get_reference(id).await?;
|
||||
|
||||
if let Some(ref reference) = reference {
|
||||
if let Some(ref content_address) = reference.content_address {
|
||||
let content = self.get_content(content_address).await?;
|
||||
return Ok(Some((reference.clone(), content)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(reference.map(|r| (r, None)))
|
||||
}
|
||||
|
||||
pub async fn delete_content(&self, content_address: &str) -> Result<bool> {
|
||||
let result = sqlx::query("DELETE FROM content_objects WHERE content_address = ?")
|
||||
.bind(content_address)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn list_unreferenced_content(&self) -> Result<Vec<String>> {
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
SELECT co.content_address
|
||||
FROM content_objects co
|
||||
LEFT JOIN refs r ON co.content_address = r.content_address
|
||||
WHERE r.content_address IS NULL
|
||||
"#,
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|row| row.get("content_address"))
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn cleanup_unreferenced_content(&self) -> Result<usize> {
|
||||
let result = sqlx::query(
|
||||
r#"
|
||||
DELETE FROM content_objects
|
||||
WHERE content_address IN (
|
||||
SELECT co.content_address
|
||||
FROM content_objects co
|
||||
LEFT JOIN refs r ON co.content_address = r.content_address
|
||||
WHERE r.content_address IS NULL
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(result.rows_affected() as usize)
|
||||
}
|
||||
|
||||
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
|
||||
let content_count = sqlx::query("SELECT COUNT(*) as count FROM content_objects")
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
let reference_count = sqlx::query("SELECT COUNT(*) as count FROM refs")
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(StorageStats {
|
||||
content_object_count: content_count.get::<i64, _>("count") as usize,
|
||||
total_content_size: 0, // Size not tracked
|
||||
reference_count: reference_count.get::<i64, _>("count") as usize,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ContentInfo {
|
||||
pub content_address: String,
|
||||
pub content_type: Option<String>,
|
||||
pub size: usize,
|
||||
pub created_at: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StorageStats {
|
||||
pub content_object_count: usize,
|
||||
pub total_content_size: usize,
|
||||
pub reference_count: usize,
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod integration_tests;
|
||||
|
@ -1,430 +0,0 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use offline_web_model::Reference;
|
||||
use offline_web_storage::ReferenceStore;
|
||||
|
||||
async fn create_test_store() -> ReferenceStore {
|
||||
ReferenceStore::new("sqlite::memory:").await.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_store_and_retrieve_reference() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let reference = Reference::new(Some("abc123".to_string()), "test.txt".to_string());
|
||||
|
||||
// Store the reference
|
||||
store.store_reference(&reference).await.unwrap();
|
||||
|
||||
// Retrieve it
|
||||
let retrieved = store.get_reference(&reference.id).await.unwrap();
|
||||
assert!(retrieved.is_some());
|
||||
|
||||
let retrieved = retrieved.unwrap();
|
||||
assert_eq!(retrieved.id, reference.id);
|
||||
assert_eq!(retrieved.content_address, reference.content_address);
|
||||
assert_eq!(retrieved.name, reference.name);
|
||||
assert_eq!(retrieved.dependents.len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_store_reference_with_dependents() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let dep1 = Arc::new(Reference::new(
|
||||
Some("dep1".to_string()),
|
||||
"dep1.txt".to_string(),
|
||||
));
|
||||
let dep2 = Arc::new(Reference::new(
|
||||
Some("dep2".to_string()),
|
||||
"dep2.txt".to_string(),
|
||||
));
|
||||
|
||||
// Store dependencies first
|
||||
store.store_reference(&dep1).await.unwrap();
|
||||
store.store_reference(&dep2).await.unwrap();
|
||||
|
||||
let mut parent = Reference::new(Some("parent".to_string()), "parent.txt".to_string());
|
||||
parent = parent.add_dep(dep1.clone());
|
||||
parent = parent.add_dep(dep2.clone());
|
||||
|
||||
// Store parent with dependencies
|
||||
store.store_reference(&parent).await.unwrap();
|
||||
|
||||
// Retrieve and verify
|
||||
let retrieved = store.get_reference(&parent.id).await.unwrap().unwrap();
|
||||
assert_eq!(retrieved.dependents.len(), 2);
|
||||
|
||||
let dep_names: Vec<_> = retrieved.dependents.iter().map(|d| &d.name).collect();
|
||||
assert!(dep_names.contains(&&"dep1.txt".to_string()));
|
||||
assert!(dep_names.contains(&&"dep2.txt".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_references_by_name() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let ref1 = Reference::new(Some("abc1".to_string()), "test.txt".to_string());
|
||||
let ref2 = Reference::new(Some("abc2".to_string()), "test.txt".to_string());
|
||||
let ref3 = Reference::new(Some("abc3".to_string()), "other.txt".to_string());
|
||||
|
||||
store.store_reference(&ref1).await.unwrap();
|
||||
store.store_reference(&ref2).await.unwrap();
|
||||
store.store_reference(&ref3).await.unwrap();
|
||||
|
||||
let results = store.get_references_by_name("test.txt").await.unwrap();
|
||||
assert_eq!(results.len(), 2);
|
||||
|
||||
let results = store.get_references_by_name("other.txt").await.unwrap();
|
||||
assert_eq!(results.len(), 1);
|
||||
|
||||
let results = store
|
||||
.get_references_by_name("nonexistent.txt")
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(results.len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_references_by_content_address() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let ref1 = Reference::new(Some("same_content".to_string()), "file1.txt".to_string());
|
||||
let ref2 = Reference::new(Some("same_content".to_string()), "file2.txt".to_string());
|
||||
let ref3 = Reference::new(
|
||||
Some("different_content".to_string()),
|
||||
"file3.txt".to_string(),
|
||||
);
|
||||
|
||||
store.store_reference(&ref1).await.unwrap();
|
||||
store.store_reference(&ref2).await.unwrap();
|
||||
store.store_reference(&ref3).await.unwrap();
|
||||
|
||||
let results = store
|
||||
.get_references_by_content_address("same_content")
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(results.len(), 2);
|
||||
|
||||
let results = store
|
||||
.get_references_by_content_address("different_content")
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(results.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_reference() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let reference = Reference::new(Some("abc123".to_string()), "test.txt".to_string());
|
||||
store.store_reference(&reference).await.unwrap();
|
||||
|
||||
// Verify it exists
|
||||
let retrieved = store.get_reference(&reference.id).await.unwrap();
|
||||
assert!(retrieved.is_some());
|
||||
|
||||
// Delete it
|
||||
let deleted = store.delete_reference(&reference.id).await.unwrap();
|
||||
assert!(deleted);
|
||||
|
||||
// Verify it's gone
|
||||
let retrieved = store.get_reference(&reference.id).await.unwrap();
|
||||
assert!(retrieved.is_none());
|
||||
|
||||
// Try to delete again
|
||||
let deleted = store.delete_reference(&reference.id).await.unwrap();
|
||||
assert!(!deleted);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_all_references() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let ref1 = Reference::new(Some("abc1".to_string()), "a.txt".to_string());
|
||||
let ref2 = Reference::new(Some("abc2".to_string()), "b.txt".to_string());
|
||||
let ref3 = Reference::new(Some("abc3".to_string()), "c.txt".to_string());
|
||||
|
||||
store.store_reference(&ref1).await.unwrap();
|
||||
store.store_reference(&ref2).await.unwrap();
|
||||
store.store_reference(&ref3).await.unwrap();
|
||||
|
||||
let all_refs = store.list_all_references().await.unwrap();
|
||||
assert_eq!(all_refs.len(), 3);
|
||||
|
||||
// Should be sorted by name
|
||||
assert_eq!(all_refs[0].name, "a.txt");
|
||||
assert_eq!(all_refs[1].name, "b.txt");
|
||||
assert_eq!(all_refs[2].name, "c.txt");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_reference_graph() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let ref1 = Arc::new(Reference::new(
|
||||
Some("abc1".to_string()),
|
||||
"file1.txt".to_string(),
|
||||
));
|
||||
let ref2 = Arc::new(Reference::new(
|
||||
Some("abc2".to_string()),
|
||||
"file2.txt".to_string(),
|
||||
));
|
||||
|
||||
let mut updated_refs = HashMap::new();
|
||||
updated_refs.insert(ref1.id.clone(), ref1.clone());
|
||||
updated_refs.insert(ref2.id.clone(), ref2.clone());
|
||||
|
||||
store.update_reference_graph(&updated_refs).await.unwrap();
|
||||
|
||||
// Verify both references were stored
|
||||
let retrieved1 = store.get_reference(&ref1.id).await.unwrap();
|
||||
let retrieved2 = store.get_reference(&ref2.id).await.unwrap();
|
||||
|
||||
assert!(retrieved1.is_some());
|
||||
assert!(retrieved2.is_some());
|
||||
assert_eq!(retrieved1.unwrap().name, "file1.txt");
|
||||
assert_eq!(retrieved2.unwrap().name, "file2.txt");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_store_and_retrieve_content() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let content = b"Hello, world!";
|
||||
let content_type = Some("text/plain".to_string());
|
||||
|
||||
// Store content
|
||||
let content_address = store
|
||||
.store_content(content, content_type.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Retrieve content
|
||||
let retrieved_content = store.get_content(&content_address).await.unwrap();
|
||||
assert!(retrieved_content.is_some());
|
||||
assert_eq!(retrieved_content.unwrap(), content);
|
||||
|
||||
// Check content info
|
||||
let content_info = store.get_content_info(&content_address).await.unwrap();
|
||||
assert!(content_info.is_some());
|
||||
let info = content_info.unwrap();
|
||||
assert_eq!(info.content_address, content_address);
|
||||
assert_eq!(info.content_type, content_type);
|
||||
|
||||
// Check content exists
|
||||
assert!(store.content_exists(&content_address).await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_content_deduplication() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let content = b"Duplicate content";
|
||||
|
||||
// Store same content twice
|
||||
let addr1 = store.store_content(content, None).await.unwrap();
|
||||
let addr2 = store.store_content(content, None).await.unwrap();
|
||||
|
||||
// Should get same address
|
||||
assert_eq!(addr1, addr2);
|
||||
|
||||
// Should only have one copy in storage
|
||||
let stats = store.get_storage_stats().await.unwrap();
|
||||
assert_eq!(stats.content_object_count, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_store_reference_with_content() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let content = b"File content here";
|
||||
let name = "test_file.txt".to_string();
|
||||
let content_type = Some("text/plain".to_string());
|
||||
|
||||
// Store reference with content
|
||||
let reference = store
|
||||
.store_reference_with_content(name.clone(), content, content_type)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(reference.name, name);
|
||||
assert!(reference.content_address.is_some());
|
||||
|
||||
// Retrieve reference with content
|
||||
let (retrieved_ref, retrieved_content) = store
|
||||
.get_reference_with_content(&reference.id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(retrieved_ref.id, reference.id);
|
||||
assert_eq!(retrieved_ref.name, name);
|
||||
assert!(retrieved_content.is_some());
|
||||
assert_eq!(retrieved_content.unwrap(), content);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_calculate_content_address() {
|
||||
let content1 = b"Same content";
|
||||
let content2 = b"Same content";
|
||||
let content3 = b"Different content";
|
||||
|
||||
let addr1 = ReferenceStore::calculate_content_address(content1);
|
||||
let addr2 = ReferenceStore::calculate_content_address(content2);
|
||||
let addr3 = ReferenceStore::calculate_content_address(content3);
|
||||
|
||||
assert_eq!(addr1, addr2);
|
||||
assert_ne!(addr1, addr3);
|
||||
|
||||
// Should be a valid hex string
|
||||
assert!(addr1.chars().all(|c| c.is_ascii_hexdigit()));
|
||||
assert_eq!(addr1.len(), 128); // Blake2b512 produces 64 bytes = 128 hex chars
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_content() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
let content = b"Content to delete";
|
||||
let content_address = store.store_content(content, None).await.unwrap();
|
||||
|
||||
// Verify content exists
|
||||
assert!(store.content_exists(&content_address).await.unwrap());
|
||||
|
||||
// Delete content
|
||||
let deleted = store.delete_content(&content_address).await.unwrap();
|
||||
assert!(deleted);
|
||||
|
||||
// Verify content is gone
|
||||
assert!(!store.content_exists(&content_address).await.unwrap());
|
||||
let retrieved = store.get_content(&content_address).await.unwrap();
|
||||
assert!(retrieved.is_none());
|
||||
|
||||
// Try to delete again
|
||||
let deleted_again = store.delete_content(&content_address).await.unwrap();
|
||||
assert!(!deleted_again);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cleanup_unreferenced_content() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Store some content with references
|
||||
let content1 = b"Referenced content";
|
||||
let ref1 = store
|
||||
.store_reference_with_content("file1.txt".to_string(), content1, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Store content without references
|
||||
let content2 = b"Unreferenced content 1";
|
||||
let content3 = b"Unreferenced content 2";
|
||||
let _addr2 = store.store_content(content2, None).await.unwrap();
|
||||
let _addr3 = store.store_content(content3, None).await.unwrap();
|
||||
|
||||
// Initial stats
|
||||
let stats = store.get_storage_stats().await.unwrap();
|
||||
assert_eq!(stats.content_object_count, 3);
|
||||
assert_eq!(stats.reference_count, 1);
|
||||
|
||||
// List unreferenced content
|
||||
let unreferenced = store.list_unreferenced_content().await.unwrap();
|
||||
assert_eq!(unreferenced.len(), 2);
|
||||
|
||||
// Cleanup unreferenced content
|
||||
let cleaned_up = store.cleanup_unreferenced_content().await.unwrap();
|
||||
assert_eq!(cleaned_up, 2);
|
||||
|
||||
// Check final stats
|
||||
let final_stats = store.get_storage_stats().await.unwrap();
|
||||
assert_eq!(final_stats.content_object_count, 1);
|
||||
assert_eq!(final_stats.reference_count, 1);
|
||||
|
||||
// Referenced content should still exist
|
||||
let (retrieved_ref, retrieved_content) = store
|
||||
.get_reference_with_content(&ref1.id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(retrieved_ref.id, ref1.id);
|
||||
assert_eq!(retrieved_content.unwrap(), content1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_storage_stats() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Initial stats should be empty
|
||||
let stats = store.get_storage_stats().await.unwrap();
|
||||
assert_eq!(stats.content_object_count, 0);
|
||||
assert_eq!(stats.total_content_size, 0);
|
||||
assert_eq!(stats.reference_count, 0);
|
||||
|
||||
// Add some content and references
|
||||
let content1 = b"First file";
|
||||
let content2 = b"Second file content";
|
||||
|
||||
let _ref1 = store
|
||||
.store_reference_with_content("file1.txt".to_string(), content1, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let _ref2 = store
|
||||
.store_reference_with_content("file2.txt".to_string(), content2, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Check updated stats
|
||||
let final_stats = store.get_storage_stats().await.unwrap();
|
||||
assert_eq!(final_stats.content_object_count, 2);
|
||||
assert_eq!(final_stats.reference_count, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reference_with_content_and_dependencies() {
|
||||
let store = create_test_store().await;
|
||||
|
||||
// Create dependencies with content
|
||||
let dep1_content = b"Dependency 1 content";
|
||||
let dep2_content = b"Dependency 2 content";
|
||||
|
||||
let dep1 = store
|
||||
.store_reference_with_content("dep1.txt".to_string(), dep1_content, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let dep2 = store
|
||||
.store_reference_with_content("dep2.txt".to_string(), dep2_content, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create parent with content and dependencies
|
||||
let parent_content = b"Parent content";
|
||||
let parent = store
|
||||
.store_reference_with_content("parent.txt".to_string(), parent_content, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Add dependencies to parent
|
||||
let parent_with_deps = parent.add_dep(Arc::new(dep1)).add_dep(Arc::new(dep2));
|
||||
store.store_reference(&parent_with_deps).await.unwrap();
|
||||
|
||||
// Retrieve parent with content
|
||||
let (retrieved_parent, retrieved_content) = store
|
||||
.get_reference_with_content(&parent_with_deps.id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(retrieved_parent.dependents.len(), 2);
|
||||
assert_eq!(retrieved_content.unwrap(), parent_content);
|
||||
|
||||
// Check that dependencies also have their content
|
||||
for dep in &retrieved_parent.dependents {
|
||||
let (_, dep_content) = store
|
||||
.get_reference_with_content(&dep.id)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(dep_content.is_some());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user