Compare commits

...

3 Commits

6 changed files with 170 additions and 107 deletions

84
Cargo.lock generated
View File

@ -41,8 +41,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"axum-core", "axum-core 0.4.5",
"axum-macros", "axum-macros 0.4.2",
"base64", "base64",
"bytes", "bytes",
"futures-util", "futures-util",
@ -52,7 +52,7 @@ dependencies = [
"hyper", "hyper",
"hyper-util", "hyper-util",
"itoa", "itoa",
"matchit", "matchit 0.7.3",
"memchr", "memchr",
"mime", "mime",
"percent-encoding", "percent-encoding",
@ -72,6 +72,41 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "axum"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288"
dependencies = [
"axum-core 0.5.2",
"axum-macros 0.5.0",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "axum-core" name = "axum-core"
version = "0.4.5" version = "0.4.5"
@ -93,6 +128,26 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "axum-core"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "axum-macros" name = "axum-macros"
version = "0.4.2" version = "0.4.2"
@ -104,13 +159,24 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "axum-macros"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "axum-typed-websockets" name = "axum-typed-websockets"
version = "0.6.0" version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df537817c0dc8ddccc7a4d479ea5d23438baf6f7ea97c583ac2256a75ff742fd" checksum = "df537817c0dc8ddccc7a4d479ea5d23438baf6f7ea97c583ac2256a75ff742fd"
dependencies = [ dependencies = [
"axum", "axum 0.7.9",
"futures-util", "futures-util",
"serde", "serde",
"serde_json", "serde_json",
@ -415,6 +481,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.4" version = "2.7.4"
@ -460,7 +532,7 @@ dependencies = [
name = "offline-web-http" name = "offline-web-http"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"axum", "axum 0.8.3",
"blake2", "blake2",
"offline-web-model", "offline-web-model",
"rand 0.9.0", "rand 0.9.0",
@ -479,7 +551,7 @@ dependencies = [
name = "offline-web-ws" name = "offline-web-ws"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"axum", "axum 0.7.9",
"axum-typed-websockets", "axum-typed-websockets",
"blake2", "blake2",
"offline-web-model", "offline-web-model",

View File

@ -2,22 +2,20 @@
## Synchronization ## Synchronization
We assume that our resources map easily to REST concepts. We support several operations for Bootstrapping
- Resources map to a path. * Fetch Bootstrap
- Groups of resources are represented in the path structure. * Fetch Reference
- `HEAD` to get the header to check whether we need to sync. * Fetch Object
- `POST` is create
- `PUT` is mutate
- `GET` retrieves the resource
- `DELETE` removes the resources
## Resource Query Datamodel ## Resource Datamodel
We assume all resources are content-addressable and form a merkle tree. We We assume all resources are content-addressable and form a merkle tree. We
maintain an index of path to content-addressable items. maintain an index of path to content-addressable items.
Resource reference paths are rooted at `/api/v1/ref/<path>`. Resource reference paths are rooted at a `/ref/` prefix for their path
namespace. They contain a payload with the `objectID`, `content_address` if
there is one, and a list of any dependent resources.
```json ```json
{ {
@ -39,82 +37,32 @@ Resource reference paths are rooted at `/api/v1/ref/<path>`.
} }
``` ```
Resource references contain a payload with the objectID and a list of any
dependent resources. They support the following operations
* `GET`
* `POST`
* `PUT`
* `DELETE`
* `HEAD`
`HEAD` requests return only the objectId in the payload. Primarily used to detect
if a reference needs to be updated on the client or not.
Resource Requests can be expanded to their object reference payloads at any
configurable depth with the query parameters `?depth=<number>` or to the full
tree depth with `?full=true`. This will add recursive dependent sections to the
content key.
```json
{
"objectId": <merkle-hash>,
"path": "/path/name0",
"dependents": [
{
"objectId": <merkle-hash>,
"path": "path/name1",
"content": {
"objectId": <merkle-hash>,
"dependents": [
{
"path": "path/name1",
"content_address": <content-hash>,
"objectId": <merkle-hash>, content: <payload>
},
{
"path": "path/name2",
"content_address": <content-hash>,
"objectId": <merkle-hash>, content: <payload>
}
],
}
}
]
}
```
### Reserved References ### Reserved References
* `/api/v1/ref/all/<username>` The root of the resouce tree. List all sub * `/ref/all/<username>` The root of the resouce tree. List all sub
resources that the user has access too. resources that the user has access too.
* `/api/v1/ref/user/<username>` The user information. * `/ref/user/<username>` The user information.
### Content-Addressable Query API ### Content-Addressable Query API
The content addressable store is considered immutable. You do not delete from The content addressable store is considered immutable. You do not delete from
it. We may garbage collect at some point as a storage optimization. it. We may garbage collect at some point as a storage optimization.
Content addressable paths are rooted at `/api/v1/object/<content-hash>`. Content addressable paths are rooted at `/object/<content-hash>`.
Their payloads are whatever the contents serialize to in json and support the following Their payloads are whatever the contents serialize to in json
operations.
* `GET`
* `POST`
## Syncrhonization ## Syncrhonization
### Bootstrapping ### Bootstrapping
* Load `/api/v1/ref/all/<username>` and then follow the sub resources to * Load `/ref/all/<username>`
load the entire dataset locally making sure to keep the content-addresses * Follow the sub resources recusively to load the entire dataset locally making
around for comparison. sure to keep the content-addresses around for comparison.
# Benchmarking # Benchmarking
## Bootstrapping benchmark tests ## Bootstrapping benchmark tests
* Server side loading * Rest API
* Client side loading * WebSockets
* WebSocket?

View File

@ -8,7 +8,7 @@ name = "exp1"
path = "src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
axum = { version = "0.7.4", features = ["macros"] } axum = { version = "0.8.0", features = ["macros"] }
blake2 = "0.10.6" blake2 = "0.10.6"
rand = "0.9.0" rand = "0.9.0"
serde = { version = "1.0.219", features = ["derive", "rc"] } serde = { version = "1.0.219", features = ["derive", "rc"] }

View File

@ -86,9 +86,9 @@ async fn ref_path(refs: Arc<HashMap<String, Arc<Reference>>>, Path(path): Path<S
} }
} }
async fn object_path(objects: Arc<HashMap<String, AddressableObject>>, Path(path): Path<String>) -> String { async fn object_path(objects: Arc<HashMap<String, AddressableObject>>, Path(addr): Path<String>) -> String {
dbg!(&path); dbg!(&addr);
match objects.get(&path) { match objects.get(&addr) {
Some(o) => o.content.clone(), Some(o) => o.content.clone(),
None => todo!("Return a 404?"), None => todo!("Return a 404?"),
} }
@ -120,7 +120,7 @@ pub fn endpoints(root_ref: Arc<Reference>, refs: Arc<HashMap<String, Arc<Referen
Router::new() Router::new()
.route("/{addr}", get({ .route("/{addr}", get({
let objects = objects.clone(); let objects = objects.clone();
move |path| object_path(objects, path) move |addr| object_path(objects, addr)
})) }))
), ),
) )
@ -135,5 +135,6 @@ pub async fn serve() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await .await
.unwrap(); .unwrap();
println!("Server ui starting on http://127.0.0.1:3000/ui/");
axum::serve(listener, endpoints(root_ref, refs, objects)).await.unwrap(); axum::serve(listener, endpoints(root_ref, refs, objects)).await.unwrap();
} }

