From def2eec18aca8e40365d66b761fbd11bfba52761 Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Fri, 4 Jul 2025 11:35:28 -0500 Subject: [PATCH] wip: move the sqlite implementation into its own module --- offline-web-storage/src/lib.rs | 443 +---------------------------- offline-web-storage/src/sqlite.rs | 447 ++++++++++++++++++++++++++++++ 2 files changed, 449 insertions(+), 441 deletions(-) create mode 100644 offline-web-storage/src/sqlite.rs diff --git a/offline-web-storage/src/lib.rs b/offline-web-storage/src/lib.rs index 68d9bd6..28f9329 100644 --- a/offline-web-storage/src/lib.rs +++ b/offline-web-storage/src/lib.rs @@ -1,17 +1,5 @@ -use std::sync::Arc; -use std::collections::{BTreeMap, HashMap}; - use thiserror::Error; use offline_web_model::Reference; -use sqlx::{Pool, Row, Sqlite, SqlitePool}; - -// Schema version constants -const CURRENT_SCHEMA_VERSION: i32 = 1; -const INITIAL_SCHEMA_VERSION: i32 = 0; - -pub struct SqliteReferenceStore { - pool: Pool, -} #[derive(Error, Debug)] pub enum StoreError { @@ -32,436 +20,9 @@ pub trait ReferenceStore { async fn get_graph(&self, root_name: &str) -> Result, StoreError>; } -impl SqliteReferenceStore { - pub async fn new(database_url: &str) -> Result { - let pool = SqlitePool::connect(database_url) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - let store = Self { pool }; - - // Check current schema version and migrate if necessary - let current_version = store.get_current_schema_version().await?; - if current_version != CURRENT_SCHEMA_VERSION { - store.migrate_schema(current_version, CURRENT_SCHEMA_VERSION).await?; - } +mod sqlite; - Ok(store) - } - - async fn get_current_schema_version(&self) -> Result { - // First, ensure the schema_version table exists - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS schema_version ( - version INTEGER PRIMARY KEY, - applied_at DATETIME DEFAULT CURRENT_TIMESTAMP, - description TEXT - ) - "#, - ) - .execute(&self.pool) - .await - .map_err(|e| 
StoreError::StorageError(Box::new(e)))?; - - // Get the current version - let row = sqlx::query( - r#" - SELECT version FROM schema_version ORDER BY version DESC LIMIT 1 - "#, - ) - .fetch_optional(&self.pool) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - match row { - Some(row) => { - let version: i32 = row.get("version"); - Ok(version) - } - None => { - // No version found, this is a fresh database - Ok(INITIAL_SCHEMA_VERSION) - } - } - } - - async fn migrate_schema(&self, from_version: i32, to_version: i32) -> Result<(), StoreError> { - if from_version == to_version { - return Ok(()); - } - - if from_version > to_version { - return Err(StoreError::StorageError( - "Downward migrations not currently supported".into() - )); - } - - // Use a transaction for the entire migration process - let mut tx = self.pool.begin().await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - // Apply migrations step by step - let mut current_version = from_version; - while current_version < to_version { - match current_version { - 0 => { - // Migration from version 0 to 1: Initial schema setup - self.migrate_to_v1(&mut tx).await?; - current_version = 1; - } - _ => { - return Err(StoreError::StorageError( - format!("Unknown migration path from version {}", current_version).into() - )); - } - } - } - - // Commit all migrations - tx.commit().await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - Ok(()) - } - - async fn migrate_to_v1(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> Result<(), StoreError> { - // Create the main application tables - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS ref_entries ( - id TEXT PRIMARY KEY, - content_address TEXT, - name TEXT NOT NULL UNIQUE - ) - "#, - ) - .execute(&mut **tx) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS ref_dependencies ( - parent_id TEXT NOT NULL, - dependent_id TEXT NOT NULL, - PRIMARY KEY (parent_id, 
dependent_id), - FOREIGN KEY (parent_id) REFERENCES ref_entries(id), - FOREIGN KEY (dependent_id) REFERENCES ref_entries(id) - ) - "#, - ) - .execute(&mut **tx) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS content_store ( - content_address TEXT PRIMARY KEY, - content BLOB NOT NULL - ) - "#, - ) - .execute(&mut **tx) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - // Record the schema version - sqlx::query( - r#" - INSERT OR REPLACE INTO schema_version (version, description) - VALUES (?, ?) - "#, - ) - .bind(1) - .bind("Initial schema with ref_entries, ref_dependencies, and content_store tables") - .execute(&mut **tx) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - Ok(()) - } - - pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> { - // Use a transaction to ensure atomicity - let mut tx = self.pool.begin().await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - // Insert or update the reference - sqlx::query( - r#" - INSERT OR REPLACE INTO ref_entries (id, content_address, name) - VALUES (?, ?, ?) - "#, - ) - .bind(&reference.id) - .bind(&reference.content_address) - .bind(&reference.name) - .execute(&mut *tx) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - // Delete existing dependencies for this reference - sqlx::query( - r#" - DELETE FROM ref_dependencies - WHERE parent_id = ? - "#, - ) - .bind(&reference.id) - .execute(&mut *tx) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - // Insert new dependencies - for dependent in &reference.dependents { - sqlx::query( - r#" - INSERT INTO ref_dependencies (parent_id, dependent_id) - VALUES (?, ?) 
- "#, - ) - .bind(&reference.id) - .bind(&dependent.id) - .execute(&mut *tx) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - } - - // Commit the transaction - tx.commit().await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - Ok(()) - } - - pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> { - sqlx::query( - r#" - INSERT OR REPLACE INTO content_store (content_address, content) - VALUES (?, ?) - "#, - ) - .bind(content_address) - .bind(content) - .execute(&self.pool) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - Ok(()) - } -} - -impl ReferenceStore for SqliteReferenceStore { - async fn get_reference(&self, id: &str) -> Result { - // First, get the basic reference information - let row = sqlx::query( - r#" - SELECT id, content_address, name - FROM ref_entries - WHERE id = ? - "#, - ) - .bind(id) - .fetch_optional(&self.pool) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - match row { - Some(row) => { - let id: String = row.get("id"); - let content_address: Option = row.get("content_address"); - let name: String = row.get("name"); - - // Get the dependents by recursively fetching them - let dependents = self.get_dependents(&id).await?; - - Ok(Reference { - id, - content_address, - name, - dependents, - }) - } - None => Err(StoreError::NoSuchReference), - } - } - - async fn get_content_for_reference(&self, reference: Reference) -> Result { - if let Some(content_address) = &reference.content_address { - let row = sqlx::query( - r#" - SELECT content - FROM content_store - WHERE content_address = ? 
-                "#,
-            )
-            .bind(content_address)
-            .fetch_optional(&self.pool)
-            .await
-            .map_err(|e| StoreError::StorageError(Box::new(e)))?;
-
-            match row {
-                Some(row) => {
-                    let content: Vec<u8> = row.get("content");
-                    String::from_utf8(content)
-                        .map_err(|e| StoreError::StorageError(Box::new(e)))
-                }
-                None => Err(StoreError::NoSuchContentAddress),
-            }
-        } else {
-            Err(StoreError::NoSuchContentAddress)
-        }
-    }
-
-    async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError> {
-        let mut visited = std::collections::HashSet::new();
-        let mut result = Vec::new();
-        let mut queue = std::collections::VecDeque::new();
-
-        // Start with the root name
-        queue.push_back(root_name.to_string());
-
-        while let Some(current_name) = queue.pop_front() {
-            if visited.contains(&current_name) {
-                continue;
-            }
-
-            visited.insert(current_name.clone());
-
-            // Get the reference by name
-            let row = sqlx::query(
-                r#"
-                SELECT id, content_address, name
-                FROM ref_entries
-                WHERE name = ?
-                "#,
-            )
-            .bind(&current_name)
-            .fetch_optional(&self.pool)
-            .await
-            .map_err(|e| StoreError::StorageError(Box::new(e)))?;
-
-            if let Some(row) = row {
-                let id: String = row.get("id");
-                let content_address: Option<String> = row.get("content_address");
-                let name: String = row.get("name");
-
-                // Get dependents for this reference
-                let dependents = self.get_dependents(&id).await?;
-
-                let reference = Reference {
-                    id,
-                    content_address,
-                    name,
-                    dependents: dependents.clone(),
-                };
-
-                result.push(reference);
-
-                // Add all dependent names to the queue for processing
-                for dependent in dependents {
-                    if !visited.contains(&dependent.name) {
-                        queue.push_back(dependent.name.clone());
-                    }
-                }
-            }
-        }
-
-        Ok(result)
-    }
-}
-
-impl SqliteReferenceStore {
-    async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>, StoreError> {
-        // Use a CTE (Common Table Expression) to get the entire dependency tree in one query
-        let rows = sqlx::query(
-            r#"
-            WITH RECURSIVE dependency_tree AS (
-                -- Base case: direct dependents of the
parent - SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth - FROM ref_entries r - JOIN ref_dependencies rd ON r.id = rd.dependent_id - WHERE rd.parent_id = ? - - UNION ALL - - -- Recursive case: dependents of dependents - SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1 - FROM ref_entries r - JOIN ref_dependencies rd ON r.id = rd.dependent_id - JOIN dependency_tree dt ON rd.parent_id = dt.id - WHERE dt.depth < 100 -- Prevent infinite recursion - ) - SELECT id, content_address, name, parent_id, depth - FROM dependency_tree - ORDER BY depth, name - "#, - ) - .bind(parent_id) - .fetch_all(&self.pool) - .await - .map_err(|e| StoreError::StorageError(Box::new(e)))?; - - // Build the dependency tree iteratively - let mut reference_map: HashMap = HashMap::new(); - let mut children_map: HashMap> = HashMap::new(); - - // First pass: create all references and build the children map - for row in &rows { - let id: String = row.get("id"); - let content_address: Option = row.get("content_address"); - let name: String = row.get("name"); - let parent_id: String = row.get("parent_id"); - - let reference = Reference { - id: id.clone(), - content_address, - name, - dependents: Vec::new(), // Will be filled in second pass - }; - - reference_map.insert(id.clone(), reference); - children_map.entry(parent_id).or_default().push(id); - } - - // Second pass: build the dependency tree from bottom up (highest depth first) - let mut depth_groups: BTreeMap> = BTreeMap::new(); - for row in &rows { - let id: String = row.get("id"); - let depth: i32 = row.get("depth"); - depth_groups.entry(depth).or_default().push(id); - } - - // Process from highest depth to lowest (leaves to roots) - for (_depth, ids) in depth_groups.iter().rev() { - for id in ids { - if let Some(children) = children_map.get(id).cloned() { - let child_references: Vec> = children - .iter() - .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) - .collect(); - - if let 
Some(reference) = reference_map.get_mut(id) { - reference.dependents = child_references; - } - } - } - } - - // Return the direct children of the parent - let empty_vec = Vec::new(); - let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec); - let result = direct_children - .iter() - .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) - .collect(); - - Ok(result) - } - -} +pub use sqlite::SqliteReferenceStore; #[cfg(test)] mod integration_tests; diff --git a/offline-web-storage/src/sqlite.rs b/offline-web-storage/src/sqlite.rs new file mode 100644 index 0000000..4ed1dde --- /dev/null +++ b/offline-web-storage/src/sqlite.rs @@ -0,0 +1,447 @@ +use std::sync::Arc; +use std::collections::{BTreeMap, HashMap}; + +use sqlx::{Pool, Row, Sqlite, SqlitePool}; +use offline_web_model::Reference; + +use crate::StoreError; +use crate::ReferenceStore; + +// Schema version constants +const CURRENT_SCHEMA_VERSION: i32 = 1; +const INITIAL_SCHEMA_VERSION: i32 = 0; + +pub struct SqliteReferenceStore { + pool: Pool, +} + +impl SqliteReferenceStore { + pub async fn new(database_url: &str) -> Result { + let pool = SqlitePool::connect(database_url) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + let store = Self { pool }; + + // Check current schema version and migrate if necessary + let current_version = store.get_current_schema_version().await?; + if current_version != CURRENT_SCHEMA_VERSION { + store.migrate_schema(current_version, CURRENT_SCHEMA_VERSION).await?; + } + + Ok(store) + } + + pub async fn get_current_schema_version(&self) -> Result { + // First, ensure the schema_version table exists + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER PRIMARY KEY, + applied_at DATETIME DEFAULT CURRENT_TIMESTAMP, + description TEXT + ) + "#, + ) + .execute(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Get the current version + let row = sqlx::query( + r#" 
+ SELECT version FROM schema_version ORDER BY version DESC LIMIT 1 + "#, + ) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + match row { + Some(row) => { + let version: i32 = row.get("version"); + Ok(version) + } + None => { + // No version found, this is a fresh database + Ok(INITIAL_SCHEMA_VERSION) + } + } + } + + async fn migrate_schema(&self, from_version: i32, to_version: i32) -> Result<(), StoreError> { + if from_version == to_version { + return Ok(()); + } + + if from_version > to_version { + return Err(StoreError::StorageError( + "Downward migrations not currently supported".into() + )); + } + + // Use a transaction for the entire migration process + let mut tx = self.pool.begin().await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Apply migrations step by step + let mut current_version = from_version; + while current_version < to_version { + match current_version { + 0 => { + // Migration from version 0 to 1: Initial schema setup + self.migrate_to_v1(&mut tx).await?; + current_version = 1; + } + _ => { + return Err(StoreError::StorageError( + format!("Unknown migration path from version {}", current_version).into() + )); + } + } + } + + // Commit all migrations + tx.commit().await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + Ok(()) + } + + async fn migrate_to_v1(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> Result<(), StoreError> { + // Create the main application tables + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS ref_entries ( + id TEXT PRIMARY KEY, + content_address TEXT, + name TEXT NOT NULL UNIQUE + ) + "#, + ) + .execute(&mut **tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS ref_dependencies ( + parent_id TEXT NOT NULL, + dependent_id TEXT NOT NULL, + PRIMARY KEY (parent_id, dependent_id), + FOREIGN KEY (parent_id) REFERENCES ref_entries(id), + FOREIGN KEY (dependent_id) REFERENCES 
ref_entries(id) + ) + "#, + ) + .execute(&mut **tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS content_store ( + content_address TEXT PRIMARY KEY, + content BLOB NOT NULL + ) + "#, + ) + .execute(&mut **tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Record the schema version + sqlx::query( + r#" + INSERT OR REPLACE INTO schema_version (version, description) + VALUES (?, ?) + "#, + ) + .bind(1) + .bind("Initial schema with ref_entries, ref_dependencies, and content_store tables") + .execute(&mut **tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + Ok(()) + } + + pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> { + // Use a transaction to ensure atomicity + let mut tx = self.pool.begin().await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Insert or update the reference + sqlx::query( + r#" + INSERT OR REPLACE INTO ref_entries (id, content_address, name) + VALUES (?, ?, ?) + "#, + ) + .bind(&reference.id) + .bind(&reference.content_address) + .bind(&reference.name) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Delete existing dependencies for this reference + sqlx::query( + r#" + DELETE FROM ref_dependencies + WHERE parent_id = ? + "#, + ) + .bind(&reference.id) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Insert new dependencies + for dependent in &reference.dependents { + sqlx::query( + r#" + INSERT INTO ref_dependencies (parent_id, dependent_id) + VALUES (?, ?) 
+ "#, + ) + .bind(&reference.id) + .bind(&dependent.id) + .execute(&mut *tx) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + } + + // Commit the transaction + tx.commit().await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + Ok(()) + } + + pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> { + sqlx::query( + r#" + INSERT OR REPLACE INTO content_store (content_address, content) + VALUES (?, ?) + "#, + ) + .bind(content_address) + .bind(content) + .execute(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + Ok(()) + } +} + +impl ReferenceStore for SqliteReferenceStore { + async fn get_reference(&self, id: &str) -> Result { + // First, get the basic reference information + let row = sqlx::query( + r#" + SELECT id, content_address, name + FROM ref_entries + WHERE id = ? + "#, + ) + .bind(id) + .fetch_optional(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + match row { + Some(row) => { + let id: String = row.get("id"); + let content_address: Option = row.get("content_address"); + let name: String = row.get("name"); + + // Get the dependents by recursively fetching them + let dependents = self.get_dependents(&id).await?; + + Ok(Reference { + id, + content_address, + name, + dependents, + }) + } + None => Err(StoreError::NoSuchReference), + } + } + + async fn get_content_for_reference(&self, reference: Reference) -> Result { + if let Some(content_address) = &reference.content_address { + let row = sqlx::query( + r#" + SELECT content + FROM content_store + WHERE content_address = ? 
+                "#,
+            )
+            .bind(content_address)
+            .fetch_optional(&self.pool)
+            .await
+            .map_err(|e| StoreError::StorageError(Box::new(e)))?;
+
+            match row {
+                Some(row) => {
+                    let content: Vec<u8> = row.get("content");
+                    String::from_utf8(content)
+                        .map_err(|e| StoreError::StorageError(Box::new(e)))
+                }
+                None => Err(StoreError::NoSuchContentAddress),
+            }
+        } else {
+            Err(StoreError::NoSuchContentAddress)
+        }
+    }
+
+    async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError> {
+        let mut visited = std::collections::HashSet::new();
+        let mut result = Vec::new();
+        let mut queue = std::collections::VecDeque::new();
+
+        // Start with the root name
+        queue.push_back(root_name.to_string());
+
+        while let Some(current_name) = queue.pop_front() {
+            if visited.contains(&current_name) {
+                continue;
+            }
+
+            visited.insert(current_name.clone());
+
+            // Get the reference by name
+            let row = sqlx::query(
+                r#"
+                SELECT id, content_address, name
+                FROM ref_entries
+                WHERE name = ?
+                "#,
+            )
+            .bind(&current_name)
+            .fetch_optional(&self.pool)
+            .await
+            .map_err(|e| StoreError::StorageError(Box::new(e)))?;
+
+            if let Some(row) = row {
+                let id: String = row.get("id");
+                let content_address: Option<String> = row.get("content_address");
+                let name: String = row.get("name");
+
+                // Get dependents for this reference
+                let dependents = self.get_dependents(&id).await?;
+
+                let reference = Reference {
+                    id,
+                    content_address,
+                    name,
+                    dependents: dependents.clone(),
+                };
+
+                result.push(reference);
+
+                // Add all dependent names to the queue for processing
+                for dependent in dependents {
+                    if !visited.contains(&dependent.name) {
+                        queue.push_back(dependent.name.clone());
+                    }
+                }
+            }
+        }
+
+        Ok(result)
+    }
+}
+
+impl SqliteReferenceStore {
+    async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>, StoreError> {
+        // Use a CTE (Common Table Expression) to get the entire dependency tree in one query
+        let rows = sqlx::query(
+            r#"
+            WITH RECURSIVE dependency_tree AS (
+                -- Base case: direct dependents of the
parent + SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth + FROM ref_entries r + JOIN ref_dependencies rd ON r.id = rd.dependent_id + WHERE rd.parent_id = ? + + UNION ALL + + -- Recursive case: dependents of dependents + SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1 + FROM ref_entries r + JOIN ref_dependencies rd ON r.id = rd.dependent_id + JOIN dependency_tree dt ON rd.parent_id = dt.id + WHERE dt.depth < 100 -- Prevent infinite recursion + ) + SELECT id, content_address, name, parent_id, depth + FROM dependency_tree + ORDER BY depth, name + "#, + ) + .bind(parent_id) + .fetch_all(&self.pool) + .await + .map_err(|e| StoreError::StorageError(Box::new(e)))?; + + // Build the dependency tree iteratively + let mut reference_map: HashMap = HashMap::new(); + let mut children_map: HashMap> = HashMap::new(); + + // First pass: create all references and build the children map + for row in &rows { + let id: String = row.get("id"); + let content_address: Option = row.get("content_address"); + let name: String = row.get("name"); + let parent_id: String = row.get("parent_id"); + + let reference = Reference { + id: id.clone(), + content_address, + name, + dependents: Vec::new(), // Will be filled in second pass + }; + + reference_map.insert(id.clone(), reference); + children_map.entry(parent_id).or_default().push(id); + } + + // Second pass: build the dependency tree from bottom up (highest depth first) + let mut depth_groups: BTreeMap> = BTreeMap::new(); + for row in &rows { + let id: String = row.get("id"); + let depth: i32 = row.get("depth"); + depth_groups.entry(depth).or_default().push(id); + } + + // Process from highest depth to lowest (leaves to roots) + for (_depth, ids) in depth_groups.iter().rev() { + for id in ids { + if let Some(children) = children_map.get(id).cloned() { + let child_references: Vec> = children + .iter() + .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) + .collect(); + + if let 
Some(reference) = reference_map.get_mut(id) { + reference.dependents = child_references; + } + } + } + } + + // Return the direct children of the parent + let empty_vec = Vec::new(); + let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec); + let result = direct_children + .iter() + .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone()))) + .collect(); + + Ok(result) + } +} +