use std::sync::Arc; use std::collections::{BTreeMap, HashMap}; use thiserror::Error; use offline_web_model::Reference; use sqlx::{Pool, Row, Sqlite, SqlitePool}; // Schema version constants const CURRENT_SCHEMA_VERSION: i32 = 1; const INITIAL_SCHEMA_VERSION: i32 = 0; pub struct SqliteReferenceStore { pool: Pool, } #[derive(Error, Debug)] pub enum StoreError { #[error("No such reference")] NoSuchReference, #[error("No such content address")] NoSuchContentAddress, #[error("Unknown Storage Error: {0:?}")] StorageError(Box), } #[allow(async_fn_in_trait)] pub trait ReferenceStore { async fn get_reference(&self, id: &str) -> Result; async fn get_content_for_reference(&self, reference: Reference) -> Result; async fn get_graph(&self, root_name: &str) -> Result, StoreError>; } impl SqliteReferenceStore { pub async fn new(database_url: &str) -> Result { let pool = SqlitePool::connect(database_url) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; let store = Self { pool }; // Check current schema version and migrate if necessary let current_version = store.get_current_schema_version().await?; if current_version != CURRENT_SCHEMA_VERSION { store.migrate_schema(current_version, CURRENT_SCHEMA_VERSION).await?; } Ok(store) } async fn get_current_schema_version(&self) -> Result { // First, ensure the schema_version table exists sqlx::query( r#" CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER PRIMARY KEY, applied_at DATETIME DEFAULT CURRENT_TIMESTAMP, description TEXT ) "#, ) .execute(&self.pool) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Get the current version let row = sqlx::query( r#" SELECT version FROM schema_version ORDER BY version DESC LIMIT 1 "#, ) .fetch_optional(&self.pool) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; match row { Some(row) => { let version: i32 = row.get("version"); Ok(version) } None => { // No version found, this is a fresh database Ok(INITIAL_SCHEMA_VERSION) } } } async fn migrate_schema(&self, from_version: i32, to_version: i32) -> Result<(), StoreError> { if from_version == to_version { return Ok(()); } if from_version > to_version { return Err(StoreError::StorageError( "Downward migrations not currently supported".into() )); } // Use a transaction for the entire migration process let mut tx = self.pool.begin().await .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Apply migrations step by step let mut current_version = from_version; while current_version < to_version { match current_version { 0 => { // Migration from version 0 to 1: Initial schema setup self.migrate_to_v1(&mut tx).await?; current_version = 1; } _ => { return Err(StoreError::StorageError( format!("Unknown migration path from version {}", current_version).into() )); } } } // Commit all migrations tx.commit().await .map_err(|e| StoreError::StorageError(Box::new(e)))?; Ok(()) } async fn migrate_to_v1(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> Result<(), StoreError> { // Create the main application tables sqlx::query( r#" CREATE TABLE IF NOT EXISTS ref_entries ( id TEXT PRIMARY KEY, content_address TEXT, name TEXT NOT NULL UNIQUE ) "#, ) .execute(&mut **tx) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; sqlx::query( r#" CREATE TABLE IF NOT EXISTS ref_dependencies ( parent_id TEXT NOT NULL, dependent_id TEXT NOT NULL, PRIMARY KEY (parent_id, dependent_id), FOREIGN KEY (parent_id) REFERENCES ref_entries(id), FOREIGN KEY (dependent_id) REFERENCES ref_entries(id) ) "#, ) .execute(&mut **tx) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; sqlx::query( r#" CREATE TABLE IF NOT EXISTS content_store ( content_address TEXT PRIMARY KEY, content BLOB NOT NULL ) "#, ) .execute(&mut **tx) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Record the schema version sqlx::query( r#" INSERT OR REPLACE INTO schema_version (version, description) VALUES (?, ?) "#, ) .bind(1) .bind("Initial schema with ref_entries, ref_dependencies, and content_store tables") .execute(&mut **tx) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; Ok(()) } pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> { // Use a transaction to ensure atomicity let mut tx = self.pool.begin().await .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Insert or update the reference sqlx::query( r#" INSERT OR REPLACE INTO ref_entries (id, content_address, name) VALUES (?, ?, ?) "#, ) .bind(&reference.id) .bind(&reference.content_address) .bind(&reference.name) .execute(&mut *tx) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Delete existing dependencies for this reference sqlx::query( r#" DELETE FROM ref_dependencies WHERE parent_id = ? "#, ) .bind(&reference.id) .execute(&mut *tx) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Insert new dependencies for dependent in &reference.dependents { sqlx::query( r#" INSERT INTO ref_dependencies (parent_id, dependent_id) VALUES (?, ?) "#, ) .bind(&reference.id) .bind(&dependent.id) .execute(&mut *tx) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; } // Commit the transaction tx.commit().await .map_err(|e| StoreError::StorageError(Box::new(e)))?; Ok(()) } pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> { sqlx::query( r#" INSERT OR REPLACE INTO content_store (content_address, content) VALUES (?, ?) "#, ) .bind(content_address) .bind(content) .execute(&self.pool) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; Ok(()) } } impl ReferenceStore for SqliteReferenceStore { async fn get_reference(&self, id: &str) -> Result { // First, get the basic reference information let row = sqlx::query( r#" SELECT id, content_address, name FROM ref_entries WHERE id = ? "#, ) .bind(id) .fetch_optional(&self.pool) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; match row { Some(row) => { let id: String = row.get("id"); let content_address: Option = row.get("content_address"); let name: String = row.get("name"); // Get the dependents by recursively fetching them let dependents = self.get_dependents(&id).await?; Ok(Reference { id, content_address, name, dependents, }) } None => Err(StoreError::NoSuchReference), } } async fn get_content_for_reference(&self, reference: Reference) -> Result { if let Some(content_address) = &reference.content_address { let row = sqlx::query( r#" SELECT content FROM content_store WHERE content_address = ? "#, ) .bind(content_address) .fetch_optional(&self.pool) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; match row { Some(row) => { let content: Vec = row.get("content"); String::from_utf8(content) .map_err(|e| StoreError::StorageError(Box::new(e))) } None => Err(StoreError::NoSuchContentAddress), } } else { Err(StoreError::NoSuchContentAddress) } } async fn get_graph(&self, root_name: &str) -> Result, StoreError> { let mut visited = std::collections::HashSet::new(); let mut result = Vec::new(); let mut queue = std::collections::VecDeque::new(); // Start with the root name queue.push_back(root_name.to_string()); while let Some(current_name) = queue.pop_front() { if visited.contains(¤t_name) { continue; } visited.insert(current_name.clone()); // Get the reference by name let row = sqlx::query( r#" SELECT id, content_address, name FROM ref_entries WHERE name = ? "#, ) .bind(¤t_name) .fetch_optional(&self.pool) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; if let Some(row) = row { let id: String = row.get("id"); let content_address: Option = row.get("content_address"); let name: String = row.get("name"); // Get dependents for this reference let dependents = self.get_dependents(&id).await?; let reference = Reference { id, content_address, name, dependents: dependents.clone(), }; result.push(reference); // Add all dependent names to the queue for processing for dependent in dependents { if !visited.contains(&dependent.name) { queue.push_back(dependent.name.clone()); } } } } Ok(result) } } impl SqliteReferenceStore { async fn get_dependents(&self, parent_id: &str) -> Result>, StoreError> { // Use a CTE (Common Table Expression) to get the entire dependency tree in one query let rows = sqlx::query( r#" WITH RECURSIVE dependency_tree AS ( -- Base case: direct dependents of the parent SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth FROM ref_entries r JOIN ref_dependencies rd ON r.id = rd.dependent_id WHERE rd.parent_id = ? UNION ALL -- Recursive case: dependents of dependents SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1 FROM ref_entries r JOIN ref_dependencies rd ON r.id = rd.dependent_id JOIN dependency_tree dt ON rd.parent_id = dt.id WHERE dt.depth < 100 -- Prevent infinite recursion ) SELECT id, content_address, name, parent_id, depth FROM dependency_tree ORDER BY depth, name "#, ) .bind(parent_id) .fetch_all(&self.pool) .await .map_err(|e| StoreError::StorageError(Box::new(e)))?; // Build the dependency tree iteratively let mut reference_map: HashMap = HashMap::new(); let mut children_map: HashMap> = HashMap::new(); // First pass: create all references and build the children map for row in &rows { let id: String = row.get("id"); let content_address: Option = row.get("content_address"); let name: String = row.get("name"); let parent_id: String = row.get("parent_id"); let reference = Reference { id: id.clone(), content_address, name, dependents: Vec::new(), // Will be filled in second pass }; reference_map.insert(id.clone(), reference); children_map.entry(parent_id).or_default().push(id); } // Second pass: build the dependency tree from bottom up (highest depth first) let mut depth_groups: BTreeMap> = BTreeMap::new(); for row in &rows { let id: String = row.get("id"); let depth: i32 = row.get("depth"); depth_groups.entry(depth).or_default().push(id); } // Process from highest depth to lowest (leaves to roots) for (_depth, ids) in depth_groups.iter().rev() { for id in ids { if let Some(children) = children_map.get(id).cloned() { let child_references: Vec> = children .iter() .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) .collect(); if let Some(reference) = reference_map.get_mut(id) { reference.dependents = child_references; } } } } // Return the direct children of the parent let empty_vec = Vec::new(); let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec); let result = direct_children .iter() .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) .collect(); Ok(result) } } #[cfg(test)] mod integration_tests;