//! SQLite-backed storage for references and their content-addressed blobs.

use std::sync::Arc;

use offline_web_model::Reference;
use sqlx::{Pool, Row, Sqlite, SqlitePool};
use thiserror::Error;
/// SQLite-backed implementation of [`ReferenceStore`].
///
/// Wraps an sqlx connection pool; `Pool` is internally reference-counted,
/// so this type is cheap to share.
pub struct SqliteReferenceStore {
    // Pool created in `SqliteReferenceStore::new`; used by all queries.
    pool: Pool<Sqlite>,
}
/// Errors produced by reference/content store operations.
#[derive(Error, Debug)]
pub enum StoreError {
    /// No reference row exists for the requested id.
    #[error("No such reference")]
    NoSuchReference,
    /// The reference carries no content address, or the address is not
    /// present in the content store.
    #[error("No such content address")]
    NoSuchContentAddress,
    /// Any underlying storage failure (connection, SQL, UTF-8 decode).
    ///
    /// The boxed error is `Send + Sync` so `StoreError` stays usable across
    /// threads and async task boundaries (e.g. `tokio::spawn`). Every error
    /// currently wrapped (sqlx errors, `FromUtf8Error`) satisfies the bound.
    #[error("Unknown Storage Error: {0:?}")]
    StorageError(Box<dyn std::error::Error + Send + Sync>),
}
/// Read-side access to stored references and their content.
///
/// Uses `async fn` in the trait (not object-safe); the allow silences the
/// compiler's auto-trait-bound warning for that pattern.
#[allow(async_fn_in_trait)]
pub trait ReferenceStore {
    /// Fetch a single reference by id, with its dependents resolved.
    /// Fails with `StoreError::NoSuchReference` when the id is unknown.
    async fn get_reference(&self, id: &str) -> Result<Reference, StoreError>;

    /// Fetch the stored content for `reference`, decoded as UTF-8.
    /// Fails with `StoreError::NoSuchContentAddress` when the reference has
    /// no content address or no blob is stored under it.
    async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError>;