View File

@ -133,7 +133,7 @@ async fn handle_socket(
))) )))
.await .await
{ {
eprintln!("Error sending bootstrap reference: {:?}", e); println!("Error sending bootstrap reference: {:?}", e);
continue; continue;
} }
} }
@ -143,11 +143,11 @@ async fn handle_socket(
.send(Message::Item(ServerMsg::Reference((**reference).clone()))) .send(Message::Item(ServerMsg::Reference((**reference).clone())))
.await .await
{ {
eprintln!("Error sending reference: {:?}", e); println!("Error sending reference: {:?}", e);
continue; continue;
} }
} else { } else {
eprintln!("Reference not found: {}", path); println!("Reference not found: {}", path);
} }
} }
Message::Item(ClientMsg::GetObject(address)) => { Message::Item(ClientMsg::GetObject(address)) => {
@ -156,21 +156,21 @@ async fn handle_socket(
.send(Message::Item(ServerMsg::Object(object.content.clone()))) .send(Message::Item(ServerMsg::Object(object.content.clone())))
.await .await
{ {
eprintln!("Error sending object: {:?}", e); println!("Error sending object: {:?}", e);
continue; continue;
} }
} else { } else {
eprintln!("Object not found: {}", address); println!("Object not found: {}", address);
} }
} }
Message::Ping(items) => { Message::Ping(items) => {
eprintln!("unhandled ping msg: {:?}", items); println!("unhandled ping msg: {:?}", items);
} }
Message::Pong(items) => { Message::Pong(items) => {
eprintln!("unhandled pong msg: {:?}", items); println!("unhandled pong msg: {:?}", items);
} }
Message::Close(_close_frame) => { Message::Close(_close_frame) => {
eprintln!("closing websocket connection at client request"); println!("closing websocket connection at client request");
break; break;
} }
} }
@ -206,11 +206,11 @@ pub fn endpoints(
pub async fn serve() { pub async fn serve() {
// run our app with hyper, listening globally on port 3000 // run our app with hyper, listening globally on port 3000
let (root_ref, refs, objects) = random_references_and_objects(); let (root_ref, refs, objects) = random_references_and_objects();
println!("Server starting on http://127.0.0.1:3000");
println!("WebSocket endpoint available at ws://127.0.0.1:3000/api/v1/ws");
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await .await
.unwrap(); .unwrap();
println!("Server ui starting on http://127.0.0.1:3000/ui/");
println!("WebSocket endpoint available at ws://127.0.0.1:3000/api/v1/ws");
axum::serve(listener, endpoints(root_ref, refs, objects)) axum::serve(listener, endpoints(root_ref, refs, objects))
.await .await
.unwrap(); .unwrap();

View File

@ -8,12 +8,44 @@ export { bootstrap };
* @property {string} content_address * @property {string} content_address
*/ */
async function load_bootstrap() { /**
let response = await fetch("/api/v1/ref/all/username"); * @typedef {Object} ServerMsg
if (!response.ok) { * @property {Reference?} Reference
throw new Error("Network response was not ok: " + response.statusText); * @property {string?} Object
} */
return await response.json();
/**
* @param {WebSocket} socket
* @returns {Promise<ServerMsg>}
*/
async function load_bootstrap(socket) {
// Wait for the connection to be established
const data = await send_socket_msg(socket,
JSON.stringify("Bootstrap"));
return data;
}
/**
* @param {WebSocket} socket
* @param {string} msg
* @returns {Promise<ServerMsg>}
*/
async function send_socket_msg(socket, msg) {
// Send a request for all references
socket.send(msg);
// Wait for the response
/** @type {Promise<String>} */
const stream = await new Promise((resolve, reject) => {
socket.onmessage = (event) => {
resolve(event.data.text());
};
socket.onerror = (_error) => reject(new Error("WebSocket error occurred"));
});
let data = await stream;
return JSON.parse(data);
} }
/** /**
@ -72,7 +104,7 @@ function storeObject(store, reference, root_path) {
* @returns {Promise<Array<Reference>>} An array of references * @returns {Promise<Array<Reference>>} An array of references
*/ */
function load_reference_paths(refStore, reference) { function load_reference_paths(refStore, reference) {
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, _reject) => {
let references = []; let references = [];
references.push(reference); references.push(reference);
if (reference.dependents) { if (reference.dependents) {
@ -87,23 +119,24 @@ function load_reference_paths(refStore, reference) {
} }
/** /**
* @param {WebSocket} socket
* @param {IDBDatabase} db * @param {IDBDatabase} db
* @param {string} storeName * @param {string} storeName
* @param {Array<Reference>} references * @param {Array<Reference>} references
*/ */
async function load_objects_and_store(db, references, storeName) { async function load_objects_and_store(socket, db, references, storeName) {
let objects = [] let objects = []
for (var ref of references) { for (var ref of references) {
/** @type {Response} */ /** @type {Response} */
if (ref.dependents && ref.dependents.length != 0) { if (ref.dependents && ref.dependents.length != 0) {
continue; // not a leaf object continue; // not a leaf object
} }
let response = await fetch("/api/v1/object/" + ref.content_address);
if (!response.ok) { let data = await send_socket_msg(socket, JSON.stringify({ "GetObject": ref.content_address }));
throw new Error("Network response was not ok: " + response.statusText); if (!data.Object) {
throw { error: "Not an object" };
} }
const object = await response.text(); objects.push({ id: ref.content_address, content: data.Object });
objects.push({ id: ref.content_address, content: object });
} }
const objectTrxAndStore = await getStoreAndTransaction(db, storeName); const objectTrxAndStore = await getStoreAndTransaction(db, storeName);
for (var obj of objects) { for (var obj of objects) {
@ -133,7 +166,16 @@ async function bootstrap() {
const objectStoreName = "objects"; const objectStoreName = "objects";
const databaseName = "MerkleStore"; const databaseName = "MerkleStore";
const start = new Date().getTime(); const start = new Date().getTime();
const root = await load_bootstrap(); const socket = new WebSocket(`ws://${window.location.host}/api/v1/ws`);
await new Promise((resolve, reject) => {
socket.onopen = () => resolve();
socket.onerror = (error) => reject(new Error("WebSocket connection failed" + error));
});
const data = await load_bootstrap(socket);
if (!data.Reference) {
throw { error: "Not a Reference" };
}
const db = await openDatabase(databaseName, [refStoreName, objectStoreName]); const db = await openDatabase(databaseName, [refStoreName, objectStoreName]);
const refTrxAndStore = await getStoreAndTransaction(db, refStoreName); const refTrxAndStore = await getStoreAndTransaction(db, refStoreName);
@ -143,12 +185,12 @@ async function bootstrap() {
refTrxAndStore.trx.onerror = (event) => reject(event.target.error); refTrxAndStore.trx.onerror = (event) => reject(event.target.error);
}); });
const refs = await load_reference_paths(refTrxAndStore.store, root); const refs = await load_reference_paths(refTrxAndStore.store, data.Reference);
// Wait for the transaction to complete // Wait for the transaction to complete
await transactionComplete; await transactionComplete;
await load_objects_and_store(db, refs, objectStoreName); await load_objects_and_store(socket, db, refs, objectStoreName);
const end = new Date().getTime(); const end = new Date().getTime();
return end - start; return end - start;