Compare commits


21 Commits

Author SHA1 Message Date
943074aede expt: an AGENTS.md for agentic coding 2025-07-10 20:30:03 -04:00
def2eec18a wip: move the sqlite implementation into its own module 2025-07-04 11:35:29 -05:00
a6e501f3e5 wip: limited schema migration capability 2025-07-04 11:05:03 -05:00
dcfa8bd313 wip: evolving the storage layer 2025-07-03 20:13:41 -05:00
3fbeba535c fix: a more efficient get_dependents query 2025-07-02 16:21:07 -05:00
5f3fa32fa8 wip: cargo fmt && cargo clippy --fix 2025-07-02 16:17:02 -05:00
2e673d5304 wip: the beginnings of a storage layer 2025-07-02 16:15:13 -05:00
78f114254c chore: cargo clippy fixes 2025-07-02 14:54:23 -05:00
a73b5ac597 wip: test for id calculation being stable 2025-07-02 13:59:35 -05:00
19fe4cc729 wip: update our graph to ensure deterministic ordering 2025-07-02 13:56:34 -05:00
8f2ca44ad7 fix: root content_addres updates properly 2025-07-01 15:43:00 -05:00
780bb4e372 wip: ensure root always get's updated 2025-06-12 21:03:04 -04:00
0c369864d0 wip: the beginnings of some unit tests 2025-05-21 19:37:47 -04:00
eec0dab6f0 wip: rewriting for clarity and accuracy 2025-05-09 10:39:58 -04:00
fe372b96fd wip: mutatable graph 2025-05-02 16:47:09 -04:00
0d3d634672 docs: Update our DESIGN.md with new inforatiom 2025-04-15 22:11:05 -04:00
47f0ef2575 fix: a few things that broke with axum versioning 2025-04-15 21:58:57 -04:00
ade221003e wip: Working experiment 2025-04-15 21:38:41 -04:00
a216275eb5 wip: the beginnings of a websocket protocol 2025-04-12 17:43:53 -04:00
350479bb18 chore: move to a workspace setup 2025-04-12 17:10:48 -04:00
8d5ea957a6 merge: bootstrap_experiment_1 -> main 2025-04-12 17:10:48 -04:00
26 changed files with 3821 additions and 306 deletions

3
.gitignore vendored

@@ -1,3 +1,6 @@
result/
result
target/
*.avanterules
.claude/*
.claude

36
AGENTS.md Normal file

@@ -0,0 +1,36 @@
# AGENTS.md
## Build Commands
- Build all crates: `cargo build`
- Build specific crate: `cargo build -p <crate-name>` (e.g., `cargo build -p offline-web-storage`)
- Build with optimizations: `cargo build --release`
## Test Commands
- Run all tests: `cargo test`
- Run specific test: `cargo test <test_name>` (e.g., `cargo test test_dependencies_updated_when_nodes_added`)
- Run tests in specific crate: `cargo test -p <crate-name>`
- Run tests with output: `cargo test -- --nocapture`
## Code Style Guidelines
### Formatting & Imports
- Group imports by std, external crates, and internal modules
- Sort imports alphabetically within groups
- Use block imports with curly braces for multi-imports
### Types & Naming
- Use snake_case for functions, variables, and modules
- Use PascalCase for types and structs
- Prefix trait implementations with the trait name
- Use Option<T> for nullable values, not unwrap()
### Error Handling
- Use Result<T, E> with custom error types
- Implement thiserror::Error for error enums
- Propagate errors with ? operator
- Use descriptive error messages
### Documentation
- Document public APIs with /// comments
- Include examples in documentation for complex functions
- Explain parameter and return types in function docs
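A minimal sketch of the error-handling conventions listed above (the `ConfigError` type and `read_config` function are illustrative, not part of this change):

```rust
use thiserror::Error;

// An error enum following the AGENTS.md conventions: a custom type,
// thiserror-derived, with a descriptive message per variant.
#[derive(Error, Debug)]
pub enum ConfigError {
    #[error("missing required key: {0}")]
    MissingKey(String),
    #[error("could not read config file: {0}")]
    Io(#[from] std::io::Error),
}

// Errors are propagated with `?` rather than unwrapped.
pub fn read_config(path: &str) -> Result<String, ConfigError> {
    let text = std::fs::read_to_string(path)?; // io::Error converts via #[from]
    if text.is_empty() {
        return Err(ConfigError::MissingKey("root".to_string()));
    }
    Ok(text)
}
```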

1925
Cargo.lock generated

File diff suppressed because it is too large

Cargo.toml

@@ -1,18 +1,3 @@
[package]
name = "offline-web"
version = "0.1.0"
edition = "2021"
[lib]
path = "src/lib.rs"
[[bin]]
name = "example"
path = "src/main.rs"
[dependencies]
axum = { version = "0.8.3", features = ["macros"] }
blake2 = "0.10.6"
rand = "0.9.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }
tokio = "1.44.1"
[workspace]
resolver = "2"
members = ["exp1", "exp2", "offline-web-model", "offline-web-storage"]

DESIGN.md

@@ -2,28 +2,26 @@
## Synchronization
We assume that our resources map easily to REST concepts.
We support several operations for Bootstrapping
- Resources map to a path.
- Groups of resources are represented in the path structure.
- `HEAD` to get the header to check whether we need to sync.
- `POST` is create
- `PUT` is mutate
- `GET` retrieves the resource
- `DELETE` removes the resources
* Fetch Bootstrap
* Fetch Reference
* Fetch Object
## Resource Query Datamodel
## Resource Datamodel
We assume all resources are content-addressable and form a merkle tree. We
maintain an index of path to content-addressable items.
Resource reference paths are rooted at `/api/v1/ref/<path>`.
Resource reference paths are rooted at a `/ref/` prefix for their path
namespace. They contain a payload with the `objectID`, `content_address` if
there is one, and a list of any dependent resources.
```json
{
"objectId": <merkle-hash>,
"content_address": <content-hash>,
"path": "/path/name0",
"name": "/path/name0",
"dependents": [
{
"path": "path/name1",
@@ -39,82 +37,32 @@ Resource reference paths are rooted at `/api/v1/ref/<path>`.
}
```
Resource references contain a payload with the objectID and a list of any
dependent resources. They support the following operations
* `GET`
* `POST`
* `PUT`
* `DELETE`
* `HEAD`
`HEAD` requests return only the objectId in the payload. Primarily used to detect
if a reference needs to be updated on the client or not.
Resource Requests can be expanded to their object reference payloads at any
configurable depth with the query parameters `?depth=<number>` or to the full
tree depth with `?full=true`. This will add recursive dependent sections to the
content key.
```json
{
"objectId": <merkle-hash>,
"path": "/path/name0",
"dependents": [
{
"objectId": <merkle-hash>,
"path": "path/name1",
"content": {
"objectId": <merkle-hash>,
"dependents": [
{
"path": "path/name1",
"content_address": <content-hash>,
"objectId": <merkle-hash>, content: <payload>
},
{
"path": "path/name2",
"content_address": <content-hash>,
"objectId": <merkle-hash>, content: <payload>
}
],
}
}
]
}
```
### Reserved References
* `/api/v1/ref/all/<username>` The root of the resource tree. List all sub
* `/ref/all/<username>` The root of the resource tree. List all sub
resources that the user has access to.
* `/api/v1/ref/user/<username>` The user information.
* `/ref/user/<username>` The user information.
### Content-Addressable Query API
The content addressable store is considered immutable. You do not delete from
it. We may garbage collect at some point as a storage optimization.
Content addressable paths are rooted at `/api/v1/object/<content-hash>`.
Content addressable paths are rooted at `/object/<content-hash>`.
Their payloads are whatever the contents serialize to in JSON and support the following
operations.
* `GET`
* `POST`
Their payloads are whatever the contents serialize to in JSON
## Synchronization
### Bootstrapping
* Load `/api/v1/ref/all/<username>` and then follow the sub resources to
load the entire dataset locally making sure to keep the content-addresses
around for comparison.
* Load `/ref/all/<username>`
* Follow the sub resources recursively to load the entire dataset locally, making
sure to keep the content-addresses around for comparison (sketched below).
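A minimal sketch of that recursive walk, assuming the `Reference` shape from `offline-web-model` (the `collect_addresses` helper is illustrative, not part of this change):

```rust
use offline_web_model::Reference;

// Walk a bootstrap Reference tree depth-first and collect every
// content_address so the client can compare them against its local store.
fn collect_addresses(reference: &Reference, out: &mut Vec<String>) {
    if let Some(addr) = &reference.content_address {
        out.push(addr.clone());
    }
    for dep in &reference.dependents {
        collect_addresses(dep, out);
    }
}
```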
# Benchmarking
## Bootstrapping benchmark tests
* Server side loading
* Client side loading
* WebSocket?
* Rest API
* WebSockets

16
exp1/Cargo.toml Normal file

@@ -0,0 +1,16 @@
[package]
name = "offline-web-http"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "exp1"
path = "src/main.rs"
[dependencies]
axum = { version = "0.8.0", features = ["macros"] }
blake2 = "0.10.6"
rand = "0.9.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }
tokio = "1.44.1"
offline-web-model = { path = "../offline-web-model" }

3
exp1/README.md Normal file

@@ -0,0 +1,3 @@
# Experiment 1 is a traditional HTTP GET/POST/PUT RESTful approach
We use multiple REST requests to bootstrap the application state and capture timings.

exp1/src/main.rs

@@ -1,5 +1,6 @@
mod serve;
#[tokio::main(flavor = "current_thread")]
async fn main() {
offline_web::serve().await;
serve::serve().await;
}

95
exp1/src/serve.rs Normal file

@@ -0,0 +1,95 @@
use std::{collections::HashMap, sync::Arc};
use axum::{
extract::Path,
http,
response::{Html, IntoResponse},
routing::get,
Json, Router,
};
use offline_web_model::{Graph, Reference};
// TODO(jeremy): Allow this to autoexpand the content_addresses?
async fn all_references(root_ref: Arc<Reference>) -> Json<Arc<Reference>> {
Json(root_ref)
}
async fn ref_path(
refs: Arc<HashMap<String, Arc<Reference>>>,
Path(path): Path<String>,
) -> Json<Arc<Reference>> {
let path = format!("/item/{}", path);
match refs.get(&path) {
Some(r) => Json(r.clone()),
None => todo!("Return a 404?"),
}
}
async fn object_path(objects: Arc<HashMap<String, Vec<u8>>>, Path(addr): Path<String>) -> Vec<u8> {
dbg!(&addr);
match objects.get(&addr) {
Some(o) => o.clone(),
None => todo!("Return a 404?"),
}
}
async fn get_client_js() -> impl IntoResponse {
(
[(http::header::CONTENT_TYPE, "application/javascript")],
include_str!("../static/client.js"),
)
}
pub fn endpoints(graph: Graph) -> Router {
Router::new()
.nest(
"/api/v1",
Router::new()
.nest(
"/ref",
Router::new()
.route(
"/all/username",
get({
let state = graph.root.clone();
move || all_references(state)
}),
)
.route(
"/item/{*path}",
get({
let refs = graph.refs.clone();
move |path| ref_path(refs, path)
}),
),
)
.nest(
"/object",
Router::new().route(
"/{addr}",
get({
let objects = graph.objects.clone();
move |addr| object_path(objects, addr)
}),
),
),
)
.route("/lib/client.js", get(get_client_js))
.route(
"/ui/",
get(|| async { Html(include_str!("../static/index.html")).into_response() }),
)
}
// TODO(jwall): Javascript test script
pub async fn serve() {
// run our app with hyper, listening globally on port 3000
let graph = Graph::random_graph();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
println!("Server ui starting on http://127.0.0.1:3000/ui/");
axum::serve(listener, endpoints(graph)).await.unwrap();
}
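One way the `todo!("Return a 404?")` markers above could be resolved, shown here as a hedged sketch rather than part of this change: return a `Result` whose error arm axum converts into a 404 response.

```rust
use std::{collections::HashMap, sync::Arc};

use axum::{extract::Path, http::StatusCode, Json};
use offline_web_model::Reference;

// Same handler shape as ref_path above, but a missing key becomes
// `404 Not Found` instead of panicking via todo!().
async fn ref_path_or_404(
    refs: Arc<HashMap<String, Arc<Reference>>>,
    Path(path): Path<String>,
) -> Result<Json<Arc<Reference>>, StatusCode> {
    let path = format!("/item/{path}");
    refs.get(&path).cloned().map(Json).ok_or(StatusCode::NOT_FOUND)
}
```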

18
exp2/Cargo.toml Normal file

@@ -0,0 +1,18 @@
[package]
name = "offline-web-ws"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "exp2"
path = "src/main.rs"
[dependencies]
# NOTE(zaphar): we depend on this version of axum for axum-typed-websockets to work
axum = { version = "0.7.4", features = ["macros", "ws"] }
blake2 = "0.10.6"
rand = "0.9.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }
tokio = "1.44.1"
offline-web-model = { path = "../offline-web-model" }
axum-typed-websockets = "0.6.0"

3
exp2/README.md Normal file

@@ -0,0 +1,3 @@
# Experiment 2 utilizes WebSockets
We use WebSockets to bootstrap the application state and capture timings.

6
exp2/src/main.rs Normal file

@@ -0,0 +1,6 @@
mod serve;
#[tokio::main(flavor = "current_thread")]
async fn main() {
serve::serve().await;
}

131
exp2/src/serve.rs Normal file

@@ -0,0 +1,131 @@
use std::sync::Arc;
use axum::{
http,
response::{Html, IntoResponse},
routing::get,
Router,
};
use axum_typed_websockets::{Message, WebSocket, WebSocketUpgrade};
use serde::{Deserialize, Serialize};
use offline_web_model::{Graph, Reference};
async fn get_client_js() -> impl IntoResponse {
(
[(http::header::CONTENT_TYPE, "application/javascript")],
include_str!("../static/client.js"),
)
}
#[derive(Debug, Serialize, Deserialize)]
enum ServerMsg {
Reference(Reference),
Object(Vec<u8>),
}
#[derive(Debug, Serialize, Deserialize)]
enum ClientMsg {
Bootstrap,
GetReference(String),
GetObject(String),
}
async fn handle_websocket(
ws: WebSocketUpgrade<ServerMsg, ClientMsg>,
graph: Arc<Graph>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_socket(socket, graph))
}
async fn handle_socket(mut socket: WebSocket<ServerMsg, ClientMsg>, graph: Arc<Graph>) {
// Send initial data to the client
// Handle incoming messages
// TODO(zaphar): This should actually be smarter in the case of None or Err.
while let Some(Ok(_msg)) = socket.recv().await {
match _msg {
Message::Item(ClientMsg::Bootstrap) => {
if let Err(e) = socket
.send(Message::Item(ServerMsg::Reference(
graph.root.as_ref().clone(),
)))
.await
{
println!("Error sending bootstrap reference: {:?}", e);
continue;
}
}
Message::Item(ClientMsg::GetReference(path)) => {
if let Some(reference) = graph.refs.get(&path) {
if let Err(e) = socket
.send(Message::Item(ServerMsg::Reference((**reference).clone())))
.await
{
println!("Error sending reference: {:?}", e);
continue;
}
} else {
println!("Reference not found: {}", path);
}
}
Message::Item(ClientMsg::GetObject(address)) => {
if let Some(object) = graph.get_object(&address) {
if let Err(e) = socket
.send(Message::Item(ServerMsg::Object(object.clone())))
.await
{
println!("Error sending object: {:?}", e);
continue;
}
} else {
println!("Object not found: {}", address);
}
}
Message::Ping(items) => {
println!("unhandled ping msg: {:?}", items);
}
Message::Pong(items) => {
println!("unhandled pong msg: {:?}", items);
}
Message::Close(_close_frame) => {
println!("closing websocket connection at client request");
break;
}
}
}
println!("WebSocket connection closed");
}
pub fn endpoints(graph: Graph) -> Router {
// WebSocket route now implemented
let graph = Arc::new(graph);
Router::new()
.nest(
"/api/v1",
Router::new().route(
"/ws",
get(|ws: WebSocketUpgrade<ServerMsg, ClientMsg>| async move {
handle_websocket(ws, graph.clone()).await
}),
),
)
.route("/lib/client.js", get(get_client_js))
.route(
"/ui/",
get(|| async { Html(include_str!("../static/index.html")).into_response() }),
)
}
// TODO(jwall): Javascript test script
pub async fn serve() {
// run our app with hyper, listening globally on port 3000
let graph = Graph::random_graph();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
println!("Server ui starting on http://127.0.0.1:3000/ui/");
println!("WebSocket endpoint available at ws://127.0.0.1:3000/api/v1/ws");
axum::serve(listener, endpoints(graph)).await.unwrap();
}
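Because `ClientMsg` and `ServerMsg` use serde's default externally tagged enum representation, the wire format is plain JSON, matching what the client script below sends. A small sketch of what crosses the socket (assuming `serde_json` is available as an encoder):

```rust
// Unit variants encode as a bare string, tuple variants as a single-key map.
let bootstrap = serde_json::to_string(&ClientMsg::Bootstrap).unwrap();
assert_eq!(bootstrap, r#""Bootstrap""#);

let get_obj = serde_json::to_string(&ClientMsg::GetObject("abc123".into())).unwrap();
assert_eq!(get_obj, r#"{"GetObject":"abc123"}"#);
```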

197
exp2/static/client.js Normal file

@@ -0,0 +1,197 @@
export { bootstrap };
/**
* @typedef {Object} Reference
* @property {Array<Reference>} dependents
* @property {string} path
* @property {string} object_id
* @property {string} content_address
*/
/**
* @typedef {Object} ServerMsg
* @property {Reference?} Reference
* @property {string?} Object
*/
/**
* @param {WebSocket} socket
* @returns {Promise<ServerMsg>}
*/
async function load_bootstrap(socket) {
// Wait for the connection to be established
const data = await send_socket_msg(socket,
JSON.stringify("Bootstrap"));
return data;
}
/**
* @param {WebSocket} socket
* @param {string} msg
* @returns {Promise<ServerMsg>}
*/
async function send_socket_msg(socket, msg) {
// Send a request for all references
socket.send(msg);
// Wait for the response
/** @type {Promise<String>} */
const stream = await new Promise((resolve, reject) => {
socket.onmessage = (event) => {
resolve(event.data.text());
};
socket.onerror = (_error) => reject(new Error("WebSocket error occurred"));
});
let data = await stream;
return JSON.parse(data);
}
/**
* @param {String} dbName
* @param {Array<String>} storeNames
* @returns {Promise<IDBDatabase>}
*/
async function openDatabase(dbName, storeNames) {
return await new Promise((resolve, reject) => {
const request = indexedDB.open(dbName, 1);
request.onupgradeneeded = (event) => {
const db = event.target.result;
for (var storeName of storeNames) {
// Create the object store if it doesn't exist
if (!db.objectStoreNames.contains(storeName)) {
db.createObjectStore(storeName);
}
}
};
request.onsuccess = (event) => {
const db = event.target.result;
resolve(db);
};
request.onerror = (event) => {
reject(event.target.error);
};
});
}
/**
* Stores a reference object in the IndexedDB.
* @param {IDBObjectStore} store
* @param {Object} reference
* @param {string} root_path
* @returns {Promise<any>}
*/
function storeObject(store, reference, root_path) {
return new Promise((resolve, reject) => {
const request = store.put(JSON.stringify(reference), root_path);
request.onerror = (evt) => {
reject(evt.target.error);
console.log("Failed to store object", evt);
};
request.onsuccess = (evt) => {
resolve(evt.target.result);
};
});
}
/**
* @param {IDBObjectStore} refStore
* @param {Object} reference
* @returns {Promise<Array<Reference>>} An array of references
*/
function load_reference_paths(refStore, reference) {
return new Promise(async (resolve, _reject) => {
let references = [];
references.push(reference);
if (reference.dependents) {
for (var dep of reference.dependents) {
references = references.concat(await load_reference_paths(refStore, dep));
}
}
await storeObject(refStore, reference, reference.path);
resolve(references);
});
}
/**
* @param {WebSocket} socket
* @param {IDBDatabase} db
* @param {string} storeName
* @param {Array<Reference>} references
*/
async function load_objects_and_store(socket, db, references, storeName) {
let objects = []
for (var ref of references) {
/** @type {Response} */
if (ref.dependents && ref.dependents.length != 0) {
continue; // not a leaf object
}
let data = await send_socket_msg(socket, JSON.stringify({ "GetObject": ref.content_address }));
if (!data.Object) {
throw { error: "Not an object" };
}
objects.push({ id: ref.content_address, content: data.Object });
}
const objectTrxAndStore = await getStoreAndTransaction(db, storeName);
for (var obj of objects) {
await storeObject(objectTrxAndStore.store, obj.content, obj.id);
}
await new Promise((resolve, reject) => {
objectTrxAndStore.trx.oncomplete = () => resolve();
objectTrxAndStore.trx.onerror = (event) => reject(event.target.error);
});
}
/**
* @param {string} storeName
* @param {IDBDatabase} db
* @returns {Promise<{trx: IDBTransaction, store: IDBObjectStore}>} The transaction and object store.
*/
async function getStoreAndTransaction(db, storeName) {
const transaction = db.transaction([storeName], "readwrite");
return { trx: transaction, store: transaction.objectStore(storeName) };
}
/**
* @returns {Number} The number of milliseconds it took to bootstrap.
*/
async function bootstrap() {
const refStoreName = "references";
const objectStoreName = "objects";
const databaseName = "MerkleStore";
const start = new Date().getTime();
const socket = new WebSocket(`ws://${window.location.host}/api/v1/ws`);
await new Promise((resolve, reject) => {
socket.onopen = () => resolve();
socket.onerror = (error) => reject(new Error("WebSocket connection failed" + error));
});
const data = await load_bootstrap(socket);
if (!data.Reference) {
throw { error: "Not a Reference" };
}
const db = await openDatabase(databaseName, [refStoreName, objectStoreName]);
const refTrxAndStore = await getStoreAndTransaction(db, refStoreName);
// Use a promise to wait for the transaction to complete
const transactionComplete = new Promise((resolve, reject) => {
refTrxAndStore.trx.oncomplete = () => resolve();
refTrxAndStore.trx.onerror = (event) => reject(event.target.error);
});
const refs = await load_reference_paths(refTrxAndStore.store, data.Reference);
// Wait for the transaction to complete
await transactionComplete;
await load_objects_and_store(socket, db, refs, objectStoreName);
const end = new Date().getTime();
return end - start;
}

26
exp2/static/index.html Normal file

@@ -0,0 +1,26 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Bootstrap Time Display</title>
</head>
<body>
<h1>Bootstrap Time</h1>
<div id="bootstrap-time">Loading...</div>
<script src="/lib/client.js" type="module"></script>
<script type="module">
import { bootstrap } from '/lib/client.js';
document.addEventListener('DOMContentLoaded', async () => {
try {
const bootstrapTime = await bootstrap(); // This function should be defined in your JS file
document.getElementById('bootstrap-time').textContent = `Bootstrap Time: ${bootstrapTime} ms`;
} catch (error) {
console.error('Error loading bootstrap time:', error);
document.getElementById('bootstrap-time').textContent = 'Error loading bootstrap time';
}
});
</script>
</body>
</html>

9
offline-web-model/Cargo.toml Normal file

@@ -0,0 +1,9 @@
[package]
name = "offline-web-model"
version = "0.1.0"
edition = "2024"
[dependencies]
blake2 = "0.10.6"
rand = "0.9.1"
serde = { version = "1.0.219", features = ["derive", "rc"] }

310
offline-web-model/src/lib.rs Normal file

@@ -0,0 +1,310 @@
use std::{collections::HashMap, sync::Arc};
use blake2::{Blake2b512, Digest};
use rand::Rng;
use serde::{Deserialize, Serialize};
/// A `Reference` represents a node in a content-addressable graph structure.
///
/// Each reference contains:
/// - An id that uniquely identifies the referenced object
/// - A content address hash that represents the content
/// - A name that provides a human-readable name for the reference
/// - A list of dependent references that this reference depends on
///
/// References form a directed acyclic graph where each node can have multiple dependents.
/// When the content of a reference changes, the object id of all parent references
/// are automatically recalculated to maintain content addressability throughout the graph.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Reference {
pub id: String,
pub content_address: Option<String>,
pub name: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub dependents: Vec<Arc<Reference>>,
}
impl Reference {
/// Creates a new Reference with the specified object ID, content address, and path.
///
/// # Parameters
/// * `content_address` - The content-based hash address of the referenced object
/// * `name` - The name of the reference
///
/// # Returns
/// A new Reference instance with no dependents
pub fn new(content_address: Option<String>, name: String) -> Self {
// Calculate the reference_id from the content_address and path
let hasher = Self::initial_hash(&content_address, &name);
let calculated_id = format!("{:x}", hasher.finalize());
Self {
id: calculated_id,
content_address,
name,
dependents: Vec::new(),
}
}
/// Adds a dependent reference to this reference.
///
/// This establishes a parent-child relationship where this reference depends on
/// the provided reference. When a dependent reference changes, the parent's
/// id will be recalculated.
///
/// # Parameters
/// * `dep` - An Arc-wrapped Reference to add as a dependent
///
/// # Returns
/// The modified Reference with the new dependent added
pub fn add_dep(&self, dep: Arc<Reference>) -> Self {
let mut cloned = self.clone();
cloned.dependents.push(dep);
// We ensure that our dependents are always sorted lexicographically by name.
cloned
.dependents
.sort_by(|left, right| left.name.cmp(&right.name));
// Recalculate the ID based on dependents, content_address, and path
let mut hasher = Self::initial_hash(&self.content_address, &self.name);
for dependent in &cloned.dependents {
hasher.update(&dependent.id);
}
cloned.id = format!("{:x}", hasher.finalize());
cloned
}
/// Converts this Reference into an Arc-wrapped Reference for shared ownership.
///
/// This is useful when a Reference needs to be shared across multiple owners
/// in the graph structure.
///
/// # Returns
/// An Arc-wrapped version of this Reference
pub fn to_arc(self) -> Arc<Self> {
Arc::new(self)
}
/// Determines if this Reference is a leaf node (has no dependents).
///
/// Leaf nodes directly represent content, while non-leaf nodes derive their
/// content address from their dependents.
///
/// # Returns
/// `true` if this Reference has no dependents, `false` otherwise
pub fn is_leaf(&self) -> bool {
self.dependents.is_empty()
}
fn initial_hash(content_address: &Option<String>, path: &String) -> Blake2b512 {
let mut hasher = Blake2b512::new();
if let Some(content_address) = content_address {
hasher.update(content_address);
}
hasher.update(path);
hasher
}
}
pub struct Graph {
pub root: Arc<Reference>,
pub refs: Arc<HashMap<String, Arc<Reference>>>,
pub objects: Arc<HashMap<String, Vec<u8>>>,
}
pub fn random_object() -> (String, Vec<u8>) {
let mut rng = rand::rng();
let random_size = rng.random_range(50..=4096);
let random_bytes: Vec<u8> = (0..random_size).map(|_| rng.random::<u8>()).collect();
let mut hasher = Blake2b512::new();
hasher.update(&random_bytes);
let hash = format!("{:x}", hasher.finalize());
(hash, random_bytes)
}
impl Graph {
pub fn new(root: Arc<Reference>) -> Self {
let mut refs = HashMap::new();
refs.insert(root.name.clone(), root.clone());
let refs = Arc::new(refs);
let objects = Arc::new(HashMap::new());
Self {
root,
refs,
objects,
}
}
/// Gets a reference by its path
pub fn get_reference(&self, path: &str) -> Option<Arc<Reference>> {
self.refs.get(path).cloned()
}
/// Gets an object by its content address
pub fn get_object(&self, content_address: &str) -> Option<&Vec<u8>> {
self.objects.get(content_address)
}
/// Updates a reference to point to a new object, recalculating content addresses and IDs
/// for all affected references in the graph.
///
/// The reference ID is calculated from the content address, name, and any dependents,
/// ensuring that it's truly content-addressable.
pub fn update_object_reference(
&mut self,
name: &String,
new_content: Vec<u8>,
) -> Result<(), String> {
// Create a mutable copy of our maps
let mut refs = HashMap::new();
for (k, v) in self.refs.as_ref() {
refs.insert(k.clone(), v.clone());
}
let mut objects = HashMap::new();
for (k, v) in self.objects.as_ref() {
objects.insert(k.clone(), v.clone());
}
// Find the reference to update
let ref_to_update = refs
.get(name)
.ok_or_else(|| format!("Reference with name {} not found", name))?;
// Calculate hash for the new content
let mut hasher = Blake2b512::new();
hasher.update(&new_content);
let new_address = format!("{:x}", hasher.finalize());
// Create a new reference with the updated content address
// The ID will be calculated based on the content address, name, and dependents
let mut updated_ref = Reference::new(Some(new_address.clone()), name.to_string());
// Add all dependents to the updated reference
for dep in &ref_to_update.dependents {
updated_ref = updated_ref.add_dep(dep.clone());
}
let updated_ref = updated_ref.to_arc();
// Update objects map with new content
objects.insert(new_address.clone(), new_content);
// Update references map with new reference
refs.insert(name.to_string(), updated_ref.clone());
// Find and update all parent references that contain this reference
self.update_parent_references(&mut refs, name);
// Update the Arc maps
self.refs = Arc::new(refs);
self.objects = Arc::new(objects);
Ok(())
}
/// TODO(jwall): Add new reference
/// Recursively updates parent references when a child reference changes
fn update_parent_references(
&mut self,
refs: &mut HashMap<String, Arc<Reference>>,
updated_name: &str,
) {
// Find all references that have the updated reference as a dependent
let parent_names: Vec<String> = refs
.iter()
.filter(|(_, r)| r.dependents.iter().any(|dep| dep.name == updated_name))
.map(|(name, _)| name.clone())
.collect();
for parent_name in parent_names {
if let Some(parent_ref) = refs.get(&parent_name) {
// Create a new reference with the same content address and name
let mut updated_parent =
Reference::new(parent_ref.content_address.clone(), parent_ref.name.clone());
// Add all dependents, replacing the updated one
for dep in &parent_ref.dependents {
// Update the root reference if needed
if dep.name == updated_name {
// Use the updated reference
updated_parent =
updated_parent.add_dep(refs.get(updated_name).unwrap().clone());
} else {
// Keep the existing dependent
updated_parent = updated_parent.add_dep(dep.clone());
}
}
// The ID is automatically calculated in the add_dep method
let updated_parent = Arc::new(updated_parent);
if updated_parent.name == self.root.name {
self.root = updated_parent.clone();
}
// Update the references map
refs.insert(parent_name.clone(), updated_parent);
// Recursively update parents of this parent
self.update_parent_references(refs, &parent_name);
}
}
}
pub fn random_graph() -> Graph {
let root_name = String::from("ref/0");
let mut objects = HashMap::new();
let mut refs = HashMap::new();
// Create the root reference
let mut root_ref = Reference::new(Some(String::from("root_content")), root_name.clone());
// Create 10 item references
for i in 1..=10 {
let item_name = format!("/item/{}", i);
let mut item_ref =
Reference::new(Some(format!("item_content_{}", i)), item_name.clone());
// Create 10 subitems for each item
for j in 1..=10 {
let (address, content) = random_object();
let subitem_name = format!("/item/{}/subitem/{}", i, j);
// Create a leaf reference
let leaf_ref = Reference::new(Some(address.clone()), subitem_name).to_arc();
// Add the leaf reference as a dependent to the item reference
item_ref = item_ref.add_dep(leaf_ref.clone());
// Store the content in the objects map
objects.insert(address.clone(), content);
// Store the leaf reference in the refs map
refs.insert(leaf_ref.name.clone(), leaf_ref);
}
// Convert the item reference to Arc and add it to the root reference
let arc_item_ref = item_ref.to_arc();
root_ref = root_ref.add_dep(arc_item_ref.clone());
// Store the item reference in the refs map
refs.insert(arc_item_ref.name.clone(), arc_item_ref);
}
// Convert the root reference to Arc
let arc_root_ref = root_ref.to_arc();
// Store the root reference in the refs map
refs.insert(arc_root_ref.name.clone(), arc_root_ref.clone());
Graph {
root: arc_root_ref,
refs: Arc::new(refs),
objects: Arc::new(objects),
}
}
}
#[cfg(test)]
mod test;
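A small usage sketch of the model above (not part of this change), showing ids propagating to the root when a leaf's content changes; it leans on `random_graph`, which seeds names like `/item/1/subitem/1`:

```rust
use offline_web_model::Graph;

fn demo_propagation() {
    let mut graph = Graph::random_graph();
    let before = graph.root.id.clone();

    // Updating one leaf's content recalculates the ids of every ancestor,
    // including the root, to keep the graph content-addressable.
    graph
        .update_object_reference(&"/item/1/subitem/1".to_string(), b"new content".to_vec())
        .unwrap();
    assert_ne!(graph.root.id, before);
}
```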

376
offline-web-model/src/test.rs Normal file

@@ -0,0 +1,376 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use crate::{Graph, Reference, random_object};
fn get_deterministic_candidate(graph: &Graph) -> Arc<Reference> {
// Pick a deterministic leaf node to update (first lexicographically)
let mut refs: Vec<Arc<Reference>> = graph
.refs
.values()
.filter(|r| r.name != graph.root.name && r.is_leaf())
.cloned()
.collect();
// Sort by name to ensure deterministic ordering
refs.sort_by(|a, b| a.name.cmp(&b.name));
refs[0].clone()
}
/// Tests that all dependencies are kept updated when new nodes are added
#[test]
fn test_dependencies_updated_when_nodes_added() {
// Create a simple graph
let mut graph = create_test_graph();
// Get the initial content address of the root
let initial_root_id = graph.root.id.clone();
let candidate = get_deterministic_candidate(&graph);
// Update the leaf node with deterministic content
let new_content = b"deterministic_test_content".to_vec();
graph
.update_object_reference(&candidate.name, new_content)
.unwrap();
// Verify that the leaf node's ID has changed
let updated_leaf = graph.get_reference(&candidate.name).unwrap();
assert_ne!(
updated_leaf.id, candidate.id,
"Leaf node ID should change when content is updated"
);
// Verify that the root's ID has changed
assert_ne!(
graph.root.id, initial_root_id,
"Root ID should change when a dependent node is updated"
);
}
/// Tests that the root of the graph is not itself a dependency of any other node
#[test]
fn test_root_not_a_dependency() {
let graph = create_test_graph();
let root_name = graph.root.name.clone();
// Check all references to ensure none have the root as a dependent
for reference in graph.refs.as_ref().values() {
for dep in &reference.dependents {
assert_ne!(
dep.name, root_name,
"Root should not be a dependency of any other node"
);
}
}
}
/// Tests that all nodes are dependents or transitive dependents of the root
#[test]
fn test_all_nodes_connected_to_root() {
let graph = create_test_graph();
// Collect all nodes reachable from the root
let mut reachable = HashSet::new();
fn collect_reachable(node: &Arc<Reference>, reachable: &mut HashSet<String>) {
reachable.insert(node.name.clone());
for dep in &node.dependents {
if !reachable.contains(&dep.name) {
collect_reachable(dep, reachable);
}
}
}
collect_reachable(&graph.root, &mut reachable);
// Check that all nodes in the graph are reachable from the root
for name in graph.refs.as_ref().keys() {
assert!(
reachable.contains(name),
"All nodes should be reachable from the root: {}",
name
);
}
}
/// Helper function to create a test graph with a known structure
fn create_test_graph() -> Graph {
let root_name = String::from("/root");
let mut objects = HashMap::new();
let mut refs = HashMap::new();
// Create the root reference
let mut root_ref = Reference::new(Some(String::from("root_content")), root_name.clone());
// Create 3 item references
for i in 1..=3 {
let item_name = format!("/item/{}", i);
let mut item_ref = Reference::new(Some(format!("item_content_{}", i)), item_name.clone());
// Create 3 subitems for each item
for j in 1..=3 {
let (address, content) = random_object();
let subitem_name = format!("/item/{}/subitem/{}", i, j);
// Create a leaf reference
let leaf_ref = Reference::new(Some(address.clone()), subitem_name).to_arc();
// Add the leaf reference as a dependent to the item reference
item_ref = item_ref.add_dep(leaf_ref.clone());
// Store the content in the objects map
objects.insert(address.clone(), content);
// Store the leaf reference in the refs map
refs.insert(leaf_ref.name.clone(), leaf_ref);
}
// Convert the item reference to Arc and add it to the root reference
let arc_item_ref = item_ref.to_arc();
root_ref = root_ref.add_dep(arc_item_ref.clone());
// Store the item reference in the refs map
refs.insert(arc_item_ref.name.clone(), arc_item_ref);
}
// Convert the root reference to Arc
let arc_root_ref = root_ref.to_arc();
// Store the root reference in the refs map
refs.insert(arc_root_ref.name.clone(), arc_root_ref.clone());
Graph {
root: arc_root_ref,
refs: Arc::new(refs),
objects: Arc::new(objects),
}
}
/// Tests that the graph correctly handles content-addressable properties
#[test]
fn test_content_addressable_properties() {
let mut graph = create_test_graph();
// Update a leaf node with the same content
let leaf_path = "/item/1/subitem/1".to_string();
let initial_leaf = graph.get_reference(&leaf_path).unwrap();
if let Some(content_address) = initial_leaf.content_address.clone() {
// Get the content for this address
let content = graph.get_object(&content_address).unwrap().clone();
// Update with the same content
graph.update_object_reference(&leaf_path, content).unwrap();
}
// Verify that nothing changed since the content is the same
let updated_leaf = graph.get_reference(&leaf_path).unwrap();
assert_eq!(
updated_leaf.content_address, initial_leaf.content_address,
"Content address should not change when content remains the same"
);
}
/// Tests that the graph correctly handles ID calculation
#[test]
fn test_id_calculation() {
let mut graph = create_test_graph();
// Update a leaf node
let leaf_path = "/item/1/subitem/1".to_string();
let initial_leaf = graph.get_reference(&leaf_path).unwrap();
graph
.update_object_reference(&leaf_path, "new content".as_bytes().to_vec())
.unwrap();
// Verify that the ID changed
let updated_leaf = graph.get_reference(&leaf_path).unwrap();
assert_ne!(
updated_leaf.id, initial_leaf.id,
"Reference ID should change when content changes"
);
// Verify that parent ID changed
let parent_path = "/item/1".to_string();
let parent = graph.get_reference(&parent_path).unwrap();
// Create a reference with the same properties to calculate expected ID
let mut test_ref = Reference::new(parent.content_address.clone(), parent.name.clone());
// Add the same dependents
for dep in &parent.dependents {
test_ref = test_ref.add_dep(dep.clone());
}
// Verify the ID calculation is consistent
assert_eq!(
parent.id, test_ref.id,
"ID calculation should be consistent for the same reference properties"
);
}
/// Tests that the root ID always changes when any reference is updated
#[test]
fn test_root_id_changes_for_any_reference_update() {
let mut graph = create_test_graph();
// Get all non-root references sorted by name for deterministic iteration
let mut all_refs: Vec<(String, String)> = graph
.refs
.as_ref()
.iter()
.filter(|(name, _)| **name != graph.root.name)
.map(|(name, ref_arc)| (name.clone(), ref_arc.id.clone()))
.collect();
all_refs.sort_by(|a, b| a.0.cmp(&b.0));
// Test each reference update
for (ref_name, original_ref_id) in all_refs {
// Record the current root ID
let initial_root_id = graph.root.id.clone();
// Update the reference with new content
let new_content = format!("updated_content_for_{}", ref_name).into_bytes();
graph
.update_object_reference(&ref_name, new_content)
.unwrap();
// Verify the reference itself changed
let updated_ref = graph.get_reference(&ref_name).unwrap();
assert_ne!(
updated_ref.id, original_ref_id,
"Reference {} should have changed ID after update",
ref_name
);
// Verify the root ID changed
assert_ne!(
graph.root.id, initial_root_id,
"Root ID should change when reference {} is updated",
ref_name
);
}
}
/// Tests that Reference IDs are stable regardless of dependency add order
#[test]
fn test_reference_ids_stable_regardless_of_dependency_order() {
// Create test dependencies
let dep_a = Reference::new(Some(String::from("content_a")), String::from("/dep_a")).to_arc();
let dep_b = Reference::new(Some(String::from("content_b")), String::from("/dep_b")).to_arc();
let dep_c = Reference::new(Some(String::from("content_c")), String::from("/dep_c")).to_arc();
let dep_d = Reference::new(Some(String::from("content_d")), String::from("/dep_d")).to_arc();
// Create base reference
let base_ref = Reference::new(Some(String::from("base_content")), String::from("/base"));
// Test multiple different orders of adding the same dependencies
let orders = [
vec![dep_a.clone(), dep_b.clone(), dep_c.clone(), dep_d.clone()], // alphabetical
vec![dep_d.clone(), dep_c.clone(), dep_b.clone(), dep_a.clone()], // reverse alphabetical
vec![dep_b.clone(), dep_d.clone(), dep_a.clone(), dep_c.clone()], // random order 1
vec![dep_c.clone(), dep_a.clone(), dep_d.clone(), dep_b.clone()], // random order 2
vec![dep_d.clone(), dep_a.clone(), dep_b.clone(), dep_c.clone()],
];
let mut all_ids = Vec::new();
for (i, order) in orders.iter().enumerate() {
let mut test_ref = base_ref.clone();
// Add dependencies in this specific order
for dep in order {
test_ref = test_ref.add_dep(dep.clone());
}
all_ids.push(test_ref.id.clone());
// Verify that dependencies are always sorted lexicographically regardless of add order
for j in 0..test_ref.dependents.len() - 1 {
let current = &test_ref.dependents[j];
let next = &test_ref.dependents[j + 1];
assert!(
current.name < next.name,
"Dependencies should be lexicographically ordered. Order {}: Found '{}' before '{}'",
i,
current.name,
next.name
);
}
}
// Verify all IDs are identical
let first_id = &all_ids[0];
for (i, id) in all_ids.iter().enumerate() {
assert_eq!(
id, first_id,
"Reference ID should be stable regardless of dependency add order. Order {} produced different ID",
i
);
}
}
/// Tests that dependencies of a Reference are always lexicographically ordered by name
#[test]
fn test_dependencies_lexicographically_ordered() {
let graph = create_test_graph();
// Check all references to ensure their dependents are lexicographically ordered
for reference in graph.refs.as_ref().values() {
if reference.dependents.len() > 1 {
// Check that dependents are ordered by name
for i in 0..reference.dependents.len() - 1 {
let current = &reference.dependents[i];
let next = &reference.dependents[i + 1];
assert!(
current.name < next.name,
"Dependencies should be lexicographically ordered by name. Found '{}' before '{}'",
current.name,
next.name
);
}
}
}
// Also verify that when we add dependencies, they maintain the correct order
let mut test_ref = Reference::new(
Some(String::from("test_content")),
String::from("/test_ordering"),
);
// Add dependencies in non-lexicographical order
let dep_c = Reference::new(Some(String::from("c_content")), String::from("/c")).to_arc();
let dep_a = Reference::new(Some(String::from("a_content")), String::from("/a")).to_arc();
let dep_b = Reference::new(Some(String::from("b_content")), String::from("/b")).to_arc();
// Add in non-lexicographical order
test_ref = test_ref.add_dep(dep_c.clone());
test_ref = test_ref.add_dep(dep_a.clone());
test_ref = test_ref.add_dep(dep_b.clone());
// Verify they are stored in lexicographical order
assert_eq!(
test_ref.dependents[0].name, "/a",
"First dependent should be '/a'"
);
assert_eq!(
test_ref.dependents[1].name, "/b",
"Second dependent should be '/b'"
);
assert_eq!(
test_ref.dependents[2].name, "/c",
"Third dependent should be '/c'"
);
}

18
offline-web-storage/Cargo.toml Normal file

@@ -0,0 +1,18 @@
[package]
name = "offline-web-storage"
version = "0.1.0"
edition = "2021"
[dependencies]
offline-web-model = { path = "../offline-web-model" }
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "sqlite", "uuid", "chrono"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
uuid = { version = "1.0", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
blake2 = "0.10"
thiserror = "2.0.12"
[dev-dependencies]

193
offline-web-storage/src/integration_tests.rs Normal file

@@ -0,0 +1,193 @@
use std::sync::Arc;
use offline_web_model::Reference;
use super::{ReferenceStore, SqliteReferenceStore, StoreError};
async fn create_test_store() -> SqliteReferenceStore {
SqliteReferenceStore::new("sqlite::memory:").await.unwrap()
}
#[tokio::test]
async fn test_store_and_retrieve_reference() {
let store = create_test_store().await;
// Create a test reference
let reference = Reference::new(
Some("test_content_address".to_string()),
"test_reference".to_string(),
);
// Store the reference
store.store_reference(&reference).await.unwrap();
// Retrieve the reference by ID
let retrieved = store.get_reference(&reference.id).await.unwrap();
// Verify the retrieved reference matches the original
assert_eq!(retrieved.id, reference.id);
assert_eq!(retrieved.content_address, reference.content_address);
assert_eq!(retrieved.name, reference.name);
assert_eq!(retrieved.dependents.len(), reference.dependents.len());
}
#[tokio::test]
async fn test_store_and_retrieve_content() {
let store = create_test_store().await;
let content = b"Hello, World!";
let content_address = "test_content_address";
// Store content
store.store_content(content_address, content).await.unwrap();
// Create a reference pointing to this content
let reference = Reference::new(
Some(content_address.to_string()),
"test_reference".to_string(),
);
// Retrieve content using the reference
let retrieved_content = store.get_content_for_reference(reference).await.unwrap();
// Verify the content matches
assert_eq!(retrieved_content, String::from_utf8(content.to_vec()).unwrap());
}
#[tokio::test]
async fn test_reference_with_dependents() {
let store = create_test_store().await;
// Create a leaf reference (no dependents)
let leaf_ref = Reference::new(
Some("leaf_content_address".to_string()),
"leaf_reference".to_string(),
);
// Store the leaf reference
store.store_reference(&leaf_ref).await.unwrap();
// Create a parent reference that depends on the leaf
let parent_ref = Reference::new(
Some("parent_content_address".to_string()),
"parent_reference".to_string(),
).add_dep(Arc::new(leaf_ref.clone()));
// Store the parent reference
store.store_reference(&parent_ref).await.unwrap();
// Retrieve the parent reference
let retrieved_parent = store.get_reference(&parent_ref.id).await.unwrap();
// Verify the parent has the correct dependent
assert_eq!(retrieved_parent.dependents.len(), 1);
assert_eq!(retrieved_parent.dependents[0].name, leaf_ref.name);
}
#[tokio::test]
async fn test_get_graph() {
let store = create_test_store().await;
// Create a hierarchy of references
let leaf1 = Reference::new(
Some("leaf1_content".to_string()),
"leaf1".to_string(),
);
let leaf2 = Reference::new(
Some("leaf2_content".to_string()),
"leaf2".to_string(),
);
let parent = Reference::new(
Some("parent_content".to_string()),
"parent".to_string(),
)
.add_dep(Arc::new(leaf1.clone()))
.add_dep(Arc::new(leaf2.clone()));
let root = Reference::new(
Some("root_content".to_string()),
"root".to_string(),
).add_dep(Arc::new(parent.clone()));
// Store all references
store.store_reference(&leaf1).await.unwrap();
store.store_reference(&leaf2).await.unwrap();
store.store_reference(&parent).await.unwrap();
store.store_reference(&root).await.unwrap();
// Get the graph starting from root
let graph = store.get_graph("root").await.unwrap();
// Verify we got all references in the graph
assert_eq!(graph.len(), 4);
// Verify we have all the expected references
let names: Vec<_> = graph.iter().map(|r| &r.name).collect();
assert!(names.contains(&&"root".to_string()));
assert!(names.contains(&&"parent".to_string()));
assert!(names.contains(&&"leaf1".to_string()));
assert!(names.contains(&&"leaf2".to_string()));
}
#[tokio::test]
async fn test_nonexistent_reference() {
let store = create_test_store().await;
// Try to retrieve a reference that doesn't exist
let result = store.get_reference("nonexistent_id").await;
// Should return NoSuchReference error
assert!(matches!(result, Err(StoreError::NoSuchReference)));
}
#[tokio::test]
async fn test_nonexistent_content() {
let store = create_test_store().await;
// Create a reference with a content address that doesn't exist
let reference = Reference::new(
Some("nonexistent_content_address".to_string()),
"test_reference".to_string(),
);
// Try to retrieve content
let result = store.get_content_for_reference(reference).await;
// Should return NoSuchContentAddress error
assert!(matches!(result, Err(StoreError::NoSuchContentAddress)));
}
#[tokio::test]
async fn test_reference_without_content_address() {
let store = create_test_store().await;
// Create a reference without a content address
let reference = Reference::new(None, "test_reference".to_string());
// Try to retrieve content
let result = store.get_content_for_reference(reference).await;
// Should return NoSuchContentAddress error
assert!(matches!(result, Err(StoreError::NoSuchContentAddress)));
}
#[tokio::test]
async fn test_schema_version_management() {
let store = create_test_store().await;
// Verify the schema version is correctly set
let version = store.get_current_schema_version().await.unwrap();
assert_eq!(version, 1, "Schema version should be 1");
// Verify we can still perform basic operations
let reference = Reference::new(
Some("test_content".to_string()),
"test_schema_version".to_string(),
);
store.store_reference(&reference).await.unwrap();
let retrieved = store.get_reference(&reference.id).await.unwrap();
assert_eq!(retrieved.name, reference.name);
}

28
offline-web-storage/src/lib.rs Normal file

@@ -0,0 +1,28 @@
use thiserror::Error;
use offline_web_model::Reference;
#[derive(Error, Debug)]
pub enum StoreError {
#[error("No such reference")]
NoSuchReference,
#[error("No such content address")]
NoSuchContentAddress,
#[error("Unknown Storage Error: {0:?}")]
StorageError(Box<dyn std::error::Error>),
}
#[allow(async_fn_in_trait)]
pub trait ReferenceStore {
async fn get_reference(&self, id: &str) -> Result<Reference, StoreError>;
async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError>;
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError>;
}
mod sqlite;
pub use sqlite::SqliteReferenceStore;
#[cfg(test)]
mod integration_tests;
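Since `ReferenceStore` uses `async fn` in the trait (hence the `#[allow(async_fn_in_trait)]`), it composes in generic position rather than behind `dyn`. A hedged consumption sketch (the `dump_graph` helper is illustrative, not part of this change):

```rust
// Print every reference reachable from a root, generically over any store.
async fn dump_graph<S: ReferenceStore>(store: &S, root_name: &str) -> Result<(), StoreError> {
    for reference in store.get_graph(root_name).await? {
        println!("{} => {:?}", reference.name, reference.content_address);
    }
    Ok(())
}
```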

447
offline-web-storage/src/sqlite.rs Normal file

@@ -0,0 +1,447 @@
use std::sync::Arc;
use std::collections::{BTreeMap, HashMap};
use sqlx::{Pool, Row, Sqlite, SqlitePool};
use offline_web_model::Reference;
use crate::StoreError;
use crate::ReferenceStore;
// Schema version constants
const CURRENT_SCHEMA_VERSION: i32 = 1;
const INITIAL_SCHEMA_VERSION: i32 = 0;
pub struct SqliteReferenceStore {
pool: Pool<Sqlite>,
}
impl SqliteReferenceStore {
pub async fn new(database_url: &str) -> Result<Self, StoreError> {
let pool = SqlitePool::connect(database_url)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
let store = Self { pool };
// Check current schema version and migrate if necessary
let current_version = store.get_current_schema_version().await?;
if current_version != CURRENT_SCHEMA_VERSION {
store.migrate_schema(current_version, CURRENT_SCHEMA_VERSION).await?;
}
Ok(store)
}
pub async fn get_current_schema_version(&self) -> Result<i32, StoreError> {
// First, ensure the schema_version table exists
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP,
description TEXT
)
"#,
)
.execute(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Get the current version
let row = sqlx::query(
r#"
SELECT version FROM schema_version ORDER BY version DESC LIMIT 1
"#,
)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
match row {
Some(row) => {
let version: i32 = row.get("version");
Ok(version)
}
None => {
// No version found, this is a fresh database
Ok(INITIAL_SCHEMA_VERSION)
}
}
}
async fn migrate_schema(&self, from_version: i32, to_version: i32) -> Result<(), StoreError> {
if from_version == to_version {
return Ok(());
}
if from_version > to_version {
return Err(StoreError::StorageError(
"Downward migrations not currently supported".into()
));
}
// Use a transaction for the entire migration process
let mut tx = self.pool.begin().await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Apply migrations step by step
let mut current_version = from_version;
while current_version < to_version {
match current_version {
0 => {
// Migration from version 0 to 1: Initial schema setup
self.migrate_to_v1(&mut tx).await?;
current_version = 1;
}
_ => {
return Err(StoreError::StorageError(
format!("Unknown migration path from version {}", current_version).into()
));
}
}
}
// Commit all migrations
tx.commit().await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
Ok(())
}
async fn migrate_to_v1(&self, tx: &mut sqlx::Transaction<'_, Sqlite>) -> Result<(), StoreError> {
// Create the main application tables
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS ref_entries (
id TEXT PRIMARY KEY,
content_address TEXT,
name TEXT NOT NULL UNIQUE
)
"#,
)
.execute(&mut **tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS ref_dependencies (
parent_id TEXT NOT NULL,
dependent_id TEXT NOT NULL,
PRIMARY KEY (parent_id, dependent_id),
FOREIGN KEY (parent_id) REFERENCES ref_entries(id),
FOREIGN KEY (dependent_id) REFERENCES ref_entries(id)
)
"#,
)
.execute(&mut **tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS content_store (
content_address TEXT PRIMARY KEY,
content BLOB NOT NULL
)
"#,
)
.execute(&mut **tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Record the schema version
sqlx::query(
r#"
INSERT OR REPLACE INTO schema_version (version, description)
VALUES (?, ?)
"#,
)
.bind(1)
.bind("Initial schema with ref_entries, ref_dependencies, and content_store tables")
.execute(&mut **tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
Ok(())
}
pub async fn store_reference(&self, reference: &Reference) -> Result<(), StoreError> {
// Use a transaction to ensure atomicity
let mut tx = self.pool.begin().await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Insert or update the reference
sqlx::query(
r#"
INSERT OR REPLACE INTO ref_entries (id, content_address, name)
VALUES (?, ?, ?)
"#,
)
.bind(&reference.id)
.bind(&reference.content_address)
.bind(&reference.name)
.execute(&mut *tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Delete existing dependencies for this reference
sqlx::query(
r#"
DELETE FROM ref_dependencies
WHERE parent_id = ?
"#,
)
.bind(&reference.id)
.execute(&mut *tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Insert new dependencies
for dependent in &reference.dependents {
sqlx::query(
r#"
INSERT INTO ref_dependencies (parent_id, dependent_id)
VALUES (?, ?)
"#,
)
.bind(&reference.id)
.bind(&dependent.id)
.execute(&mut *tx)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
}
// Commit the transaction
tx.commit().await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
Ok(())
}
pub async fn store_content(&self, content_address: &str, content: &[u8]) -> Result<(), StoreError> {
sqlx::query(
r#"
INSERT OR REPLACE INTO content_store (content_address, content)
VALUES (?, ?)
"#,
)
.bind(content_address)
.bind(content)
.execute(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
Ok(())
}
}
impl ReferenceStore for SqliteReferenceStore {
async fn get_reference(&self, id: &str) -> Result<Reference, StoreError> {
// First, get the basic reference information
let row = sqlx::query(
r#"
SELECT id, content_address, name
FROM ref_entries
WHERE id = ?
"#,
)
.bind(id)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
match row {
Some(row) => {
let id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
// Get the dependents by recursively fetching them
let dependents = self.get_dependents(&id).await?;
Ok(Reference {
id,
content_address,
name,
dependents,
})
}
None => Err(StoreError::NoSuchReference),
}
}
async fn get_content_for_reference(&self, reference: Reference) -> Result<String, StoreError> {
if let Some(content_address) = &reference.content_address {
let row = sqlx::query(
r#"
SELECT content
FROM content_store
WHERE content_address = ?
"#,
)
.bind(content_address)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
match row {
Some(row) => {
let content: Vec<u8> = row.get("content");
String::from_utf8(content)
.map_err(|e| StoreError::StorageError(Box::new(e)))
}
None => Err(StoreError::NoSuchContentAddress),
}
} else {
Err(StoreError::NoSuchContentAddress)
}
}
async fn get_graph(&self, root_name: &str) -> Result<Vec<Reference>, StoreError> {
let mut visited = std::collections::HashSet::new();
let mut result = Vec::new();
let mut queue = std::collections::VecDeque::new();
// Start with the root name
queue.push_back(root_name.to_string());
while let Some(current_name) = queue.pop_front() {
if visited.contains(&current_name) {
continue;
}
visited.insert(current_name.clone());
// Get the reference by name
let row = sqlx::query(
r#"
SELECT id, content_address, name
FROM ref_entries
WHERE name = ?
"#,
)
.bind(&current_name)
.fetch_optional(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
if let Some(row) = row {
let id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
// Get dependents for this reference
let dependents = self.get_dependents(&id).await?;
let reference = Reference {
id,
content_address,
name,
dependents: dependents.clone(),
};
result.push(reference);
// Add all dependent names to the queue for processing
for dependent in dependents {
if !visited.contains(&dependent.name) {
queue.push_back(dependent.name.clone());
}
}
}
}
Ok(result)
}
}
impl SqliteReferenceStore {
async fn get_dependents(&self, parent_id: &str) -> Result<Vec<Arc<Reference>>, StoreError> {
// Use a CTE (Common Table Expression) to get the entire dependency tree in one query
let rows = sqlx::query(
r#"
WITH RECURSIVE dependency_tree AS (
-- Base case: direct dependents of the parent
SELECT r.id, r.content_address, r.name, rd.parent_id, 0 as depth
FROM ref_entries r
JOIN ref_dependencies rd ON r.id = rd.dependent_id
WHERE rd.parent_id = ?
UNION ALL
-- Recursive case: dependents of dependents
SELECT r.id, r.content_address, r.name, rd.parent_id, dt.depth + 1
FROM ref_entries r
JOIN ref_dependencies rd ON r.id = rd.dependent_id
JOIN dependency_tree dt ON rd.parent_id = dt.id
WHERE dt.depth < 100 -- Prevent infinite recursion
)
SELECT id, content_address, name, parent_id, depth
FROM dependency_tree
ORDER BY depth, name
"#,
)
.bind(parent_id)
.fetch_all(&self.pool)
.await
.map_err(|e| StoreError::StorageError(Box::new(e)))?;
// Build the dependency tree iteratively
let mut reference_map: HashMap<String, Reference> = HashMap::new();
let mut children_map: HashMap<String, Vec<String>> = HashMap::new();
// First pass: create all references and build the children map
for row in &rows {
let id: String = row.get("id");
let content_address: Option<String> = row.get("content_address");
let name: String = row.get("name");
let parent_id: String = row.get("parent_id");
let reference = Reference {
id: id.clone(),
content_address,
name,
dependents: Vec::new(), // Will be filled in second pass
};
reference_map.insert(id.clone(), reference);
children_map.entry(parent_id).or_default().push(id);
}
// Second pass: build the dependency tree from bottom up (highest depth first)
let mut depth_groups: BTreeMap<i32, Vec<String>> = BTreeMap::new();
for row in &rows {
let id: String = row.get("id");
let depth: i32 = row.get("depth");
depth_groups.entry(depth).or_default().push(id);
}
// Process from highest depth to lowest (leaves to roots)
for (_depth, ids) in depth_groups.iter().rev() {
for id in ids {
if let Some(children) = children_map.get(id).cloned() {
let child_references: Vec<Arc<Reference>> = children
.iter()
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
.collect();
if let Some(reference) = reference_map.get_mut(id) {
reference.dependents = child_references;
}
}
}
}
// Return the direct children of the parent
let empty_vec = Vec::new();
let direct_children = children_map.get(parent_id).unwrap_or(&empty_vec);
let result = direct_children
.iter()
.filter_map(|child_id| reference_map.get(child_id).map(|r| Arc::new(r.clone())))
.collect();
Ok(result)
}
}
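A hedged usage sketch of the store above; the file URL uses sqlx's SQLite connection syntax, where `mode=rwc` creates the database file if it does not exist (the file name here is an assumption for illustration):

```rust
use offline_web_model::Reference;
use offline_web_storage::{ReferenceStore, SqliteReferenceStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In-memory also works: SqliteReferenceStore::new("sqlite::memory:").
    let store = SqliteReferenceStore::new("sqlite://offline_web.db?mode=rwc").await?;

    // Persist one reference and read it back by id.
    let reference = Reference::new(Some("some_content_address".into()), "example".into());
    store.store_reference(&reference).await?;

    let roundtrip = store.get_reference(&reference.id).await?;
    assert_eq!(roundtrip.name, "example");
    Ok(())
}
```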

src/datamodel.rs Deleted file

@@ -1,36 +0,0 @@
use std::sync::Arc;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct Reference {
pub object_id: String,
pub content_address: String,
pub path: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub dependents: Vec<Arc<Reference>>,
}
impl Reference {
pub fn new(object_id: String, content_address: String, path: String) -> Self {
Self {
object_id,
content_address,
path,
dependents: Vec::new(),
}
}
pub fn add_dep(mut self, dep: Arc<Reference>) -> Self {
self.dependents.push(dep);
self
}
pub fn to_arc(self) -> Arc<Self> {
Arc::new(self)
}
pub fn is_leaf(&self) -> bool {
return self.dependents.is_empty();
}
}

src/lib.rs Deleted file

@@ -1,139 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use axum::{extract::Path, http, response::{Html, IntoResponse}, routing::get, Json, Router};
use blake2::{Blake2b512, Digest};
use rand::Rng;
use datamodel::Reference;
mod datamodel;
#[derive(Debug)]
pub struct AddressableObject {
pub address: String,
pub content: String,
}
fn random_object() -> AddressableObject {
let mut rng = rand::rng();
let random_size = rng.random_range(50..=4096);
let random_string: String = (0..random_size)
.map(|_| rng.sample(rand::distr::Alphanumeric) as char)
.collect();
let mut hasher = Blake2b512::new();
hasher.update(&random_string);
let hash = format!("{:x}", hasher.finalize());
AddressableObject {
address: hash,
content: random_string,
}
}
fn random_references_and_objects() -> (Arc<Reference>, Arc<HashMap<String, Arc<Reference>>>, Arc<HashMap<String, AddressableObject>>) {
let path_root = String::from("ref/0");
let mut objects = HashMap::new();
let mut refs = HashMap::new();
let mut root_ref = Reference::new(
"username:0".to_string(),
String::from("0"),
path_root.clone(),
);
let mut root_hasher = Blake2b512::new();
for i in 1..=10 {
let mut item_ref = Reference::new(
format!("item:{}", i),
format!("0:{}", i),
format!("/item/{}", i),
);
let mut hasher = Blake2b512::new();
for j in 1..=10 {
let object = random_object();
hasher.update(&object.content);
let leaf_ref = Reference::new(
format!("item:{}:subitem:{}", i, j),
format!("{}", object.address),
format!("/item/{}/subitem/{}", i, j),
).to_arc();
item_ref = item_ref.add_dep(leaf_ref.clone());
objects.insert(object.address.clone(), object);
hasher.update(&leaf_ref.content_address);
refs.insert(leaf_ref.path.clone(), leaf_ref);
}
let hash = format!("{:x}", hasher.finalize());
item_ref.content_address = hash;
root_hasher.update(&item_ref.content_address);
let rc_ref = item_ref.to_arc();
root_ref = root_ref.add_dep(rc_ref.clone());
refs.insert(rc_ref.path.clone(), rc_ref);
}
root_ref.content_address = format!("{:x}", root_hasher.finalize());
let rc_root = root_ref.to_arc();
refs.insert(rc_root.path.clone(), rc_root.clone());
dbg!(&objects);
(rc_root, Arc::new(refs), Arc::new(objects))
}
async fn all_references(root_ref: Arc<Reference>) -> Json<Arc<Reference>> {
Json(root_ref)
}
async fn ref_path(refs: Arc<HashMap<String, Arc<Reference>>>, Path(path): Path<String>) -> Json<Arc<Reference>> {
let path = format!("/item/{}", path);
match refs.get(&path) {
Some(r) => Json(r.clone()),
None => todo!("Return a 404?"),
}
}
async fn object_path(objects: Arc<HashMap<String, AddressableObject>>, Path(path): Path<String>) -> String {
dbg!(&path);
match objects.get(&path) {
Some(o) => o.content.clone(),
None => todo!("Return a 404?"),
}
}
async fn get_client_js() -> impl IntoResponse {
(
[(http::header::CONTENT_TYPE, "application/javascript")],
include_str!("../static/client.js"),
)
}
pub fn endpoints(root_ref: Arc<Reference>, refs: Arc<HashMap<String, Arc<Reference>>>, objects: Arc<HashMap<String, AddressableObject>>) -> Router {
Router::new().nest(
"/api/v1",
Router::new().nest(
"/ref",
Router::new()
.route("/all/username", get({
let state = root_ref.clone();
move || all_references(state)
}))
.route("/item/{*path}", get({
let refs = refs.clone();
move |path| ref_path(refs, path)
}))
).nest(
"/object",
Router::new()
.route("/{addr}", get({
let objects = objects.clone();
move |path| object_path(objects, path)
}))
),
)
.route("/lib/client.js", get(get_client_js))
.route("/ui/", get(|| async { Html(include_str!("../static/index.html")).into_response() }))
}
// TODO(jwall): Javascript test script
pub async fn serve() {
// run our app with hyper, listening globally on port 3000
let (root_ref, refs, objects) = random_references_and_objects();
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
axum::serve(listener, endpoints(root_ref, refs, objects)).await.unwrap();
}