wip: move the sqlite implementation into its own module
This commit is contained in:
parent
a6e501f3e5
commit
def2eec18a
@ -1,17 +1,5 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
use std::collections::{BTreeMap, HashMap};
|
|
||||||
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use offline_web_model::Reference;
|
use offline_web_model::Reference;
|
||||||
use sqlx::{Pool, Row, Sqlite, SqlitePool};
|
|
||||||
|
|
||||||
// Schema version constants
|
|
||||||
const CURRENT_SCHEMA_VERSION: i32 = 1;
|
|
||||||
const INITIAL_SCHEMA_VERSION: i32 = 0;
|
|
||||||
|
|
||||||
pub struct SqliteReferenceStore {
|
|
||||||
pool: Pool<Sqlite>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum StoreError {
|
pub enum StoreError {
|
||||||
@ -32,436 +20,9 @@ pub trait ReferenceStore {
|
|||||||
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError>;
|
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SqliteReferenceStore {
|
mod sqlite;
|
||||||
pub async fn new(database_url: &str) -> Result<Self, StoreError> {
|
|
||||||
let pool = SqlitePool::connect(database_url)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
let store = Self { pool };
|
pub use sqlite::SqliteReferenceStore;
|
||||||
|
|
||||||
// Check current schema version and migrate if necessary
|
|
||||||
let current_version = store.get_current_schema_version().await?;
|
|
||||||
if current_version != CURRENT_SCHEMA_VERSION {
|
|
||||||
store.migrate_schema(current_version, CURRENT_SCHEMA_VERSION).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(store)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_current_schema_version(&self) -> Result<i32, StoreError> {
|
|
||||||
// First, ensure the schema_version table exists
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
CREATE TABLE IF NOT EXISTS schema_version (
|
|
||||||
version INTEGER PRIMARY KEY,
|
|
||||||
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
description TEXT
|
|
||||||
)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.execute(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
// Get the current version
|
|
||||||
let row = sqlx::query(
|
|
||||||
r#"
|
|
||||||
SELECT version FROM schema_version ORDER BY version DESC LIMIT 1
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.fetch_optional(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
match row {
|
|
||||||
Some(row) => {
|
|
||||||
let version: i32 = row.get("version");
|
|
||||||
Ok(version)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// No version found, this is a fresh database
|
|
||||||
Ok(INITIAL_SCHEMA_VERSION)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn migrate_schema(&self, from_version: i32, to_version: i32) -> Result<(), StoreError> {
|
|
||||||
if from_version == to_version {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if from_version > to_version {
|
|
||||||
return Err(StoreError::StorageError(
|
|
||||||
"Downward migrations not currently supported".into()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use a transaction for the entire migration process
|
|
||||||
let mut tx = self.pool.begin().await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
// Apply migrations step by step
|
|
||||||
let mut current_version = from_version;
|
|
||||||
while current_version < to_version {
|
|
||||||
match current_version {
|
|
||||||
0 => {
|
|
||||||
// Migration from version 0 to 1: Initial schema setup
|
|
||||||
self.migrate_to_v1(&mut tx).await?;
|
|
||||||
current_version = 1;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
return Err(StoreError::StorageError(
|
|
||||||
format!("Unknown migration path from version {}", current_version).into()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Commit all migrations
|
|
||||||
tx.commit().await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn migrate_to_v1(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> Result<(), StoreError> {
|
|
||||||
// Create the main application tables
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
CREATE TABLE IF NOT EXISTS ref_entries (
|
|
||||||
id TEXT PRIMARY KEY,
|
|
||||||
content_address TEXT,
|
|
||||||
name TEXT NOT NULL UNIQUE
|
|
||||||
)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.execute(&mut **tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
CREATE TABLE IF NOT EXISTS ref_dependencies (
|
|
||||||
parent_id TEXT NOT NULL,
|
|
||||||
dependent_id TEXT NOT NULL,
|
|
||||||
PRIMARY KEY (parent_id, dependent_id),
|
|
||||||
FOREIGN KEY (parent_id) REFERENCES ref_entries(id),
|
|
||||||
FOREIGN KEY (dependent_id) REFERENCES ref_entries(id)
|
|
||||||
)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.execute(&mut **tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
CREATE TABLE IF NOT EXISTS content_store (
|
|
||||||
content_address TEXT PRIMARY KEY,
|
|
||||||
content BLOB NOT NULL
|
|
||||||
)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.execute(&mut **tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
// Record the schema version
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
INSERT OR REPLACE INTO schema_version (version, description)
|
|
||||||
VALUES (?, ?)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(1)
|
|
||||||
.bind("Initial schema with ref_entries, ref_dependencies, and content_store tables")
|
|
||||||
.execute(&mut **tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> {
|
|
||||||
// Use a transaction to ensure atomicity
|
|
||||||
let mut tx = self.pool.begin().await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
// Insert or update the reference
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
INSERT OR REPLACE INTO ref_entries (id, content_address, name)
|
|
||||||
VALUES (?, ?, ?)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(&reference.id)
|
|
||||||
.bind(&reference.content_address)
|
|
||||||
.bind(&reference.name)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
// Delete existing dependencies for this reference
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
DELETE FROM ref_dependencies
|
|
||||||
WHERE parent_id = ?
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(&reference.id)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
// Insert new dependencies
|
|
||||||
for dependent in &reference.dependents {
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
INSERT INTO ref_dependencies (parent_id, dependent_id)
|
|
||||||
VALUES (?, ?)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(&reference.id)
|
|
||||||
.bind(&dependent.id)
|
|
||||||
.execute(&mut *tx)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Commit the transaction
|
|
||||||
tx.commit().await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> {
|
|
||||||
sqlx::query(
|
|
||||||
r#"
|
|
||||||
INSERT OR REPLACE INTO content_store (content_address, content)
|
|
||||||
VALUES (?, ?)
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(content_address)
|
|
||||||
.bind(content)
|
|
||||||
.execute(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ReferenceStore for SqliteReferenceStore {
|
|
||||||
async fn get_reference(&self, id: &str) -> Result<Reference, StoreError> {
|
|
||||||
// First, get the basic reference information
|
|
||||||
let row = sqlx::query(
|
|
||||||
r#"
|
|
||||||
SELECT id, content_address, name
|
|
||||||
FROM ref_entries
|
|
||||||
WHERE id = ?
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(id)
|
|
||||||
.fetch_optional(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
match row {
|
|
||||||
Some(row) => {
|
|
||||||
let id: String = row.get("id");
|
|
||||||
let content_address: Option<String> = row.get("content_address");
|
|
||||||
let name: String = row.get("name");
|
|
||||||
|
|
||||||
// Get the dependents by recursively fetching them
|
|
||||||
let dependents = self.get_dependents(&id).await?;
|
|
||||||
|
|
||||||
Ok(Reference {
|
|
||||||
id,
|
|
||||||
content_address,
|
|
||||||
name,
|
|
||||||
dependents,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
None => Err(StoreError::NoSuchReference),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError> {
|
|
||||||
if let Some(content_address) = &reference.content_address {
|
|
||||||
let row = sqlx::query(
|
|
||||||
r#"
|
|
||||||
SELECT content
|
|
||||||
FROM content_store
|
|
||||||
WHERE content_address = ?
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(content_address)
|
|
||||||
.fetch_optional(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
match row {
|
|
||||||
Some(row) => {
|
|
||||||
let content: Vec<u8> = row.get("content");
|
|
||||||
String::from_utf8(content)
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))
|
|
||||||
}
|
|
||||||
None => Err(StoreError::NoSuchContentAddress),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Err(StoreError::NoSuchContentAddress)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError> {
|
|
||||||
let mut visited = std::collections::HashSet::new();
|
|
||||||
let mut result = Vec::new();
|
|
||||||
let mut queue = std::collections::VecDeque::new();
|
|
||||||
|
|
||||||
// Start with the root name
|
|
||||||
queue.push_back(root_name.to_string());
|
|
||||||
|
|
||||||
while let Some(current_name) = queue.pop_front() {
|
|
||||||
if visited.contains(¤t_name) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
visited.insert(current_name.clone());
|
|
||||||
|
|
||||||
// Get the reference by name
|
|
||||||
let row = sqlx::query(
|
|
||||||
r#"
|
|
||||||
SELECT id, content_address, name
|
|
||||||
FROM ref_entries
|
|
||||||
WHERE name = ?
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(¤t_name)
|
|
||||||
.fetch_optional(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
if let Some(row) = row {
|
|
||||||
let id: String = row.get("id");
|
|
||||||
let content_address: Option<String> = row.get("content_address");
|
|
||||||
let name: String = row.get("name");
|
|
||||||
|
|
||||||
// Get dependents for this reference
|
|
||||||
let dependents = self.get_dependents(&id).await?;
|
|
||||||
|
|
||||||
let reference = Reference {
|
|
||||||
id,
|
|
||||||
content_address,
|
|
||||||
name,
|
|
||||||
dependents: dependents.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
result.push(reference);
|
|
||||||
|
|
||||||
// Add all dependent names to the queue for processing
|
|
||||||
for dependent in dependents {
|
|
||||||
if !visited.contains(&dependent.name) {
|
|
||||||
queue.push_back(dependent.name.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SqliteReferenceStore {
|
|
||||||
async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>, StoreError> {
|
|
||||||
// Use a CTE (Common Table Expression) to get the entire dependency tree in one query
|
|
||||||
let rows = sqlx::query(
|
|
||||||
r#"
|
|
||||||
WITH RECURSIVE dependency_tree AS (
|
|
||||||
-- Base case: direct dependents of the parent
|
|
||||||
SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth
|
|
||||||
FROM ref_entries r
|
|
||||||
JOIN ref_dependencies rd ON r.id = rd.dependent_id
|
|
||||||
WHERE rd.parent_id = ?
|
|
||||||
|
|
||||||
UNION ALL
|
|
||||||
|
|
||||||
-- Recursive case: dependents of dependents
|
|
||||||
SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1
|
|
||||||
FROM ref_entries r
|
|
||||||
JOIN ref_dependencies rd ON r.id = rd.dependent_id
|
|
||||||
JOIN dependency_tree dt ON rd.parent_id = dt.id
|
|
||||||
WHERE dt.depth < 100 -- Prevent infinite recursion
|
|
||||||
)
|
|
||||||
SELECT id, content_address, name, parent_id, depth
|
|
||||||
FROM dependency_tree
|
|
||||||
ORDER BY depth, name
|
|
||||||
"#,
|
|
||||||
)
|
|
||||||
.bind(parent_id)
|
|
||||||
.fetch_all(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
|
||||||
|
|
||||||
// Build the dependency tree iteratively
|
|
||||||
let mut reference_map: HashMap<String, Reference> = HashMap::new();
|
|
||||||
let mut children_map: HashMap<String, Vec<String>> = HashMap::new();
|
|
||||||
|
|
||||||
// First pass: create all references and build the children map
|
|
||||||
for row in &rows {
|
|
||||||
let id: String = row.get("id");
|
|
||||||
let content_address: Option<String> = row.get("content_address");
|
|
||||||
let name: String = row.get("name");
|
|
||||||
let parent_id: String = row.get("parent_id");
|
|
||||||
|
|
||||||
let reference = Reference {
|
|
||||||
id: id.clone(),
|
|
||||||
content_address,
|
|
||||||
name,
|
|
||||||
dependents: Vec::new(), // Will be filled in second pass
|
|
||||||
};
|
|
||||||
|
|
||||||
reference_map.insert(id.clone(), reference);
|
|
||||||
children_map.entry(parent_id).or_default().push(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Second pass: build the dependency tree from bottom up (highest depth first)
|
|
||||||
let mut depth_groups: BTreeMap<i32, Vec<String>> = BTreeMap::new();
|
|
||||||
for row in &rows {
|
|
||||||
let id: String = row.get("id");
|
|
||||||
let depth: i32 = row.get("depth");
|
|
||||||
depth_groups.entry(depth).or_default().push(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process from highest depth to lowest (leaves to roots)
|
|
||||||
for (_depth, ids) in depth_groups.iter().rev() {
|
|
||||||
for id in ids {
|
|
||||||
if let Some(children) = children_map.get(id).cloned() {
|
|
||||||
let child_references: Vec<Arc<Reference>> = children
|
|
||||||
.iter()
|
|
||||||
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if let Some(reference) = reference_map.get_mut(id) {
|
|
||||||
reference.dependents = child_references;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the direct children of the parent
|
|
||||||
let empty_vec = Vec::new();
|
|
||||||
let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec);
|
|
||||||
let result = direct_children
|
|
||||||
.iter()
|
|
||||||
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod integration_tests;
|
mod integration_tests;
|
||||||
|
447
offline-web-storage/src/sqlite.rs
Normal file
447
offline-web-storage/src/sqlite.rs
Normal file
@ -0,0 +1,447 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use std::collections::{BTreeMap, HashMap};
|
||||||
|
|
||||||
|
use sqlx::{Pool, Row, Sqlite, SqlitePool};
|
||||||
|
use offline_web_model::Reference;
|
||||||
|
|
||||||
|
use crate::StoreError;
|
||||||
|
use crate::ReferenceStore;
|
||||||
|
|
||||||
|
// Schema version constants
|
||||||
|
const CURRENT_SCHEMA_VERSION: i32 = 1;
|
||||||
|
const INITIAL_SCHEMA_VERSION: i32 = 0;
|
||||||
|
|
||||||
|
pub struct SqliteReferenceStore {
|
||||||
|
pool: Pool<Sqlite>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqliteReferenceStore {
|
||||||
|
pub async fn new(database_url: &str) -> Result<Self, StoreError> {
|
||||||
|
let pool = SqlitePool::connect(database_url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
let store = Self { pool };
|
||||||
|
|
||||||
|
// Check current schema version and migrate if necessary
|
||||||
|
let current_version = store.get_current_schema_version().await?;
|
||||||
|
if current_version != CURRENT_SCHEMA_VERSION {
|
||||||
|
store.migrate_schema(current_version, CURRENT_SCHEMA_VERSION).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(store)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_current_schema_version(&self) -> Result<i32, StoreError> {
|
||||||
|
// First, ensure the schema_version table exists
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
CREATE TABLE IF NOT EXISTS schema_version (
|
||||||
|
version INTEGER PRIMARY KEY,
|
||||||
|
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
description TEXT
|
||||||
|
)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
// Get the current version
|
||||||
|
let row = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT version FROM schema_version ORDER BY version DESC LIMIT 1
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Some(row) => {
|
||||||
|
let version: i32 = row.get("version");
|
||||||
|
Ok(version)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// No version found, this is a fresh database
|
||||||
|
Ok(INITIAL_SCHEMA_VERSION)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn migrate_schema(&self, from_version: i32, to_version: i32) -> Result<(), StoreError> {
|
||||||
|
if from_version == to_version {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if from_version > to_version {
|
||||||
|
return Err(StoreError::StorageError(
|
||||||
|
"Downward migrations not currently supported".into()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a transaction for the entire migration process
|
||||||
|
let mut tx = self.pool.begin().await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
// Apply migrations step by step
|
||||||
|
let mut current_version = from_version;
|
||||||
|
while current_version < to_version {
|
||||||
|
match current_version {
|
||||||
|
0 => {
|
||||||
|
// Migration from version 0 to 1: Initial schema setup
|
||||||
|
self.migrate_to_v1(&mut tx).await?;
|
||||||
|
current_version = 1;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
return Err(StoreError::StorageError(
|
||||||
|
format!("Unknown migration path from version {}", current_version).into()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit all migrations
|
||||||
|
tx.commit().await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn migrate_to_v1(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> Result<(), StoreError> {
|
||||||
|
// Create the main application tables
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
CREATE TABLE IF NOT EXISTS ref_entries (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
content_address TEXT,
|
||||||
|
name TEXT NOT NULL UNIQUE
|
||||||
|
)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.execute(&mut **tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
CREATE TABLE IF NOT EXISTS ref_dependencies (
|
||||||
|
parent_id TEXT NOT NULL,
|
||||||
|
dependent_id TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (parent_id, dependent_id),
|
||||||
|
FOREIGN KEY (parent_id) REFERENCES ref_entries(id),
|
||||||
|
FOREIGN KEY (dependent_id) REFERENCES ref_entries(id)
|
||||||
|
)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.execute(&mut **tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
CREATE TABLE IF NOT EXISTS content_store (
|
||||||
|
content_address TEXT PRIMARY KEY,
|
||||||
|
content BLOB NOT NULL
|
||||||
|
)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.execute(&mut **tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
// Record the schema version
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
INSERT OR REPLACE INTO schema_version (version, description)
|
||||||
|
VALUES (?, ?)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(1)
|
||||||
|
.bind("Initial schema with ref_entries, ref_dependencies, and content_store tables")
|
||||||
|
.execute(&mut **tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> {
|
||||||
|
// Use a transaction to ensure atomicity
|
||||||
|
let mut tx = self.pool.begin().await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
// Insert or update the reference
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
INSERT OR REPLACE INTO ref_entries (id, content_address, name)
|
||||||
|
VALUES (?, ?, ?)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(&reference.id)
|
||||||
|
.bind(&reference.content_address)
|
||||||
|
.bind(&reference.name)
|
||||||
|
.execute(&mut *tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
// Delete existing dependencies for this reference
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
DELETE FROM ref_dependencies
|
||||||
|
WHERE parent_id = ?
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(&reference.id)
|
||||||
|
.execute(&mut *tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
// Insert new dependencies
|
||||||
|
for dependent in &reference.dependents {
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
INSERT INTO ref_dependencies (parent_id, dependent_id)
|
||||||
|
VALUES (?, ?)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(&reference.id)
|
||||||
|
.bind(&dependent.id)
|
||||||
|
.execute(&mut *tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit the transaction
|
||||||
|
tx.commit().await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> {
|
||||||
|
sqlx::query(
|
||||||
|
r#"
|
||||||
|
INSERT OR REPLACE INTO content_store (content_address, content)
|
||||||
|
VALUES (?, ?)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(content_address)
|
||||||
|
.bind(content)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReferenceStore for SqliteReferenceStore {
|
||||||
|
async fn get_reference(&self, id: &str) -> Result<Reference, StoreError> {
|
||||||
|
// First, get the basic reference information
|
||||||
|
let row = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT id, content_address, name
|
||||||
|
FROM ref_entries
|
||||||
|
WHERE id = ?
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(id)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Some(row) => {
|
||||||
|
let id: String = row.get("id");
|
||||||
|
let content_address: Option<String> = row.get("content_address");
|
||||||
|
let name: String = row.get("name");
|
||||||
|
|
||||||
|
// Get the dependents by recursively fetching them
|
||||||
|
let dependents = self.get_dependents(&id).await?;
|
||||||
|
|
||||||
|
Ok(Reference {
|
||||||
|
id,
|
||||||
|
content_address,
|
||||||
|
name,
|
||||||
|
dependents,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
None => Err(StoreError::NoSuchReference),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError> {
|
||||||
|
if let Some(content_address) = &reference.content_address {
|
||||||
|
let row = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT content
|
||||||
|
FROM content_store
|
||||||
|
WHERE content_address = ?
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(content_address)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Some(row) => {
|
||||||
|
let content: Vec<u8> = row.get("content");
|
||||||
|
String::from_utf8(content)
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))
|
||||||
|
}
|
||||||
|
None => Err(StoreError::NoSuchContentAddress),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Err(StoreError::NoSuchContentAddress)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError> {
|
||||||
|
let mut visited = std::collections::HashSet::new();
|
||||||
|
let mut result = Vec::new();
|
||||||
|
let mut queue = std::collections::VecDeque::new();
|
||||||
|
|
||||||
|
// Start with the root name
|
||||||
|
queue.push_back(root_name.to_string());
|
||||||
|
|
||||||
|
while let Some(current_name) = queue.pop_front() {
|
||||||
|
if visited.contains(¤t_name) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
visited.insert(current_name.clone());
|
||||||
|
|
||||||
|
// Get the reference by name
|
||||||
|
let row = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT id, content_address, name
|
||||||
|
FROM ref_entries
|
||||||
|
WHERE name = ?
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(¤t_name)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
if let Some(row) = row {
|
||||||
|
let id: String = row.get("id");
|
||||||
|
let content_address: Option<String> = row.get("content_address");
|
||||||
|
let name: String = row.get("name");
|
||||||
|
|
||||||
|
// Get dependents for this reference
|
||||||
|
let dependents = self.get_dependents(&id).await?;
|
||||||
|
|
||||||
|
let reference = Reference {
|
||||||
|
id,
|
||||||
|
content_address,
|
||||||
|
name,
|
||||||
|
dependents: dependents.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
result.push(reference);
|
||||||
|
|
||||||
|
// Add all dependent names to the queue for processing
|
||||||
|
for dependent in dependents {
|
||||||
|
if !visited.contains(&dependent.name) {
|
||||||
|
queue.push_back(dependent.name.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqliteReferenceStore {
|
||||||
|
async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>, StoreError> {
|
||||||
|
// Use a CTE (Common Table Expression) to get the entire dependency tree in one query
|
||||||
|
let rows = sqlx::query(
|
||||||
|
r#"
|
||||||
|
WITH RECURSIVE dependency_tree AS (
|
||||||
|
-- Base case: direct dependents of the parent
|
||||||
|
SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth
|
||||||
|
FROM ref_entries r
|
||||||
|
JOIN ref_dependencies rd ON r.id = rd.dependent_id
|
||||||
|
WHERE rd.parent_id = ?
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
|
||||||
|
-- Recursive case: dependents of dependents
|
||||||
|
SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1
|
||||||
|
FROM ref_entries r
|
||||||
|
JOIN ref_dependencies rd ON r.id = rd.dependent_id
|
||||||
|
JOIN dependency_tree dt ON rd.parent_id = dt.id
|
||||||
|
WHERE dt.depth < 100 -- Prevent infinite recursion
|
||||||
|
)
|
||||||
|
SELECT id, content_address, name, parent_id, depth
|
||||||
|
FROM dependency_tree
|
||||||
|
ORDER BY depth, name
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(parent_id)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
|
||||||
|
|
||||||
|
// Build the dependency tree iteratively
|
||||||
|
let mut reference_map: HashMap<String, Reference> = HashMap::new();
|
||||||
|
let mut children_map: HashMap<String, Vec<String>> = HashMap::new();
|
||||||
|
|
||||||
|
// First pass: create all references and build the children map
|
||||||
|
for row in &rows {
|
||||||
|
let id: String = row.get("id");
|
||||||
|
let content_address: Option<String> = row.get("content_address");
|
||||||
|
let name: String = row.get("name");
|
||||||
|
let parent_id: String = row.get("parent_id");
|
||||||
|
|
||||||
|
let reference = Reference {
|
||||||
|
id: id.clone(),
|
||||||
|
content_address,
|
||||||
|
name,
|
||||||
|
dependents: Vec::new(), // Will be filled in second pass
|
||||||
|
};
|
||||||
|
|
||||||
|
reference_map.insert(id.clone(), reference);
|
||||||
|
children_map.entry(parent_id).or_default().push(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second pass: build the dependency tree from bottom up (highest depth first)
|
||||||
|
let mut depth_groups: BTreeMap<i32, Vec<String>> = BTreeMap::new();
|
||||||
|
for row in &rows {
|
||||||
|
let id: String = row.get("id");
|
||||||
|
let depth: i32 = row.get("depth");
|
||||||
|
depth_groups.entry(depth).or_default().push(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process from highest depth to lowest (leaves to roots)
|
||||||
|
for (_depth, ids) in depth_groups.iter().rev() {
|
||||||
|
for id in ids {
|
||||||
|
if let Some(children) = children_map.get(id).cloned() {
|
||||||
|
let child_references: Vec<Arc<Reference>> = children
|
||||||
|
.iter()
|
||||||
|
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if let Some(reference) = reference_map.get_mut(id) {
|
||||||
|
reference.dependents = child_references;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the direct children of the parent
|
||||||
|
let empty_vec = Vec::new();
|
||||||
|
let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec);
|
||||||
|
let result = direct_children
|
||||||
|
.iter()
|
||||||
|
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user