diff --git a/noita-proxy/Cargo.lock b/noita-proxy/Cargo.lock index 343452d8..969d74e8 100644 --- a/noita-proxy/Cargo.lock +++ b/noita-proxy/Cargo.lock @@ -1125,6 +1125,12 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" + [[package]] name = "malloc_buf" version = "0.0.6" @@ -1220,6 +1226,7 @@ version = "0.1.0" dependencies = [ "bitcode", "eframe", + "lz4_flex", "serde", "tangled", "tracing", diff --git a/noita-proxy/Cargo.toml b/noita-proxy/Cargo.toml index 3f2201ae..1e831454 100644 --- a/noita-proxy/Cargo.toml +++ b/noita-proxy/Cargo.toml @@ -16,6 +16,7 @@ tracing = "0.1.40" tangled = { path = "tangled" } serde = { version = "1.0.199", features = ["serde_derive"] } bitcode = "0.6.0" +lz4_flex = { version = "0.11.3", default_features = false, features = ["std"]} [profile.dev] opt-level = 1 diff --git a/noita-proxy/src/lib.rs b/noita-proxy/src/lib.rs index 8c4013e5..f0d2cb7b 100644 --- a/noita-proxy/src/lib.rs +++ b/noita-proxy/src/lib.rs @@ -1,7 +1,7 @@ use std::{ fmt::Display, io::{self, Write}, - net::{SocketAddr, TcpListener}, + net::{SocketAddr, TcpListener, TcpStream}, sync::{atomic::AtomicBool, Arc, Mutex}, thread, time::Duration, @@ -11,7 +11,7 @@ use bitcode::{Decode, Encode}; use eframe::egui::{self, Color32}; use tangled::{Peer, PeerId, Reliability}; use tracing::{error, info, warn}; -use tungstenite::accept; +use tungstenite::{accept, WebSocket}; use crate::messages::NetMsg; @@ -39,6 +39,21 @@ fn ws_encode_mod(peer: PeerId, data: &[u8]) -> tungstenite::Message { tungstenite::Message::Binary(buf) } +struct NetInnerState { + ws: Option>, +} + +impl NetInnerState { + fn try_ws_write(&mut self, data: tungstenite::Message) { + if let Some(ws) = &mut self.ws { + if let Err(err) = ws.write(data) { + error!("Error occured while sending to websocket: {}", err); + self.ws = None; + }; + } + } +} + struct NetManager { peer: tangled::Peer, settings: Mutex, @@ -80,16 +95,15 @@ impl NetManager { .set_nonblocking(true) .expect("can set nonblocking"); - let mut websocket = None; + let mut state = NetInnerState { ws: None }; while self .continue_running .load(std::sync::atomic::Ordering::Relaxed) { self.local_connected - .store(websocket.is_some(), std::sync::atomic::Ordering::Relaxed); - if websocket.is_none() - && self.accept_local.load(std::sync::atomic::Ordering::SeqCst) + .store(state.ws.is_some(), std::sync::atomic::Ordering::Relaxed); + if state.ws.is_none() && self.accept_local.load(std::sync::atomic::Ordering::SeqCst) { thread::sleep(Duration::from_millis(10)); if let Ok((stream, addr)) = local_server.accept() { @@ -99,28 +113,26 @@ impl NetManager { .set_read_timeout(Some(Duration::from_millis(1))) .expect("can set read timeout"); - websocket = accept(stream).ok(); - if websocket.is_some() { + state.ws = accept(stream).ok(); + if state.ws.is_some() { info!("New stream connected"); - if let Some(ws) = &mut websocket { - let settings = self.settings.lock().unwrap(); - ws.write(ws_encode_proxy("seed", settings.seed)).ok(); - ws.write(ws_encode_proxy( - "peer_id", - self.peer.my_id().expect("Has peer id at this point"), - )) - .ok(); - ws.write(ws_encode_proxy("name", "test_name")).ok(); - ws.write(ws_encode_proxy("ready", "")).ok(); - // TODO? - for id in self.peer.iter_peer_ids() { - ws.write(ws_encode_proxy("join", id)).ok(); - } + + let settings = self.settings.lock().unwrap(); + state.try_ws_write(ws_encode_proxy("seed", settings.seed)); + state.try_ws_write(ws_encode_proxy( + "peer_id", + self.peer.my_id().expect("Has peer id at this point"), + )); + state.try_ws_write(ws_encode_proxy("name", "test_name")); + state.try_ws_write(ws_encode_proxy("ready", "")); + // TODO? + for id in self.peer.iter_peer_ids() { + state.try_ws_write(ws_encode_proxy("join", id)); } } } } - if let Some(ws) = &mut websocket { + if let Some(ws) = &mut state.ws { ws.flush().ok(); } for net_event in self.peer.recv() { @@ -137,14 +149,10 @@ impl NetManager { tangled::Reliability::Reliable, ); } - if let Some(ws) = &mut websocket { - ws.write(ws_encode_proxy("join", id)).ok(); - } + state.try_ws_write(ws_encode_proxy("join", id)); } tangled::NetworkEvent::PeerDisconnected(id) => { - if let Some(ws) = &mut websocket { - ws.write(ws_encode_proxy("leave", id)).ok(); - } + state.try_ws_write(ws_encode_proxy("leave", id)); } tangled::NetworkEvent::Message(msg) => { let Ok(net_msg) = bitcode::decode::(&msg.data) else { @@ -158,33 +166,49 @@ impl NetManager { .store(true, std::sync::atomic::Ordering::SeqCst); } NetMsg::ModRaw { data } => { - if let Some(ws) = &mut websocket { - if let Err(err) = ws.write(ws_encode_mod(msg.src, &data)) { - error!( - "Error occured while sending to websocket: {}", - err - ); - websocket = None; - }; + state.try_ws_write(ws_encode_mod(msg.src, &data)); + } + NetMsg::ModCompressed { data } => { + if let Ok(decompressed) = + lz4_flex::decompress_size_prepended(&data) + { + state.try_ws_write(ws_encode_mod(msg.src, &decompressed)); } } } } } } - if let Some(ws) = &mut websocket { + if let Some(ws) = &mut state.ws { let msg = ws.read(); match msg { Ok(msg) => match msg { - tungstenite::Message::Binary(msg) => self - .broadcast(&NetMsg::ModRaw { data: msg }, Reliability::Unreliable), + tungstenite::Message::Binary(msg) => { + if msg.len() > 100 { + let compressed = lz4_flex::compress_prepend_size(&msg); + info!( + "Compressed {} bytes to {} bytes", + msg.len(), + compressed.len() + ); + self.broadcast( + &NetMsg::ModCompressed { data: compressed }, + Reliability::Unreliable, + ); + } else { + self.broadcast( + &NetMsg::ModRaw { data: msg }, + Reliability::Unreliable, + ) + } + } _ => {} }, Err(tungstenite::Error::Io(io_err)) if io_err.kind() == io::ErrorKind::WouldBlock => {} Err(err) => { error!("Error occured while reading from websocket: {}", err); - websocket = None; + state.ws = None; } } } diff --git a/noita-proxy/src/messages.rs b/noita-proxy/src/messages.rs index 15fc706b..cffbd572 100644 --- a/noita-proxy/src/messages.rs +++ b/noita-proxy/src/messages.rs @@ -6,4 +6,5 @@ use crate::GameSettings; pub enum NetMsg { StartGame { settings: GameSettings }, ModRaw { data: Vec }, + ModCompressed { data: Vec }, } diff --git a/todo.txt b/todo.txt new file mode 100644 index 00000000..c24c9d8e --- /dev/null +++ b/todo.txt @@ -0,0 +1,9 @@ + - Синхронизация перков + - Лимит на длину сообщения + - Улучшеная синхронизация инвентаря и текущего предмета + - reliability + + Сжатие пакетов + - Общее хп + - Синхронизация противников + - Перекидывание предметов + - Синхронизация мира \ No newline at end of file