    /// Return every reference reachable from the reference named
    /// `root_name`, discovered breadth-first.
    async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError>;
}
impl SqliteReferenceStore {
pub async fn new(database_url: &str) -> Result<Self, StoreError> {
let pool = SqlitePool::connect(database_url)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Create tables if they don't exist
2025-07-02 15:31:24 -05:00
sqlx::query(
r#"
2025-07-03 16:58:56 -05:00
CREATE TABLE IF NOT EXISTS ref_entries (
2025-07-02 15:31:24 -05:00
id TEXT PRIMARY KEY,
content_address TEXT,
2025-07-03 16:58:56 -05:00
name TEXT NOT NULL UNIQUE
2025-07-02 15:31:24 -05:00
)
"#,
)
2025-07-03 16:58:56 -05:00
.execute(&pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
sqlx::query(
r#"
2025-07-03 16:58:56 -05:00
CREATE TABLE IF NOT EXISTS ref_dependencies (
2025-07-02 15:31:24 -05:00
parent_id TEXT NOT NULL,
dependent_id TEXT NOT NULL,
2025-07-03 16:58:56 -05:00
PRIMARY KEY (parent_id, dependent_id),
FOREIGN KEY (parent_id) REFERENCES ref_entries(id),
FOREIGN KEY (dependent_id) REFERENCES ref_entries(id)
2025-07-02 15:31:24 -05:00
)
"#,
)
2025-07-03 16:58:56 -05:00
.execute(&pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
sqlx::query(
r#"
2025-07-03 16:58:56 -05:00
CREATE TABLE IF NOT EXISTS content_store (
2025-07-02 15:31:24 -05:00
content_address TEXT PRIMARY KEY,
2025-07-03 16:58:56 -05:00
content BLOB NOT NULL
2025-07-02 15:31:24 -05:00
)
"#,
)
2025-07-03 16:58:56 -05:00
.execute(&pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
2025-07-03 16:58:56 -05:00
Ok(Self { pool })
2025-07-02 15:31:24 -05:00
}
2025-07-03 16:58:56 -05:00
pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> {
// Use a transaction to ensure atomicity
let mut tx = self.pool.begin().await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
// Insert or update the reference
sqlx::query(
r#"
2025-07-03 16:58:56 -05:00
INSERT OR REPLACE INTO ref_entries (id, content_address, name)
VALUES (?, ?, ?)
2025-07-02 15:31:24 -05:00
"#,
)
.bind(&reference.id)
.bind(&reference.content_address)
.bind(&reference.name)
.execute(&mut *tx)
2025-07-03 16:58:56 -05:00
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
2025-07-03 16:58:56 -05:00
// Delete existing dependencies for this reference
sqlx::query(
r#"
DELETE FROM ref_dependencies
WHERE parent_id = ?
"#,
)
.bind(&reference.id)
.execute(&mut *tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
// Insert new dependencies
for dependent in &reference.dependents {
sqlx::query(
2025-07-03 16:58:56 -05:00
r#"
INSERT INTO ref_dependencies (parent_id, dependent_id)
VALUES (?, ?)
"#,
2025-07-02 15:31:24 -05:00
)
.bind(&reference.id)
.bind(&dependent.id)
.execute(&mut *tx)
2025-07-03 16:58:56 -05:00
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
}
2025-07-03 16:58:56 -05:00
// Commit the transaction
tx.commit().await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
2025-07-02 15:31:24 -05:00
Ok(())
}
2025-07-03 16:58:56 -05:00
pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> {
sqlx::query(
r#"
INSERT OR REPLACE INTO content_store (content_address, content)
VALUES (?, ?)
"#,
)
.bind(content_address)
.bind(content)
.execute(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
Ok(())
}
}
2025-07-02 15:31:24 -05:00
2025-07-03 16:58:56 -05:00
impl ReferenceStore for SqliteReferenceStore {
async fn get_reference(&self, id: &str) -> Result<Reference, StoreError> {
// First, get the basic reference information
let row = sqlx::query(
r#"
SELECT id, content_address, name
FROM ref_entries
WHERE id = ?
"#,
)
.bind(id)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
match row {
Some(row) => {
let id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
// Get the dependents by recursively fetching them
let dependents = self.get_dependents(&id).await?;
Ok(Reference {
id,
content_address,
name,
dependents,
})
}
None => Err(StoreError::NoSuchReference),
}
}
2025-07-02 15:31:24 -05:00
2025-07-03 16:58:56 -05:00
async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError> {
if let Some(content_address) = &reference.content_address {
let row = sqlx::query(
r#"
SELECT content
FROM content_store
WHERE content_address = ?
"#,
)
.bind(content_address)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
match row {
Some(row) => {
let content: Vec<u8> = row.get("content");
String::from_utf8(content)
.map_err(|e| StoreError::StorageError(Box::new(e)))
}
None => Err(StoreError::NoSuchContentAddress),
}
} else {
Err(StoreError::NoSuchContentAddress)
}
}
2025-07-02 15:31:24 -05:00
2025-07-03 16:58:56 -05:00
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError> {
let mut visited = std::collections::HashSet::new();
let mut result = Vec::new();
let mut queue = std::collections::VecDeque::new();
// Start with the root name
queue.push_back(root_name.to_string());
while let Some(current_name) = queue.pop_front() {
if visited.contains(&current_name) {
continue;
}
visited.insert(current_name.clone());
// Get the reference by name
let row = sqlx::query(
r#"
SELECT id, content_address, name
FROM ref_entries
WHERE name = ?
"#,
)
.bind(&current_name)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
if let Some(row) = row {
let id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
// Get dependents for this reference
let dependents = self.get_dependents(&id).await?;
let reference = Reference {
id,
content_address,
name,
dependents: dependents.clone(),
};
result.push(reference);
// Add all dependent names to the queue for processing
for dependent in dependents {
if !visited.contains(&dependent.name) {
queue.push_back(dependent.name.clone());
}
}
}
}
2025-07-02 15:31:24 -05:00
2025-07-03 16:58:56 -05:00
Ok(result)
2025-07-02 15:31:24 -05:00
}
2025-07-03 16:58:56 -05:00
}
2025-07-02 15:31:24 -05:00
2025-07-03 16:58:56 -05:00
impl SqliteReferenceStore {
    /// Resolve the full dependent tree below `parent_id` in one round trip
    /// and return the direct children as `Arc<Reference>` values, each with
    /// its own `dependents` vector already populated.
    async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>, StoreError> {
        // Use a CTE (Common Table Expression) to get the entire dependency tree in one query
        let rows = sqlx::query(
            r#"
            WITH RECURSIVE dependency_tree AS (
                -- Base case: direct dependents of the parent
                SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth
                FROM ref_entries r
                JOIN ref_dependencies rd ON r.id = rd.dependent_id
                WHERE rd.parent_id = ?
                UNION ALL
                -- Recursive case: dependents of dependents
                SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1
                FROM ref_entries r
                JOIN ref_dependencies rd ON r.id = rd.dependent_id
                JOIN dependency_tree dt ON rd.parent_id = dt.id
                WHERE dt.depth < 100 -- Prevent infinite recursion
            )
            SELECT id, content_address, name, parent_id, depth
            FROM dependency_tree
            ORDER BY depth, name
            "#,
        )
        .bind(parent_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|e| StoreError::StorageError(Box::new(e)))?;

        // Build the dependency tree iteratively.
        // reference_map: id -> Reference (dependents filled in second pass).
        // children_map: parent id -> ids of its direct children.
        // NOTE(review): with diamond-shaped graphs the UNION ALL CTE can emit
        // the same node via multiple paths; reference_map dedupes by id, but
        // children_map could then hold duplicate child ids — confirm whether
        // dependents are expected to be unique.
        let mut reference_map: std::collections::HashMap<String, Reference> = std::collections::HashMap::new();
        let mut children_map: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();

        // First pass: create all references and build the children map
        for row in &rows {
            let id: String = row.get("id");
            let content_address: Option<String> = row.get("content_address");
            let name: String = row.get("name");
            let parent_id: String = row.get("parent_id");

            let reference = Reference {
                id: id.clone(),
                content_address,
                name,
                dependents: Vec::new(), // Will be filled in second pass
            };

            reference_map.insert(id.clone(), reference);
            children_map.entry(parent_id).or_default().push(id);
        }

        // Second pass: build the dependency tree from bottom up (highest depth first).
        // Group node ids by their depth; BTreeMap keeps depths sorted.
        let mut depth_groups: std::collections::BTreeMap<i32, Vec<String>> = std::collections::BTreeMap::new();
        for row in &rows {
            let id: String = row.get("id");
            let depth: i32 = row.get("depth");
            depth_groups.entry(depth).or_default().push(id);
        }

        // Process from highest depth to lowest (leaves to roots) so that each
        // node's children already carry their own dependents when cloned here.
        for (_depth, ids) in depth_groups.iter().rev() {
            for id in ids {
                if let Some(children) = children_map.get(id).cloned() {
                    let child_references: Vec<Arc<Reference>> = children
                        .iter()
                        .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
                        .collect();
                    if let Some(reference) = reference_map.get_mut(id) {
                        reference.dependents = child_references;
                    }
                }
            }
        }

        // Return the direct children of the parent
        let empty_vec = Vec::new();
        let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec);
        let result = direct_children
            .iter()
            .filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
            .collect();

        Ok(result)
    }
}
#[cfg(test)]
mod integration_tests;