//! SQLite-backed storage for references, their dependency edges, and content-addressed blobs.
use std::{collections::HashMap, sync::Arc};
use anyhow::Result;
use blake2::{Blake2b512, Digest};
use offline_web_model::Reference;
use sqlx::{Pool, Row, Sqlite, SqlitePool};
/// Handle to the SQLite database holding references, their dependency edges,
/// and content objects.
pub struct ReferenceStore {
    /// Connection pool that every query issued by this store runs against.
    pool: Pool<Sqlite>,
}
impl ReferenceStore {
/// Connects to the SQLite database at `database_url` and ensures the schema
/// (tables and indexes) exists before handing the store back.
///
/// # Errors
/// Fails if the connection cannot be established or if any schema statement
/// is rejected.
pub async fn new(database_url: &str) -> Result<Self> {
    let store = Self {
        pool: SqlitePool::connect(database_url).await?,
    };
    store.initialize_schema().await?;
    Ok(store)
}
/// Creates every table and index the store relies on, if not already present.
///
/// The statements run one at a time, in the order listed, directly against
/// the pool; the first failure aborts the remaining statements.
async fn initialize_schema(&self) -> Result<()> {
    // Each entry is idempotent thanks to `IF NOT EXISTS`.
    let statements = [
        r#"
CREATE TABLE IF NOT EXISTS refs (
id TEXT PRIMARY KEY,
content_address TEXT,
name TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
        r#"
CREATE TABLE IF NOT EXISTS reference_dependencies (
parent_id TEXT NOT NULL,
dependent_id TEXT NOT NULL,
PRIMARY KEY (parent_id, dependent_id)
)
"#,
        "CREATE INDEX IF NOT EXISTS idx_refs_name ON refs(name)",
        "CREATE INDEX IF NOT EXISTS idx_refs_content_address ON refs(content_address)",
        "CREATE INDEX IF NOT EXISTS idx_dependencies_parent ON reference_dependencies(parent_id)",
        "CREATE INDEX IF NOT EXISTS idx_dependencies_dependent ON reference_dependencies(dependent_id)",
        r#"
CREATE TABLE IF NOT EXISTS content_objects (
content_address TEXT PRIMARY KEY,
content_data BLOB NOT NULL,
content_type TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
        "CREATE INDEX IF NOT EXISTS idx_content_created ON content_objects(created_at)",
    ];

    for statement in statements {
        sqlx::query(statement).execute(&self.pool).await?;
    }
    Ok(())
}
/// Persists `reference` and its direct dependency edges in one transaction.
///
/// The `refs` row is upserted: a new id is inserted, an existing id has its
/// `content_address`, `name` and `updated_at` refreshed. An upsert is used
/// instead of `INSERT OR REPLACE` because REPLACE deletes and re-inserts the
/// row, which would reset `created_at` on every update.
///
/// All edges whose `parent_id` is this reference are then replaced by one
/// edge per entry in `reference.dependents`. Only the dependents' ids are
/// written; the dependent references themselves are not stored here.
///
/// # Errors
/// Returns any database error; nothing is committed in that case.
pub async fn store_reference(&self, reference: &Reference) -> Result<()> {
    let mut tx = self.pool.begin().await?;

    // Insert or update the reference row, preserving `created_at` on update.
    sqlx::query(
        r#"
        INSERT INTO refs (id, content_address, name, updated_at)
        VALUES (?, ?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT(id) DO UPDATE SET
            content_address = excluded.content_address,
            name = excluded.name,
            updated_at = CURRENT_TIMESTAMP
        "#,
    )
    .bind(&reference.id)
    .bind(&reference.content_address)
    .bind(&reference.name)
    .execute(&mut *tx)
    .await?;

    // Drop the edges previously recorded for this parent.
    sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?")
        .bind(&reference.id)
        .execute(&mut *tx)
        .await?;

    // Record the current edges. `OR IGNORE` keeps a dependent that is listed
    // twice from aborting the transaction on the (parent_id, dependent_id)
    // primary key.
    for dependent in &reference.dependents {
        sqlx::query(
            "INSERT OR IGNORE INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)",
        )
        .bind(&reference.id)
        .bind(&dependent.id)
        .execute(&mut *tx)
        .await?;
    }

    tx.commit().await?;
    Ok(())
}
/// Loads the reference stored under `id`, together with its dependents as
/// returned by `get_dependents`.
///
/// Returns `Ok(None)` when no `refs` row has that id.
pub async fn get_reference(&self, id: &str) -> Result<Option<Reference>> {
    let record = sqlx::query("SELECT id, content_address, name FROM refs WHERE id = ?")
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;

    match record {
        None => Ok(None),
        Some(record) => {
            let reference_id: String = record.get("id");
            let content_address: Option<String> = record.get("content_address");
            let name: String = record.get("name");

            // Dependents are resolved with a separate query.
            let dependents = self.get_dependents(&reference_id).await?;

            Ok(Some(Reference {
                id: reference_id,
                content_address,
                name,
                dependents,
            }))
        }
    }
}
async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>> {
// Get all dependents at once using a recursive CTE (Common Table Expression)
let rows = sqlx::query(
r#"
WITH RECURSIVE dependent_tree AS (
-- Base case: direct dependents
SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as level
2025-07-02 15:31:24 -05:00
FROM refs r
JOIN reference_dependencies rd ON r.id = rd.dependent_id
WHERE rd.parent_id = ?
UNION ALL
-- Recursive case: dependents of dependents
SELECT r.id, r.content_address, r.name, rd.parent_id, dt.level + 1
FROM refs r
JOIN reference_dependencies rd ON r.id = rd.dependent_id
JOIN dependent_tree dt ON rd.parent_id = dt.id
2025-07-02 15:31:24 -05:00
)
SELECT id, content_address, name, parent_id, level
FROM dependent_tree
ORDER BY level, name
"#,
)
.bind(parent_id)
.fetch_all(&self.pool)
.await?;
2025-07-02 15:31:24 -05:00
// Build the tree structure from the flattened results
let mut refs_map: std::collections::HashMap<String, Arc<Reference>> = std::collections::HashMap::new();
let mut children_map: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
// First pass: create all references and track relationships
for row in &rows {
let id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
let parent_id: String = row.get("parent_id");
// Create reference without dependents first
let reference = Arc::new(Reference {
id: id.clone(),
content_address,
name,
dependents: Vec::new(),
});
refs_map.insert(id.clone(), reference);
children_map.entry(parent_id).or_default().push(id);
}
// Second pass: build the tree by adding dependents to each reference
fn build_dependents(
ref_id: &str,
refs_map: &std::collections::HashMap<String, Arc<Reference>>,
children_map: &std::collections::HashMap<String, Vec<String>>,
) -> Vec<Arc<Reference>> {
if let Some(child_ids) = children_map.get(ref_id) {
let mut dependents = Vec::new();
for child_id in child_ids {
if let Some(child_ref) = refs_map.get(child_id) {
let nested_dependents = build_dependents(child_id, refs_map, children_map);
let updated_child = Arc::new(Reference {
id: child_ref.id.clone(),
content_address: child_ref.content_address.clone(),
name: child_ref.name.clone(),
dependents: nested_dependents,
});
dependents.push(updated_child);
}
}
dependents.sort_by(|a, b| a.name.cmp(&b.name));
dependents
} else {
Vec::new()
2025-07-02 15:31:24 -05:00
}
}
2025-07-02 15:31:24 -05:00
Ok(build_dependents(parent_id, &refs_map, &children_map))
2025-07-02 15:31:24 -05:00
}
/// Returns every stored reference whose `name` equals `name`, each fully
/// loaded through `get_reference`.
///
/// Ids that no longer resolve by the time they are loaded are skipped.
pub async fn get_references_by_name(&self, name: &str) -> Result<Vec<Reference>> {
    let matching_ids: Vec<String> = sqlx::query("SELECT id FROM refs WHERE name = ?")
        .bind(name)
        .fetch_all(&self.pool)
        .await?
        .into_iter()
        .map(|row| row.get::<String, _>("id"))
        .collect();

    let mut found = Vec::new();
    for ref_id in &matching_ids {
        match self.get_reference(ref_id).await? {
            Some(reference) => found.push(reference),
            None => {}
        }
    }
    Ok(found)
}
/// Returns every stored reference whose `content_address` equals
/// `content_address`, each fully loaded through `get_reference`.
///
/// Ids that no longer resolve by the time they are loaded are skipped.
pub async fn get_references_by_content_address(
    &self,
    content_address: &str,
) -> Result<Vec<Reference>> {
    let matching_ids: Vec<String> =
        sqlx::query("SELECT id FROM refs WHERE content_address = ?")
            .bind(content_address)
            .fetch_all(&self.pool)
            .await?
            .into_iter()
            .map(|row| row.get::<String, _>("id"))
            .collect();

    let mut found = Vec::new();
    for ref_id in &matching_ids {
        match self.get_reference(ref_id).await? {
            Some(reference) => found.push(reference),
            None => {}
        }
    }
    Ok(found)
}
/// Deletes the reference stored under `id` along with the dependency edges
/// it owns (rows where it is the parent), in a single transaction.
///
/// Previously only the `refs` row was removed, leaving the reference's
/// outgoing edges behind as orphaned rows. Edges in which `id` is the
/// *dependent* of another reference are deliberately left untouched: they
/// belong to the other reference's stored dependency list.
///
/// Returns `true` if a `refs` row was deleted, `false` if none had that id.
pub async fn delete_reference(&self, id: &str) -> Result<bool> {
    let mut tx = self.pool.begin().await?;

    sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?")
        .bind(id)
        .execute(&mut *tx)
        .await?;

    let result = sqlx::query("DELETE FROM refs WHERE id = ?")
        .bind(id)
        .execute(&mut *tx)
        .await?;

    tx.commit().await?;
    Ok(result.rows_affected() > 0)
}
/// Returns every stored reference, ordered by `name`, each fully loaded
/// through `get_reference`.
///
/// Ids that no longer resolve by the time they are loaded are skipped.
pub async fn list_all_references(&self) -> Result<Vec<Reference>> {
    let ordered_ids: Vec<String> = sqlx::query("SELECT id FROM refs ORDER BY name")
        .fetch_all(&self.pool)
        .await?
        .into_iter()
        .map(|row| row.get::<String, _>("id"))
        .collect();

    let mut found = Vec::new();
    for ref_id in &ordered_ids {
        match self.get_reference(ref_id).await? {
            Some(reference) => found.push(reference),
            None => {}
        }
    }
    Ok(found)
}
pub async fn update_reference_graph(
&self,
updated_references: &HashMap<String, Arc<Reference>>,
) -> Result<()> {
let mut tx = self.pool.begin().await?;
for reference in updated_references.values() {
2025-07-02 15:31:24 -05:00
// Update the reference
sqlx::query(
r#"
INSERT OR REPLACE INTO refs (id, content_address, name, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
"#,
)
.bind(&reference.id)
.bind(&reference.content_address)
.bind(&reference.name)
.execute(&mut *tx)
.await?;
// Clear existing dependencies
sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?")
.bind(&reference.id)
.execute(&mut *tx)
.await?;
// Insert new dependencies
for dependent in &reference.dependents {
sqlx::query(
"INSERT INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)",
)
.bind(&reference.id)
.bind(&dependent.id)
.execute(&mut *tx)
.await?;
}
}
tx.commit().await?;
Ok(())
}
/// Derives the content address for `content`: its BLAKE2b-512 digest,
/// rendered as lowercase hexadecimal.
pub fn calculate_content_address(content: &[u8]) -> String {
    let mut state = Blake2b512::new();
    state.update(content);
    let digest = state.finalize();
    format!("{digest:x}")
}
/// Stores `content` under its content address and returns that address.
///
/// Storing bytes that are already present is a no-op: the existing row,
/// including its `content_type`, is left unchanged. The write is a single
/// `INSERT OR IGNORE`, so two callers storing the same bytes concurrently
/// cannot race between an existence check and the insert and fail on the
/// primary key.
pub async fn store_content(
    &self,
    content: &[u8],
    content_type: Option<String>,
) -> Result<String> {
    let content_address = Self::calculate_content_address(content);

    // Deduplication is delegated to the primary key on `content_address`.
    sqlx::query(
        r#"
        INSERT OR IGNORE INTO content_objects (content_address, content_data, content_type)
        VALUES (?, ?, ?)
        "#,
    )
    .bind(&content_address)
    .bind(content)
    .bind(&content_type)
    .execute(&self.pool)
    .await?;

    Ok(content_address)
}
/// Fetches the bytes stored under `content_address`, or `None` when no
/// content object has that address.
pub async fn get_content(&self, content_address: &str) -> Result<Option<Vec<u8>>> {
    let found = sqlx::query("SELECT content_data FROM content_objects WHERE content_address = ?")
        .bind(content_address)
        .fetch_optional(&self.pool)
        .await?;

    let bytes = match found {
        Some(record) => Some(record.get::<Vec<u8>, _>("content_data")),
        None => None,
    };
    Ok(bytes)
}
/// Returns metadata for the content object stored under `content_address`,
/// or `None` when no such object exists.
///
/// `size` is the byte length of the stored blob, computed by SQLite with
/// `length(content_data)`; it was previously always reported as 0 because
/// no size column exists.
pub async fn get_content_info(&self, content_address: &str) -> Result<Option<ContentInfo>> {
    let row = sqlx::query(
        "SELECT content_type, created_at, length(content_data) AS size FROM content_objects WHERE content_address = ?",
    )
    .bind(content_address)
    .fetch_optional(&self.pool)
    .await?;

    let Some(row) = row else {
        return Ok(None);
    };

    // SQLite integers decode as i64; convert without silent truncation.
    let size = usize::try_from(row.get::<i64, _>("size"))?;

    Ok(Some(ContentInfo {
        content_address: content_address.to_string(),
        content_type: row.get("content_type"),
        size,
        created_at: row.get("created_at"),
    }))
}
/// Reports whether a content object is stored under `content_address`.
pub async fn content_exists(&self, content_address: &str) -> Result<bool> {
    let hit = sqlx::query("SELECT 1 FROM content_objects WHERE content_address = ?")
        .bind(content_address)
        .fetch_optional(&self.pool)
        .await?;
    Ok(hit.is_some())
}
/// Stores `content`, then creates and stores a new reference named `name`
/// that points at the resulting content address.
///
/// The two writes are separate operations: the content is stored first, and
/// the reference is stored afterwards. Returns the newly created reference.
pub async fn store_reference_with_content(
    &self,
    name: String,
    content: &[u8],
    content_type: Option<String>,
) -> Result<Reference> {
    // Step 1: persist the bytes and learn their address.
    let address = self.store_content(content, content_type).await?;

    // Step 2: build a reference to that address and persist it.
    let created = Reference::new(Some(address), name);
    self.store_reference(&created).await?;

    Ok(created)
}
/// Loads the reference stored under `id` together with the bytes it points
/// at.
///
/// Returns `Ok(None)` when the reference does not exist. The content half
/// of the tuple is `None` when the reference has no content address or when
/// no content object is stored under that address.
pub async fn get_reference_with_content(
    &self,
    id: &str,
) -> Result<Option<(Reference, Option<Vec<u8>>)>> {
    let Some(reference) = self.get_reference(id).await? else {
        return Ok(None);
    };

    let content = match reference.content_address.as_deref() {
        Some(address) => self.get_content(address).await?,
        None => None,
    };

    Ok(Some((reference, content)))
}
/// Deletes the content object stored under `content_address`.
///
/// Returns `true` if a row was removed, `false` if none had that address.
/// References pointing at the address are not checked or modified here.
pub async fn delete_content(&self, content_address: &str) -> Result<bool> {
    let removed_rows = sqlx::query("DELETE FROM content_objects WHERE content_address = ?")
        .bind(content_address)
        .execute(&self.pool)
        .await?
        .rows_affected();
    Ok(removed_rows != 0)
}
/// Lists the addresses of content objects that no row in `refs` points at.
pub async fn list_unreferenced_content(&self) -> Result<Vec<String>> {
    let orphan_rows = sqlx::query(
        r#"
SELECT co.content_address
FROM content_objects co
LEFT JOIN refs r ON co.content_address = r.content_address
WHERE r.content_address IS NULL
"#,
    )
    .fetch_all(&self.pool)
    .await?;

    let mut addresses: Vec<String> = Vec::new();
    for record in orphan_rows {
        addresses.push(record.get("content_address"));
    }
    Ok(addresses)
}
/// Deletes every content object that no row in `refs` points at and returns
/// how many rows were removed.
pub async fn cleanup_unreferenced_content(&self) -> Result<usize> {
    let outcome = sqlx::query(
        r#"
DELETE FROM content_objects
WHERE content_address IN (
SELECT co.content_address
FROM content_objects co
LEFT JOIN refs r ON co.content_address = r.content_address
WHERE r.content_address IS NULL
)
"#,
    )
    .execute(&self.pool)
    .await?;

    let removed = outcome.rows_affected() as usize;
    Ok(removed)
}
/// Returns aggregate counts for the store: number of content objects, their
/// combined size in bytes, and number of references.
///
/// `total_content_size` is computed by SQLite as the sum of
/// `length(content_data)` over all content objects (0 when there are none);
/// it was previously always reported as 0.
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
    // COALESCE turns the NULL that SUM yields on an empty table into 0.
    let content_row = sqlx::query(
        "SELECT COUNT(*) AS count, COALESCE(SUM(length(content_data)), 0) AS total_size FROM content_objects",
    )
    .fetch_one(&self.pool)
    .await?;

    let reference_row = sqlx::query("SELECT COUNT(*) AS count FROM refs")
        .fetch_one(&self.pool)
        .await?;

    // SQLite integers decode as i64; convert without silent truncation.
    Ok(StorageStats {
        content_object_count: usize::try_from(content_row.get::<i64, _>("count"))?,
        total_content_size: usize::try_from(content_row.get::<i64, _>("total_size"))?,
        reference_count: usize::try_from(reference_row.get::<i64, _>("count"))?,
    })
}
}
/// Metadata describing one stored content object.
#[derive(Debug, Clone)]
pub struct ContentInfo {
    /// Address the content object is stored under.
    pub content_address: String,
    /// Content type recorded with the object, if any.
    pub content_type: Option<String>,
    /// Size reported for the content (presumably in bytes — confirm against
    /// the code that fills this in).
    pub size: usize,
    /// Creation timestamp as text, as read from the database.
    pub created_at: String,
}
/// Aggregate figures describing what the store currently holds.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Number of content objects.
    pub content_object_count: usize,
    /// Combined size reported for all content objects (presumably in bytes —
    /// confirm against the code that fills this in).
    pub total_content_size: usize,
    /// Number of references.
    pub reference_count: usize,
}