From a216275eb5f5e394b7e29f94fe645d0c84a8cf7a Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Sat, 12 Apr 2025 17:09:28 -0400 Subject: [PATCH] wip: the beginnings of a websocket protocol --- Cargo.lock | 123 +++++++++++++++++++----- exp1/Cargo.toml | 2 +- exp2/Cargo.toml | 4 +- exp2/src/serve.rs | 179 +++++++++++++++++++++++++---------- offline-web-model/src/lib.rs | 2 +- 5 files changed, 231 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4053663..95317e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -25,15 +36,15 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "axum" -version = "0.8.3" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ + "async-trait", "axum-core", "axum-macros", "base64", "bytes", - "form_urlencoded", "futures-util", "http", "http-body", @@ -63,12 +74,13 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.5.2" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" dependencies = [ + "async-trait", "bytes", - "futures-core", + "futures-util", "http", "http-body", "http-body-util", @@ -83,15 +95,27 @@ dependencies = [ [[package]] name = "axum-macros" -version = "0.5.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" dependencies = [ "proc-macro2", "quote", "syn", ] +[[package]] +name = "axum-typed-websockets" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df537817c0dc8ddccc7a4d479ea5d23438baf6f7ea97c583ac2256a75ff742fd" +dependencies = [ + "axum", + "futures-util", + "serde", + "serde_json", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -137,6 +161,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -251,6 +281,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.3.2" @@ -370,9 +411,9 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "matchit" -version = "0.8.4" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "memchr" @@ -422,7 +463,7 @@ dependencies = [ "axum", "blake2", "offline-web-model", - "rand", + "rand 0.9.0", "serde", "tokio", ] @@ -439,9 +480,10 @@ name = "offline-web-ws" version = "0.1.0" dependencies = [ "axum", + "axum-typed-websockets", "blake2", "offline-web-model", - "rand", + "rand 0.9.0", "serde", "tokio", ] @@ -503,17 +545,38 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.3", "zerocopy", ] +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + [[package]] name = "rand_chacha" version = "0.9.0" @@ -521,7 +584,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.15", ] [[package]] @@ -530,7 +602,7 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" dependencies = [ - "getrandom", + "getrandom 0.3.2", ] [[package]] @@ -666,18 +738,18 @@ checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" [[package]] name = "thiserror" -version = "2.0.12" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.12" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", @@ -713,9 +785,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.26.2" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" dependencies = [ "futures-util", "log", @@ -773,16 +845,17 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.26.2" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" dependencies = [ + "byteorder", "bytes", "data-encoding", "http", "httparse", "log", - "rand", + "rand 0.8.5", "sha1", "thiserror", "utf-8", diff --git a/exp1/Cargo.toml b/exp1/Cargo.toml index 9429eac..284f196 100644 --- a/exp1/Cargo.toml +++ b/exp1/Cargo.toml @@ -8,7 +8,7 @@ name = "exp1" path = "src/main.rs" [dependencies] -axum = { version = "0.8.3", features = ["macros"] } +axum = { version = "0.7.4", features = ["macros"] } blake2 = "0.10.6" rand = "0.9.0" serde = { version = "1.0.219", features = ["derive", "rc"] } diff --git a/exp2/Cargo.toml b/exp2/Cargo.toml index 4ee43dc..db0dc17 100644 --- a/exp2/Cargo.toml +++ b/exp2/Cargo.toml @@ -8,9 +8,11 @@ name = "exp2" path = "src/main.rs" [dependencies] -axum = { version = "0.8.3", features = ["macros", "ws"] } +# NOTE(zaphar): we depend on this version of axum for axum-typed-websockets to work +axum = { version = "0.7.4", features = ["macros", "ws"] } blake2 = "0.10.6" rand = "0.9.0" serde = { version = "1.0.219", features = ["derive", "rc"] } tokio = "1.44.1" offline-web-model = { path = "../offline-web-model" } +axum-typed-websockets = "0.6.0" diff --git a/exp2/src/serve.rs b/exp2/src/serve.rs index 4380f59..49aae84 100644 --- a/exp2/src/serve.rs +++ b/exp2/src/serve.rs @@ -1,8 +1,15 @@ use std::{collections::HashMap, sync::Arc}; -use axum::{extract::Path, http, response::{Html, IntoResponse}, routing::get, Json, Router}; +use axum::{ + http, + response::{Html, IntoResponse}, + routing::get, + Router, +}; +use axum_typed_websockets::{Message, WebSocket, WebSocketUpgrade}; use blake2::{Blake2b512, Digest}; use rand::Rng; +use serde::{Deserialize, Serialize}; use offline_web_model::Reference; @@ -29,7 +36,11 @@ fn random_object() -> AddressableObject { } } -fn random_references_and_objects() -> (Arc, Arc>>, Arc>) { +fn random_references_and_objects() -> ( + Arc, + Arc>>, + Arc>, +) { let path_root = String::from("ref/0"); let mut objects = HashMap::new(); let mut refs = HashMap::new(); @@ -53,7 +64,8 @@ fn random_references_and_objects() -> (Arc, Arc (Arc, Arc) -> Json> { - Json(root_ref) -} - -async fn ref_path(refs: Arc>>, Path(path): Path) -> Json> { - let path = format!("/item/{}", path); - match refs.get(&path) { - Some(r) => Json(r.clone()), - None => todo!("Return a 404?"), - } -} - -async fn object_path(objects: Arc>, Path(path): Path) -> String { - dbg!(&path); - match objects.get(&path) { - Some(o) => o.content.clone(), - None => todo!("Return a 404?"), - } -} - async fn get_client_js() -> impl IntoResponse { ( [(http::header::CONTENT_TYPE, "application/javascript")], @@ -101,40 +92,126 @@ async fn get_client_js() -> impl IntoResponse { ) } -pub fn endpoints(root_ref: Arc, refs: Arc>>, objects: Arc>) -> Router { - // TODO(zaphar): use websockets instead - Router::new().nest( - "/api/v1", - Router::new().nest( - "/ref", - Router::new() - .route("/all/username", get({ - let state = root_ref.clone(); - move || all_references(state) - })) - .route("/item/{*path}", get({ - let refs = refs.clone(); - move |path| ref_path(refs, path) - })) - ).nest( - "/object", - Router::new() - .route("/{addr}", get({ - let objects = objects.clone(); - move |path| object_path(objects, path) - })) - ), - ) - .route("/lib/client.js", get(get_client_js)) - .route("/ui/", get(|| async { Html(include_str!("../static/index.html")).into_response() })) +#[derive(Debug, Serialize, Deserialize)] +enum ServerMsg { + Reference(Reference), + Object(String), +} + +#[derive(Debug, Serialize, Deserialize)] +enum ClientMsg { + Bootstrap, + GetReference(String), + GetObject(String), +} + +async fn handle_websocket( + ws: WebSocketUpgrade, + root_ref: Arc, + refs: Arc>>, + objects: Arc>, +) -> impl IntoResponse { + ws.on_upgrade(|socket| handle_socket(socket, root_ref, refs, objects)) +} + +async fn handle_socket( + mut socket: WebSocket, + root_ref: Arc, + refs: Arc>>, + objects: Arc>, +) { + // Send initial data to the client + + // Handle incoming messages + // TODO(zaphar): This should actually be smarter in the case of None or Err. + while let Some(Ok(_msg)) = socket.recv().await { + match _msg { + Message::Item(ClientMsg::Bootstrap) => { + if let Err(e) = socket + .send(Message::Item(ServerMsg::Reference( + root_ref.as_ref().clone(), + ))) + .await + { + eprintln!("Error sending bootstrap reference: {:?}", e); + continue; + } + } + Message::Item(ClientMsg::GetReference(path)) => { + if let Some(reference) = refs.get(&path) { + if let Err(e) = socket + .send(Message::Item(ServerMsg::Reference((**reference).clone()))) + .await + { + eprintln!("Error sending reference: {:?}", e); + continue; + } + } else { + eprintln!("Reference not found: {}", path); + } + } + Message::Item(ClientMsg::GetObject(address)) => { + if let Some(object) = objects.get(&address) { + if let Err(e) = socket + .send(Message::Item(ServerMsg::Object(object.content.clone()))) + .await + { + eprintln!("Error sending object: {:?}", e); + continue; + } + } else { + eprintln!("Object not found: {}", address); + } + } + Message::Ping(items) => { + eprintln!("unhandled ping msg: {:?}", items); + } + Message::Pong(items) => { + eprintln!("unhandled pong msg: {:?}", items); + } + Message::Close(_close_frame) => { + eprintln!("closing websocket connection at client request"); + break; + } + } + } + + println!("WebSocket connection closed"); +} + +pub fn endpoints( + root_ref: Arc, + refs: Arc>>, + objects: Arc>, +) -> Router { + // WebSocket route now implemented + Router::new() + .nest( + "/api/v1", + Router::new().route( + "/ws", + get(|ws: WebSocketUpgrade| async move { + handle_websocket(ws, root_ref.clone(), refs.clone(), objects.clone()).await + }), + ), + ) + .route("/lib/client.js", get(get_client_js)) + .route( + "/ui/", + get(|| async { Html(include_str!("../static/index.html")).into_response() }), + ) } // TODO(jwall): Javascript test script pub async fn serve() { // run our app with hyper, listening globally on port 3000 let (root_ref, refs, objects) = random_references_and_objects(); + println!("Server starting on http://127.0.0.1:3000"); + println!("WebSocket endpoint available at ws://127.0.0.1:3000/api/v1/ws"); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await .unwrap(); - axum::serve(listener, endpoints(root_ref, refs, objects)).await.unwrap(); + axum::serve(listener, endpoints(root_ref, refs, objects)) + .await + .unwrap(); } diff --git a/offline-web-model/src/lib.rs b/offline-web-model/src/lib.rs index ab22961..d224e86 100644 --- a/offline-web-model/src/lib.rs +++ b/offline-web-model/src/lib.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use serde::{Serialize, Deserialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Reference { pub object_id: String, pub content_address: String,