474 lines
14 KiB
Rust

use std::{collections::HashMap, sync::Arc};
use anyhow::Result;
use blake2::{Blake2b512, Digest};
use offline_web_model::Reference;
use sqlx::{Pool, Row, Sqlite, SqlitePool};
pub struct ReferenceStore {
pool: Pool<Sqlite>,
}
impl ReferenceStore {
pub async fn new(database_url: &str) -> Result<Self> {
let pool = SqlitePool::connect(database_url).await?;
let store = Self { pool };
store.initialize_schema().await?;
Ok(store)
}
async fn initialize_schema(&self) -> Result<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS refs (
id TEXT PRIMARY KEY,
content_address TEXT,
name TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS reference_dependencies (
parent_id TEXT NOT NULL,
dependent_id TEXT NOT NULL,
PRIMARY KEY (parent_id, dependent_id)
)
"#,
)
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_refs_name ON refs(name)")
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_refs_content_address ON refs(content_address)")
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_dependencies_parent ON reference_dependencies(parent_id)")
.execute(&self.pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_dependencies_dependent ON reference_dependencies(dependent_id)")
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS content_objects (
content_address TEXT PRIMARY KEY,
content_data BLOB NOT NULL,
content_type TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_content_created ON content_objects(created_at)",
)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn store_reference(&self, reference: &Reference) -> Result<()> {
let mut tx = self.pool.begin().await?;
// Insert or update the reference
sqlx::query(
r#"
INSERT OR REPLACE INTO refs (id, content_address, name, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
"#,
)
.bind(&reference.id)
.bind(&reference.content_address)
.bind(&reference.name)
.execute(&mut *tx)
.await?;
// Clear existing dependencies for this reference
sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?")
.bind(&reference.id)
.execute(&mut *tx)
.await?;
// Insert new dependencies
for dependent in &reference.dependents {
sqlx::query(
"INSERT INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)",
)
.bind(&reference.id)
.bind(&dependent.id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
pub async fn get_reference(&self, id: &str) -> Result<Option<Reference>> {
// Get the reference record
let row = sqlx::query("SELECT id, content_address, name FROM refs WHERE id = ?")
.bind(id)
.fetch_optional(&self.pool)
.await?;
let Some(row) = row else {
return Ok(None);
};
let reference_id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
// Get dependencies
let dependents = self.get_dependents(&reference_id).await?;
Ok(Some(Reference {
id: reference_id,
content_address,
name,
dependents,
}))
}
fn get_dependents<'a>(
&'a self,
parent_id: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Arc<Reference>>>> + Send + 'a>>
{
Box::pin(async move {
let rows = sqlx::query(
r#"
SELECT r.id, r.content_address, r.name
FROM refs r
JOIN reference_dependencies rd ON r.id = rd.dependent_id
WHERE rd.parent_id = ?
ORDER BY r.name
"#,
)
.bind(parent_id)
.fetch_all(&self.pool)
.await?;
let mut dependents = Vec::new();
for row in rows {
let id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
// Recursively get dependents for each dependent
let nested_dependents = self.get_dependents(&id).await?;
dependents.push(Arc::new(Reference {
id,
content_address,
name,
dependents: nested_dependents,
}));
}
Ok(dependents)
})
}
pub async fn get_references_by_name(&self, name: &str) -> Result<Vec<Reference>> {
let rows = sqlx::query("SELECT id FROM refs WHERE name = ?")
.bind(name)
.fetch_all(&self.pool)
.await?;
let mut references = Vec::new();
for row in rows {
let id: String = row.get("id");
if let Some(reference) = self.get_reference(&id).await? {
references.push(reference);
}
}
Ok(references)
}
pub async fn get_references_by_content_address(
&self,
content_address: &str,
) -> Result<Vec<Reference>> {
let rows = sqlx::query("SELECT id FROM refs WHERE content_address = ?")
.bind(content_address)
.fetch_all(&self.pool)
.await?;
let mut references = Vec::new();
for row in rows {
let id: String = row.get("id");
if let Some(reference) = self.get_reference(&id).await? {
references.push(reference);
}
}
Ok(references)
}
pub async fn delete_reference(&self, id: &str) -> Result<bool> {
let result = sqlx::query("DELETE FROM refs WHERE id = ?")
.bind(id)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn list_all_references(&self) -> Result<Vec<Reference>> {
let rows = sqlx::query("SELECT id FROM refs ORDER BY name")
.fetch_all(&self.pool)
.await?;
let mut references = Vec::new();
for row in rows {
let id: String = row.get("id");
if let Some(reference) = self.get_reference(&id).await? {
references.push(reference);
}
}
Ok(references)
}
pub async fn update_reference_graph(
&self,
updated_references: &HashMap<String, Arc<Reference>>,
) -> Result<()> {
let mut tx = self.pool.begin().await?;
for (_, reference) in updated_references {
// Update the reference
sqlx::query(
r#"
INSERT OR REPLACE INTO refs (id, content_address, name, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
"#,
)
.bind(&reference.id)
.bind(&reference.content_address)
.bind(&reference.name)
.execute(&mut *tx)
.await?;
// Clear existing dependencies
sqlx::query("DELETE FROM reference_dependencies WHERE parent_id = ?")
.bind(&reference.id)
.execute(&mut *tx)
.await?;
// Insert new dependencies
for dependent in &reference.dependents {
sqlx::query(
"INSERT INTO reference_dependencies (parent_id, dependent_id) VALUES (?, ?)",
)
.bind(&reference.id)
.bind(&dependent.id)
.execute(&mut *tx)
.await?;
}
}
tx.commit().await?;
Ok(())
}
pub fn calculate_content_address(content: &[u8]) -> String {
let mut hasher = Blake2b512::new();
hasher.update(content);
format!("{:x}", hasher.finalize())
}
pub async fn store_content(
&self,
content: &[u8],
content_type: Option<String>,
) -> Result<String> {
let content_address = Self::calculate_content_address(content);
// Check if content already exists (deduplication)
let exists = sqlx::query("SELECT 1 FROM content_objects WHERE content_address = ?")
.bind(&content_address)
.fetch_optional(&self.pool)
.await?
.is_some();
if !exists {
sqlx::query(
r#"
INSERT INTO content_objects (content_address, content_data, content_type)
VALUES (?, ?, ?)
"#,
)
.bind(&content_address)
.bind(content)
.bind(&content_type)
.execute(&self.pool)
.await?;
}
Ok(content_address)
}
pub async fn get_content(&self, content_address: &str) -> Result<Option<Vec<u8>>> {
let row = sqlx::query("SELECT content_data FROM content_objects WHERE content_address = ?")
.bind(content_address)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|row| row.get::<Vec<u8>, _>("content_data")))
}
pub async fn get_content_info(&self, content_address: &str) -> Result<Option<ContentInfo>> {
let row = sqlx::query(
"SELECT content_type, created_at FROM content_objects WHERE content_address = ?",
)
.bind(content_address)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|row| ContentInfo {
content_address: content_address.to_string(),
content_type: row.get("content_type"),
size: 0, // Size not stored in database
created_at: row.get("created_at"),
}))
}
pub async fn content_exists(&self, content_address: &str) -> Result<bool> {
let exists = sqlx::query("SELECT 1 FROM content_objects WHERE content_address = ?")
.bind(content_address)
.fetch_optional(&self.pool)
.await?
.is_some();
Ok(exists)
}
pub async fn store_reference_with_content(
&self,
name: String,
content: &[u8],
content_type: Option<String>,
) -> Result<Reference> {
// Store the content and get its address
let content_address = self.store_content(content, content_type).await?;
// Create the reference
let reference = Reference::new(Some(content_address), name);
// Store the reference
self.store_reference(&reference).await?;
Ok(reference)
}
pub async fn get_reference_with_content(
&self,
id: &str,
) -> Result<Option<(Reference, Option<Vec<u8>>)>> {
let reference = self.get_reference(id).await?;
if let Some(ref reference) = reference {
if let Some(ref content_address) = reference.content_address {
let content = self.get_content(content_address).await?;
return Ok(Some((reference.clone(), content)));
}
}
Ok(reference.map(|r| (r, None)))
}
pub async fn delete_content(&self, content_address: &str) -> Result<bool> {
let result = sqlx::query("DELETE FROM content_objects WHERE content_address = ?")
.bind(content_address)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn list_unreferenced_content(&self) -> Result<Vec<String>> {
let rows = sqlx::query(
r#"
SELECT co.content_address
FROM content_objects co
LEFT JOIN refs r ON co.content_address = r.content_address
WHERE r.content_address IS NULL
"#,
)
.fetch_all(&self.pool)
.await?;
Ok(rows
.into_iter()
.map(|row| row.get("content_address"))
.collect())
}
pub async fn cleanup_unreferenced_content(&self) -> Result<usize> {
let result = sqlx::query(
r#"
DELETE FROM content_objects
WHERE content_address IN (
SELECT co.content_address
FROM content_objects co
LEFT JOIN refs r ON co.content_address = r.content_address
WHERE r.content_address IS NULL
)
"#,
)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() as usize)
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
let content_count = sqlx::query("SELECT COUNT(*) as count FROM content_objects")
.fetch_one(&self.pool)
.await?;
let reference_count = sqlx::query("SELECT COUNT(*) as count FROM refs")
.fetch_one(&self.pool)
.await?;
Ok(StorageStats {
content_object_count: content_count.get::<i64, _>("count") as usize,
total_content_size: 0, // Size not tracked
reference_count: reference_count.get::<i64, _>("count") as usize,
})
}
}
#[derive(Debug, Clone)]
pub struct ContentInfo {
pub content_address: String,
pub content_type: Option<String>,
pub size: usize,
pub created_at: String,
}
#[derive(Debug, Clone)]
pub struct StorageStats {
pub content_object_count: usize,
pub total_content_size: usize,
pub reference_count: usize,
}