wip: the beginnings of a websocket protocol

This commit is contained in:
Jeremy Wall 2025-04-12 17:09:28 -04:00
parent 350479bb18
commit a216275eb5
5 changed files with 231 additions and 79 deletions

123
Cargo.lock generated
View File

@ -17,6 +17,17 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "async-trait"
version = "0.1.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.4.0"
@ -25,15 +36,15 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "axum"
version = "0.8.3"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core",
"axum-macros",
"base64",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
@ -63,12 +74,13 @@ dependencies = [
[[package]]
name = "axum-core"
version = "0.5.2"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-body-util",
@ -83,15 +95,27 @@ dependencies = [
[[package]]
name = "axum-macros"
version = "0.5.0"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "axum-typed-websockets"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df537817c0dc8ddccc7a4d479ea5d23438baf6f7ea97c583ac2256a75ff742fd"
dependencies = [
"axum",
"futures-util",
"serde",
"serde_json",
]
[[package]]
name = "backtrace"
version = "0.3.74"
@ -137,6 +161,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.10.1"
@ -251,6 +281,17 @@ dependencies = [
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.3.2"
@ -370,9 +411,9 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "matchit"
version = "0.8.4"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "memchr"
@ -422,7 +463,7 @@ dependencies = [
"axum",
"blake2",
"offline-web-model",
"rand",
"rand 0.9.0",
"serde",
"tokio",
]
@ -439,9 +480,10 @@ name = "offline-web-ws"
version = "0.1.0"
dependencies = [
"axum",
"axum-typed-websockets",
"blake2",
"offline-web-model",
"rand",
"rand 0.9.0",
"serde",
"tokio",
]
@ -503,17 +545,38 @@ version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha",
"rand_core",
"rand_chacha 0.9.0",
"rand_core 0.9.3",
"zerocopy",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.4",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
@ -521,7 +584,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.9.3",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.15",
]
[[package]]
@ -530,7 +602,7 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom",
"getrandom 0.3.2",
]
[[package]]
@ -666,18 +738,18 @@ checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
[[package]]
name = "thiserror"
version = "2.0.12"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "2.0.12"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
@ -713,9 +785,9 @@ dependencies = [
[[package]]
name = "tokio-tungstenite"
version = "0.26.2"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084"
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
dependencies = [
"futures-util",
"log",
@ -773,16 +845,17 @@ dependencies = [
[[package]]
name = "tungstenite"
version = "0.26.2"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13"
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand",
"rand 0.8.5",
"sha1",
"thiserror",
"utf-8",

View File

@ -8,7 +8,7 @@ name = "exp1"
path = "src/main.rs"
[dependencies]
axum = { version = "0.8.3", features = ["macros"] }
axum = { version = "0.7.4", features = ["macros"] }
blake2 = "0.10.6"
rand = "0.9.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }

View File

@ -8,9 +8,11 @@ name = "exp2"
path = "src/main.rs"
[dependencies]
axum = { version = "0.8.3", features = ["macros", "ws"] }
# NOTE(zaphar): we depend on this version of axum for axum-typed-websockets to work
axum = { version = "0.7.4", features = ["macros", "ws"] }
blake2 = "0.10.6"
rand = "0.9.0"
serde = { version = "1.0.219", features = ["derive", "rc"] }
tokio = "1.44.1"
offline-web-model = { path = "../offline-web-model" }
axum-typed-websockets = "0.6.0"

View File

@ -1,8 +1,15 @@
use std::{collections::HashMap, sync::Arc};
use axum::{extract::Path, http, response::{Html, IntoResponse}, routing::get, Json, Router};
use axum::{
http,
response::{Html, IntoResponse},
routing::get,
Router,
};
use axum_typed_websockets::{Message, WebSocket, WebSocketUpgrade};
use blake2::{Blake2b512, Digest};
use rand::Rng;
use serde::{Deserialize, Serialize};
use offline_web_model::Reference;
@ -29,7 +36,11 @@ fn random_object() -> AddressableObject {
}
}
fn random_references_and_objects() -> (Arc<Reference>, Arc<HashMap<String, Arc<Reference>>>, Arc<HashMap<String, AddressableObject>>) {
fn random_references_and_objects() -> (
Arc<Reference>,
Arc<HashMap<String, Arc<Reference>>>,
Arc<HashMap<String, AddressableObject>>,
) {
let path_root = String::from("ref/0");
let mut objects = HashMap::new();
let mut refs = HashMap::new();
@ -53,7 +64,8 @@ fn random_references_and_objects() -> (Arc<Reference>, Arc<HashMap<String, Arc<R
format!("item:{}:subitem:{}", i, j),
format!("{}", object.address),
format!("/item/{}/subitem/{}", i, j),
).to_arc();
)
.to_arc();
item_ref = item_ref.add_dep(leaf_ref.clone());
objects.insert(object.address.clone(), object);
hasher.update(&leaf_ref.content_address);
@ -73,27 +85,6 @@ fn random_references_and_objects() -> (Arc<Reference>, Arc<HashMap<String, Arc<R
(rc_root, Arc::new(refs), Arc::new(objects))
}
// TODO(jeremy): Allow this to autoexpand the content_addresses?
async fn all_references(root_ref: Arc<Reference>) -> Json<Arc<Reference>> {
Json(root_ref)
}
async fn ref_path(refs: Arc<HashMap<String, Arc<Reference>>>, Path(path): Path<String>) -> Json<Arc<Reference>> {
let path = format!("/item/{}", path);
match refs.get(&path) {
Some(r) => Json(r.clone()),
None => todo!("Return a 404?"),
}
}
async fn object_path(objects: Arc<HashMap<String, AddressableObject>>, Path(path): Path<String>) -> String {
dbg!(&path);
match objects.get(&path) {
Some(o) => o.content.clone(),
None => todo!("Return a 404?"),
}
}
async fn get_client_js() -> impl IntoResponse {
(
[(http::header::CONTENT_TYPE, "application/javascript")],
@ -101,40 +92,126 @@ async fn get_client_js() -> impl IntoResponse {
)
}
pub fn endpoints(root_ref: Arc<Reference>, refs: Arc<HashMap<String, Arc<Reference>>>, objects: Arc<HashMap<String, AddressableObject>>) -> Router {
// TODO(zaphar): use websockets instead
Router::new().nest(
#[derive(Debug, Serialize, Deserialize)]
enum ServerMsg {
Reference(Reference),
Object(String),
}
#[derive(Debug, Serialize, Deserialize)]
enum ClientMsg {
Bootstrap,
GetReference(String),
GetObject(String),
}
async fn handle_websocket(
ws: WebSocketUpgrade<ServerMsg, ClientMsg>,
root_ref: Arc<Reference>,
refs: Arc<HashMap<String, Arc<Reference>>>,
objects: Arc<HashMap<String, AddressableObject>>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_socket(socket, root_ref, refs, objects))
}
async fn handle_socket(
mut socket: WebSocket<ServerMsg, ClientMsg>,
root_ref: Arc<Reference>,
refs: Arc<HashMap<String, Arc<Reference>>>,
objects: Arc<HashMap<String, AddressableObject>>,
) {
// Send initial data to the client
// Handle incoming messages
// TODO(zaphar): This should actually be smarter in the case of None or Err.
while let Some(Ok(_msg)) = socket.recv().await {
match _msg {
Message::Item(ClientMsg::Bootstrap) => {
if let Err(e) = socket
.send(Message::Item(ServerMsg::Reference(
root_ref.as_ref().clone(),
)))
.await
{
eprintln!("Error sending bootstrap reference: {:?}", e);
continue;
}
}
Message::Item(ClientMsg::GetReference(path)) => {
if let Some(reference) = refs.get(&path) {
if let Err(e) = socket
.send(Message::Item(ServerMsg::Reference((**reference).clone())))
.await
{
eprintln!("Error sending reference: {:?}", e);
continue;
}
} else {
eprintln!("Reference not found: {}", path);
}
}
Message::Item(ClientMsg::GetObject(address)) => {
if let Some(object) = objects.get(&address) {
if let Err(e) = socket
.send(Message::Item(ServerMsg::Object(object.content.clone())))
.await
{
eprintln!("Error sending object: {:?}", e);
continue;
}
} else {
eprintln!("Object not found: {}", address);
}
}
Message::Ping(items) => {
eprintln!("unhandled ping msg: {:?}", items);
}
Message::Pong(items) => {
eprintln!("unhandled pong msg: {:?}", items);
}
Message::Close(_close_frame) => {
eprintln!("closing websocket connection at client request");
break;
}
}
}
println!("WebSocket connection closed");
}
pub fn endpoints(
root_ref: Arc<Reference>,
refs: Arc<HashMap<String, Arc<Reference>>>,
objects: Arc<HashMap<String, AddressableObject>>,
) -> Router {
// WebSocket route now implemented
Router::new()
.nest(
"/api/v1",
Router::new().nest(
"/ref",
Router::new()
.route("/all/username", get({
let state = root_ref.clone();
move || all_references(state)
}))
.route("/item/{*path}", get({
let refs = refs.clone();
move |path| ref_path(refs, path)
}))
).nest(
"/object",
Router::new()
.route("/{addr}", get({
let objects = objects.clone();
move |path| object_path(objects, path)
}))
Router::new().route(
"/ws",
get(|ws: WebSocketUpgrade<ServerMsg, ClientMsg>| async move {
handle_websocket(ws, root_ref.clone(), refs.clone(), objects.clone()).await
}),
),
)
.route("/lib/client.js", get(get_client_js))
.route("/ui/", get(|| async { Html(include_str!("../static/index.html")).into_response() }))
.route(
"/ui/",
get(|| async { Html(include_str!("../static/index.html")).into_response() }),
)
}
// TODO(jwall): Javascript test script
pub async fn serve() {
// run our app with hyper, listening globally on port 3000
let (root_ref, refs, objects) = random_references_and_objects();
println!("Server starting on http://127.0.0.1:3000");
println!("WebSocket endpoint available at ws://127.0.0.1:3000/api/v1/ws");
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
axum::serve(listener, endpoints(root_ref, refs, objects)).await.unwrap();
axum::serve(listener, endpoints(root_ref, refs, objects))
.await
.unwrap();
}

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Reference {
pub object_id: String,
pub content_address: String,