Compare commits

..

3 Commits

6 changed files with 170 additions and 107 deletions

84
Cargo.lock generated
View File

@ -41,8 +41,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core",
"axum-macros",
"axum-core 0.4.5",
"axum-macros 0.4.2",
"base64",
"bytes",
"futures-util",
@ -52,7 +52,7 @@ dependencies = [
"hyper",
"hyper-util",
"itoa",
"matchit",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
@ -72,6 +72,41 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288"
dependencies = [
"axum-core 0.5.2",
"axum-macros 0.5.0",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-core"
version = "0.4.5"
@ -93,6 +128,26 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-core"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-macros"
version = "0.4.2"
@ -104,13 +159,24 @@ dependencies = [
"syn",
]
[[package]]
name = "axum-macros"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "axum-typed-websockets"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df537817c0dc8ddccc7a4d479ea5d23438baf6f7ea97c583ac2256a75ff742fd"
dependencies = [
"axum",
"axum 0.7.9",
"futures-util",
"serde",
"serde_json",
@ -415,6 +481,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "memchr"
version = "2.7.4"
@ -460,7 +532,7 @@ dependencies = [
name = "offline-web-http"
version = "0.1.0"
dependencies = [
"axum",
"axum 0.8.3",
"blake2",
"offline-web-model",
"rand 0.9.0",
@ -479,7 +551,7 @@ dependencies = [
name = "offline-web-ws"
version = "0.1.0"
dependencies = [
"axum",
"axum 0.7.9",
"axum-typed-websockets",
"blake2",
"offline-web-model",

View File

@ -2,22 +2,20 @@
## Synchronization
We assume that our resources map easily to REST concepts.
We support several operations for Bootstrapping
- Resources map to a path.
- Groups of resources are represented in the path structure.
- `HEAD` to get the header to check whether we need to sync.
- `POST` is create
- `PUT` is mutate
- `GET` retrieves the resource
- `DELETE` removes the resources
* Fetch Bootstrap
* Fetch Reference
* Fetch Object
## Resource Query Datamodel
## Resource Datamodel
We assume all resources are content-addressable and form a merkle tree. We
maintain an index of path to content-addressable items.
Resource reference paths are rooted at `/api/v1/ref/<path>`.
Resource reference paths are rooted at a `/ref/` prefix for their path
namespace. They contain a payload with the `objectID`, `content_address` if
there is one, and a list of any dependent resources.
```json
{
@ -39,82 +37,32 @@ Resource reference paths are rooted at `/api/v1/ref/<path>`.
}
```
Resource references contain a payload with the objectID and a list of any
dependent resources. They support the following operations
* `GET`
* `POST`
* `PUT`
* `DELETE`
* `HEAD`
`HEAD` requests return only the objectId in the payload. Primarily used to detect
if a reference needs to be updated on the client or not.
Resource Requests can be expanded to their object reference payloads at any
configurable depth with the query parameters `?depth=<number>` or to the full
tree depth with `?full=true`. This will add recursive dependent sections to the
content key.
```json
{
"objectId": <merkle-hash>,
"path": "/path/name0",
"dependents": [
{
"objectId": <merkle-hash>,
"path": "path/name1",
"content": {
"objectId": <merkle-hash>,
"dependents": [
{
"path": "path/name1",
"content_address": <content-hash>,
"objectId": <merkle-hash>, content: <payload>
},
{
"path": "path/name2",
"content_address": <content-hash>,
"objectId": <merkle-hash>, content: <payload>
}
],
}
}
]
}
```
### Reserved References
* `/api/v1/ref/all/<username>` The root of the resouce tree. List all sub
* `/ref/all/<username>` The root of the resouce tree. List all sub
resources that the user has access too.
* `/api/v1/ref/user/<username>` The user information.
* `/ref/user/<username>` The user information.
### Content-Addressable Query API
The content addressable store is considered immutable. You do not delete from
it. We may garbage collect at some point as a storage optimization.
Content addressable paths are rooted at `/api/v1/object/<content-hash>`.
Content addressable paths are rooted at `/object/<content-hash>`.
Their payloads are whatever the contents serialize to in json and support the following
operations.
* `GET`
* `POST`
Their payloads are whatever the contents serialize to in json
## Syncrhonization
### Bootstrapping
* Load `/api/v1/ref/all/<username>` and then follow the sub resources to
load the entire dataset locally making sure to keep the content-addresses
around for comparison.
* Load `/ref/all/<username>`
* Follow the sub resources recusively to load the entire dataset locally making
sure to keep the content-addresses around for comparison.
# Benchmarking
## Bootstrapping benchmark tests
* Server side loading
* Client side loading
* WebSocket?
* Rest API
* WebSockets

View File

@ -8,7 +8,7 @@ name = "exp1"
path = "src/main.rs"
[dependencies]
axum = { version = "0.7.4", features = ["macros"] }
axum = { version = "0.8.0", features = ["macros"] }
blake2 = "0.10.6"
rand = "0.9.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }

View File

@ -86,9 +86,9 @@ async fn ref_path(refs: Arc<HashMap<String, Arc<Reference>>>, Path(path): Path<S
}
}
async fn object_path(objects: Arc<HashMap<String, AddressableObject>>, Path(path): Path<String>) -> String {
dbg!(&path);
match objects.get(&path) {
async fn object_path(objects: Arc<HashMap<String, AddressableObject>>, Path(addr): Path<String>) -> String {
dbg!(&addr);
match objects.get(&addr) {
Some(o) => o.content.clone(),
None => todo!("Return a 404?"),
}
@ -120,8 +120,8 @@ pub fn endpoints(root_ref: Arc<Reference>, refs: Arc<HashMap<String, Arc<Referen
Router::new()
.route("/{addr}", get({
let objects = objects.clone();
move |path| object_path(objects, path)
}))
move |addr| object_path(objects, addr)
}))
),
)
.route("/lib/client.js", get(get_client_js))
@ -135,5 +135,6 @@ pub async fn serve() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
println!("Server ui starting on http://127.0.0.1:3000/ui/");
axum::serve(listener, endpoints(root_ref, refs, objects)).await.unwrap();
}

View File

@ -133,7 +133,7 @@ async fn handle_socket(
)))
.await
{
eprintln!("Error sending bootstrap reference: {:?}", e);
println!("Error sending bootstrap reference: {:?}", e);
continue;
}
}
@ -143,11 +143,11 @@ async fn handle_socket(
.send(Message::Item(ServerMsg::Reference((**reference).clone())))
.await
{
eprintln!("Error sending reference: {:?}", e);
println!("Error sending reference: {:?}", e);
continue;
}
} else {
eprintln!("Reference not found: {}", path);
println!("Reference not found: {}", path);
}
}
Message::Item(ClientMsg::GetObject(address)) => {
@ -156,21 +156,21 @@ async fn handle_socket(
.send(Message::Item(ServerMsg::Object(object.content.clone())))
.await
{
eprintln!("Error sending object: {:?}", e);
println!("Error sending object: {:?}", e);
continue;
}
} else {
eprintln!("Object not found: {}", address);
println!("Object not found: {}", address);
}
}
Message::Ping(items) => {
eprintln!("unhandled ping msg: {:?}", items);
println!("unhandled ping msg: {:?}", items);
}
Message::Pong(items) => {
eprintln!("unhandled pong msg: {:?}", items);
println!("unhandled pong msg: {:?}", items);
}
Message::Close(_close_frame) => {
eprintln!("closing websocket connection at client request");
println!("closing websocket connection at client request");
break;
}
}
@ -206,11 +206,11 @@ pub fn endpoints(
pub async fn serve() {
// run our app with hyper, listening globally on port 3000
let (root_ref, refs, objects) = random_references_and_objects();
println!("Server starting on http://127.0.0.1:3000");
println!("WebSocket endpoint available at ws://127.0.0.1:3000/api/v1/ws");
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
println!("Server ui starting on http://127.0.0.1:3000/ui/");
println!("WebSocket endpoint available at ws://127.0.0.1:3000/api/v1/ws");
axum::serve(listener, endpoints(root_ref, refs, objects))
.await
.unwrap();

View File

@ -8,12 +8,44 @@ export { bootstrap };
* @property {string} content_address
*/
async function load_bootstrap() {
let response = await fetch("/api/v1/ref/all/username");
if (!response.ok) {
throw new Error("Network response was not ok: " + response.statusText);
}
return await response.json();
/**
* @typedef {Object} ServerMsg
* @property {Reference?} Reference
* @property {string?} Object
*/
/**
* @param {WebSocket} socket
* @returns {Promise<ServerMsg>}
*/
async function load_bootstrap(socket) {
// Wait for the connection to be established
const data = await send_socket_msg(socket,
JSON.stringify("Bootstrap"));
return data;
}
/**
* @param {WebSocket} socket
* @param {string} msg
* @returns {Promise<ServerMsg>}
*/
async function send_socket_msg(socket, msg) {
// Send a request for all references
socket.send(msg);
// Wait for the response
/** @type {Promise<String>} */
const stream = await new Promise((resolve, reject) => {
socket.onmessage = (event) => {
resolve(event.data.text());
};
socket.onerror = (_error) => reject(new Error("WebSocket error occurred"));
});
let data = await stream;
return JSON.parse(data);
}
/**
@ -72,7 +104,7 @@ function storeObject(store, reference, root_path) {
* @returns {Promise<Array<Reference>>} An array of references
*/
function load_reference_paths(refStore, reference) {
return new Promise(async (resolve, reject) => {
return new Promise(async (resolve, _reject) => {
let references = [];
references.push(reference);
if (reference.dependents) {
@ -87,23 +119,24 @@ function load_reference_paths(refStore, reference) {
}
/**
* @param {WebSocket} socket
* @param {IDBDatabase} db
* @param {string} storeName
* @param {Array<Reference>} references
*/
async function load_objects_and_store(db, references, storeName) {
async function load_objects_and_store(socket, db, references, storeName) {
let objects = []
for (var ref of references) {
/** @type {Response} */
if (ref.dependents && ref.dependents.length != 0) {
continue; // not a leaf object
}
let response = await fetch("/api/v1/object/" + ref.content_address);
if (!response.ok) {
throw new Error("Network response was not ok: " + response.statusText);
let data = await send_socket_msg(socket, JSON.stringify({ "GetObject": ref.content_address }));
if (!data.Object) {
throw { error: "Not an object" };
}
const object = await response.text();
objects.push({ id: ref.content_address, content: object });
objects.push({ id: ref.content_address, content: data.Object });
}
const objectTrxAndStore = await getStoreAndTransaction(db, storeName);
for (var obj of objects) {
@ -133,7 +166,16 @@ async function bootstrap() {
const objectStoreName = "objects";
const databaseName = "MerkleStore";
const start = new Date().getTime();
const root = await load_bootstrap();
const socket = new WebSocket(`ws://${window.location.host}/api/v1/ws`);
await new Promise((resolve, reject) => {
socket.onopen = () => resolve();
socket.onerror = (error) => reject(new Error("WebSocket connection failed" + error));
});
const data = await load_bootstrap(socket);
if (!data.Reference) {
throw { error: "Not a Reference" };
}
const db = await openDatabase(databaseName, [refStoreName, objectStoreName]);
const refTrxAndStore = await getStoreAndTransaction(db, refStoreName);
@ -143,12 +185,12 @@ async function bootstrap() {
refTrxAndStore.trx.onerror = (event) => reject(event.target.error);
});
const refs = await load_reference_paths(refTrxAndStore.store, root);
const refs = await load_reference_paths(refTrxAndStore.store, data.Reference);
// Wait for the transaction to complete
await transactionComplete;
await load_objects_and_store(db, refs, objectStoreName);
await load_objects_and_store(socket, db, refs, objectStoreName);
const end = new Date().getTime();
return end - start;