diff --git a/noita-proxy/Cargo.lock b/noita-proxy/Cargo.lock index e049adda..38216dd7 100644 --- a/noita-proxy/Cargo.lock +++ b/noita-proxy/Cargo.lock @@ -1808,6 +1808,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.7.2" @@ -2399,8 +2408,17 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.6", + "regex-syntax 0.8.3", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2411,9 +2429,15 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.3", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.3" @@ -2689,7 +2713,9 @@ dependencies = [ "crossbeam", "dashmap", "serde", + "test-log", "tracing", + "tracing-subscriber", ] [[package]] @@ -2713,6 +2739,27 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "test-log" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" +dependencies = [ + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "thiserror" version = "1.0.59" @@ -2860,10 +2907,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/noita-proxy/Cargo.toml b/noita-proxy/Cargo.toml index a675887f..04d66854 100644 --- a/noita-proxy/Cargo.toml +++ b/noita-proxy/Cargo.toml @@ -1,3 +1,6 @@ +[workspace] +members = ["tangled"] + [package] name = "noita-proxy" version = "0.1.0" diff --git a/noita-proxy/tangled b/noita-proxy/tangled deleted file mode 160000 index 1aada41e..00000000 --- a/noita-proxy/tangled +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 1aada41e01c7326c93a719d2a75daabd38aeb93c diff --git a/noita-proxy/tangled/.gitignore b/noita-proxy/tangled/.gitignore new file mode 100644 index 00000000..ea8c4bf7 --- /dev/null +++ b/noita-proxy/tangled/.gitignore @@ -0,0 +1 @@ +/target diff --git a/noita-proxy/tangled/Cargo.toml b/noita-proxy/tangled/Cargo.toml new file mode 100644 index 00000000..9983abb0 --- /dev/null +++ b/noita-proxy/tangled/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "tangled" +version = "0.2.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/IntQuant/tangled" +categories = ["network-programming", ] +description = "Work-in-progress UDP networking crate." + + +[[example]] +name = "chat" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +crossbeam = "0.8.2" +tracing = "0.1.36" +dashmap = "5.3.4" +serde = {features = ["derive"], version = "1.0.142"} +bincode = "1.3.3" + +[dev-dependencies] +test-log = { version = "0.2.11", default-features = false, features = ["trace"]} +tracing-subscriber = {version = "0.3", features = ["env-filter", "fmt"]} diff --git a/noita-proxy/tangled/LICENSE-APACHE b/noita-proxy/tangled/LICENSE-APACHE new file mode 100644 index 00000000..2bb9ad24 --- /dev/null +++ b/noita-proxy/tangled/LICENSE-APACHE @@ -0,0 +1,176 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS \ No newline at end of file diff --git a/noita-proxy/tangled/LICENSE-MIT b/noita-proxy/tangled/LICENSE-MIT new file mode 100644 index 00000000..68e440c6 --- /dev/null +++ b/noita-proxy/tangled/LICENSE-MIT @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 "IntQuant" + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/noita-proxy/tangled/README.md b/noita-proxy/tangled/README.md new file mode 100644 index 00000000..ab6e688c --- /dev/null +++ b/noita-proxy/tangled/README.md @@ -0,0 +1 @@ +Work-in-progress UDP networking crate. \ No newline at end of file diff --git a/noita-proxy/tangled/examples/chat.rs b/noita-proxy/tangled/examples/chat.rs new file mode 100644 index 00000000..abaf5f4a --- /dev/null +++ b/noita-proxy/tangled/examples/chat.rs @@ -0,0 +1,76 @@ +use std::{ + env::args, io::stdin, thread::{sleep, spawn}, time::Duration +}; + +use crossbeam::channel::bounded; +use tangled::{Peer, Reliability}; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; + +fn main() { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::DEBUG) + .finish(); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + let mut args = args().skip(1); + let peer = match args.next().as_ref().map(|s| s.as_str()) { + Some("host") => { + let bind_addr = match args.next().map_or(None, |arg| arg.parse().ok()) { + Some(addr) => addr, + None => { + println!("Expected an address:port to host on as a second argument"); + return; + } + }; + Peer::host(bind_addr, None) + } + Some("connect") => { + let connect_addr = match args.next().map_or(None, |arg| arg.parse().ok()) { + Some(addr) => addr, + None => { + println!("Expected an address:port to connect to as a second argument"); + return; + } + }; + Peer::connect(connect_addr, None) + } + Some(_) | None => { + println!("First argument should be one of 'host', 'connect'"); + return; + } + } + .unwrap(); + let (s, r) = bounded(1); + spawn(move || { + for msg in stdin().lines() { + s.send(msg.unwrap()).unwrap(); + } + }); + loop { + for msg in peer.recv() { + match msg { + tangled::NetworkEvent::PeerConnected(id) => println!("Peer connected: {}", id), + tangled::NetworkEvent::PeerDisconnected(id) => { + println!("Peer disconnected: {}", id) + } + tangled::NetworkEvent::Message(msg) => { + println!("{}", String::from_utf8_lossy(&msg.data)) + } + } + } + for msg in r.try_iter() { + println!("State: {:?}", peer.state()); + let data = msg.as_bytes(); + for destination in peer.iter_peer_ids() { + if destination == peer.my_id().unwrap() { + continue; + } + println!("Sent to {}", destination); + peer.send(destination, data.to_vec(), Reliability::Reliable) + .unwrap(); + } + } + sleep(Duration::from_millis(10)); + } +} diff --git a/noita-proxy/tangled/src/error.rs b/noita-proxy/tangled/src/error.rs new file mode 100644 index 00000000..d3d52e0f --- /dev/null +++ b/noita-proxy/tangled/src/error.rs @@ -0,0 +1,39 @@ +use std::{error::Error, fmt::Display}; + +use crossbeam::channel::SendError; + +use crate::MAX_MESSAGE_LEN; + +/// Describes possible errors +#[derive(Debug)] +pub enum NetError { + /// Tried to use an invalid peer id. + UnknownPeer, + /// Peer is not able to communicate with other peers anymore. + Disconnected, + /// Tried to send a message longer than `MAX_MESSAGE_LEN`. + MessageTooLong, + /// Unreliable message was instantly dropped because there are too many packets waiting to be sent. + Dropped, +} + +impl Display for NetError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NetError::UnknownPeer => write!(f, "No peer with this id"), + NetError::Disconnected => write!(f, "Not connected"), + NetError::MessageTooLong => { + write!(f, "Message len exceeds the limit of {}", MAX_MESSAGE_LEN) + } + NetError::Dropped => write!(f, "Message dropped"), + } + } +} + +impl Error for NetError {} + +impl From> for NetError { + fn from(_: SendError) -> Self { + Self::Disconnected + } +} diff --git a/noita-proxy/tangled/src/lib.rs b/noita-proxy/tangled/src/lib.rs new file mode 100644 index 00000000..de297aa8 --- /dev/null +++ b/noita-proxy/tangled/src/lib.rs @@ -0,0 +1,307 @@ +//! Tangled - a work-in-progress UDP networking crate. + +use std::{ + fmt::Display, + io, + net::{SocketAddr, UdpSocket}, + sync::{atomic::AtomicBool, Arc}, +}; + +use crossbeam::{ + self, + atomic::AtomicCell, + channel::{unbounded, Receiver, Sender}, +}; + +pub use error::NetError; +use reactor::{Destination, RemotePeer, Shared}; +pub use reactor::{Reliability, Settings}; +use serde::{Deserialize, Serialize}; + +const DATAGRAM_MAX_LEN: usize = 1500; + +/// Maximum size of a message which fits into a single datagram. +pub const MAX_MESSAGE_LEN: usize = 1200; + +mod error; +mod reactor; +mod util; + +struct Datagram { + pub size: usize, + pub data: [u8; DATAGRAM_MAX_LEN], +} + +/// A value which refers to a specific peer. +/// Peer 0 is always the host. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] +pub struct PeerId(pub u16); + +impl Display for PeerId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +type SeqId = u16; + +/// Possible network events, returned by `Peer.recv()`. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum NetworkEvent { + /// A new peer has connected. + PeerConnected(PeerId), + /// Peer has disconnected. + PeerDisconnected(PeerId), + /// Message has been received. + Message(Message), +} + +/// A message received from a peer. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Message { + src: PeerId, + /// The data that has been sent. + pub data: Vec, +} + +struct OutboundMessage { + pub dst: Destination, + pub data: Vec, + pub reliability: Reliability, +} + +/// Current peer state +#[derive(Default, Clone, Copy, Debug)] +pub enum PeerState { + /// Waiting for connection. Switches to `Connected` right after id from the host has been acquired. + /// Note: hosts switches to 'Connected' basically instantly. + #[default] + PendingConnection, + /// Connected to host and ready to send/receive messages. + Connected, + /// No longer connected, won't reconnect. + Disconnected, +} + +type Channel = (Sender, Receiver); + +/// Represents a network endpoint. Can be constructed in either `host` or `client` mode. +/// Client can only connect to hosts, but they are able to send messages to any other peer connected to the same host, including the host itself. +#[derive(Clone)] +pub struct Peer { + shared: Arc, +} + +impl Peer { + fn new( + bind_addr: SocketAddr, + host_addr: Option, + settings: Option, + ) -> io::Result { + let socket = UdpSocket::bind(bind_addr)?; + let shared = Arc::new(Shared { + socket, + inbound_channel: unbounded(), + outbound_channel: unbounded(), + keep_alive: AtomicBool::new(true), + host_addr, + peer_state: Default::default(), + remote_peers: Default::default(), + max_packets_per_second: 256, + my_id: AtomicCell::new(if host_addr.is_none() { + Some(PeerId(0)) + } else { + None + }), + settings: settings.unwrap_or_default(), + }); + if host_addr.is_none() { + shared.remote_peers.insert(PeerId(0), RemotePeer::default()); + } + reactor::Reactor::start(Arc::clone(&shared)); + Ok(Peer { shared }) + } + + /// Host at a specified `bind_addr`. + pub fn host(bind_addr: SocketAddr, settings: Option) -> io::Result { + Self::new(bind_addr, None, settings) + } + + /// Connect to a specified `host_addr`. + pub fn connect(host_addr: SocketAddr, settings: Option) -> io::Result { + Self::new("0.0.0.0:0".parse().unwrap(), Some(host_addr), settings) + } + + /// Send a message to a specified single peer. + pub fn send( + &self, + destination: PeerId, + data: Vec, + reliability: Reliability, + ) -> Result<(), NetError> { + self.send_internal(Destination::One(destination), data, reliability) + } + + pub fn broadcast(&self, data: Vec, reliability: Reliability) -> Result<(), NetError> { + self.send_internal(Destination::Broadcast, data, reliability) + } + + fn send_internal( + &self, + destination: Destination, + data: Vec, + reliability: Reliability, + ) -> Result<(), NetError> { + if data.len() > MAX_MESSAGE_LEN { + return Err(NetError::MessageTooLong); + } + if reliability == Reliability::Unreliable + && self.shared.outbound_channel.0.len() * 2 > self.shared.max_packets_per_second + { + return Err(NetError::Dropped); + } + self.shared.outbound_channel.0.send(OutboundMessage { + dst: destination, + data, + reliability, + })?; + Ok(()) + } + + /// Return an iterator over recieved messages. + /// Does not block. + pub fn recv(&self) -> impl Iterator + '_ { + self.shared.inbound_channel.1.try_iter() + } + + /// Return an iterator over recieved messages. + /// Blocking. + pub fn recv_blocking(&self) -> impl Iterator + '_ { + self.shared.inbound_channel.1.iter() + } + + /// Returns own `PeerId`, whcih can be used by any remote peer to send a message to this one. + /// None is returned when not connected yet. + pub fn my_id(&self) -> Option { + self.shared.my_id.load() + } + + /// Current state of the peer. + pub fn state(&self) -> PeerState { + self.shared.peer_state.load() + } + + /// Iterate over connected peers, returning ther `PeerId`. + pub fn iter_peer_ids(&self) -> impl Iterator + '_ { + self.shared + .remote_peers + .iter() + .map(|item| item.key().to_owned()) + } +} + +impl Drop for Peer { + fn drop(&mut self) { + self.shared + .keep_alive + .store(false, std::sync::atomic::Ordering::SeqCst) + } +} + +#[cfg(test)] +mod test { + use std::{thread, time::Duration}; + + use crate::{reactor::Settings, Message, NetworkEvent, Peer, PeerId, Reliability}; + + #[test_log::test] + fn test_peer() { + let settings = Some(Settings { + confirm_max_period: Duration::from_millis(100), + connection_timeout: Duration::from_millis(1000), + ..Default::default() + }); + let addr = "127.0.0.1:56001".parse().unwrap(); + let host = Peer::host(addr, settings.clone()).unwrap(); + assert_eq!(host.shared.remote_peers.len(), 1); + let peer = Peer::connect(addr, settings.clone()).unwrap(); + thread::sleep(Duration::from_millis(100)); + assert_eq!(peer.shared.remote_peers.len(), 2); + assert_eq!(host.shared.remote_peers.len(), 2); + let data = vec![128, 51, 32]; + peer.send(PeerId(0), data.clone(), Reliability::Reliable) + .unwrap(); + thread::sleep(Duration::from_millis(10)); + let host_events: Vec<_> = host.recv().collect(); + assert!(host_events.contains(&NetworkEvent::PeerConnected(PeerId(1)))); + assert!(host_events.contains(&NetworkEvent::Message(Message { + data, + src: PeerId(1) + }))); + let peer_events: Vec<_> = peer.recv().collect(); + assert!(peer_events.contains(&NetworkEvent::PeerConnected(PeerId(0)))); + assert!(peer_events.contains(&NetworkEvent::PeerConnected(PeerId(1)))); + drop(peer); + thread::sleep(Duration::from_millis(1200)); + assert_eq!( + host.recv().next(), + Some(NetworkEvent::PeerDisconnected(PeerId(1))) + ); + assert_eq!(host.shared.remote_peers.len(), 1); + } + + #[test_log::test] + fn test_broadcast() { + let settings = Some(Settings { + confirm_max_period: Duration::from_millis(100), + connection_timeout: Duration::from_millis(1000), + ..Default::default() + }); + let addr = "127.0.0.1:56002".parse().unwrap(); + let host = Peer::host(addr, settings.clone()).unwrap(); + assert_eq!(host.shared.remote_peers.len(), 1); + let peer1 = Peer::connect(addr, settings.clone()).unwrap(); + let peer2 = Peer::connect(addr, settings.clone()).unwrap(); + thread::sleep(Duration::from_millis(10)); + assert_eq!(host.shared.remote_peers.len(), 3); + + let data = vec![123, 112, 51, 23]; + peer1 + .broadcast(data.clone(), Reliability::Reliable) + .unwrap(); + thread::sleep(Duration::from_millis(10)); + + let host_events: Vec<_> = dbg!(host.recv().collect()); + let peer1_events: Vec<_> = dbg!(peer1.recv().collect()); + let peer2_events: Vec<_> = dbg!(peer2.recv().collect()); + + assert!(peer2_events.contains(&NetworkEvent::Message(Message { + src: peer1.my_id().unwrap(), + data: data.clone(), + }))); + assert!(!peer1_events.contains(&NetworkEvent::Message(Message { + src: peer1.my_id().unwrap(), + data: data.clone(), + }))); + assert!(host_events.contains(&NetworkEvent::Message(Message { + src: peer1.my_id().unwrap(), + data: data.clone(), + }))); + } + + #[test_log::test] + fn test_host_has_conn() { + let settings = Some(Settings { + confirm_max_period: Duration::from_millis(100), + connection_timeout: Duration::from_millis(1000), + ..Default::default() + }); + let addr = "127.0.0.1:56002".parse().unwrap(); + let host = Peer::host(addr, settings.clone()).unwrap(); + thread::sleep(Duration::from_millis(10)); + assert_eq!( + host.recv().next(), + Some(NetworkEvent::PeerConnected(PeerId(0))) + ); + } +} diff --git a/noita-proxy/tangled/src/reactor.rs b/noita-proxy/tangled/src/reactor.rs new file mode 100644 index 00000000..a6c1e607 --- /dev/null +++ b/noita-proxy/tangled/src/reactor.rs @@ -0,0 +1,679 @@ +use crate::{ + error::NetError, + util::{RateLimiter, RingSet}, + Channel, Message, NetworkEvent, OutboundMessage, PeerId, SeqId, +}; + +use super::{Datagram, PeerState, DATAGRAM_MAX_LEN}; +use crossbeam::{ + atomic::AtomicCell, + channel::{bounded, Receiver, Sender}, + select, +}; + +use dashmap::DashMap; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{HashMap, VecDeque}, + error::Error, + io::Cursor, + net::{SocketAddr, UdpSocket}, + sync::{ + atomic::{AtomicBool, AtomicU16, Ordering::SeqCst}, + Arc, + }, + thread, + time::{Duration, Instant}, +}; +use tracing::{error, info, trace, warn}; + +/// Per-peer settings. Peers that are connected to the same host, as well as the host itself, should have the same settings. +#[derive(Debug, Clone)] +pub struct Settings { + /// A single datagram will confirm at most this much messages. Default is 128. + pub confirm_max_per_message: usize, + /// How much time can elapse before another confirm is sent. + /// Confirms are also sent when enough messages are awaiting confirm. + /// Note that confirms also double as "heartbeats" and keep the connection alive, so this value should be much less than `connection_timeout`. + /// Default: 1 second. + pub confirm_max_period: Duration, + /// Peers will be disconnected after this much time without any datagrams from them has passed. + /// Default: 1 second. + pub connection_timeout: Duration, +} + +impl Default for Settings { + fn default() -> Self { + Self { + confirm_max_per_message: 128, + confirm_max_period: Duration::from_secs(1), + connection_timeout: Duration::from_secs(10), + } + } +} + +pub(crate) struct Shared { + pub settings: Settings, + pub socket: UdpSocket, + pub inbound_channel: Channel, + pub outbound_channel: Channel, + pub keep_alive: AtomicBool, + pub peer_state: AtomicCell, + pub remote_peers: DashMap, + pub max_packets_per_second: usize, + pub host_addr: Option, + pub my_id: AtomicCell>, +} + +struct DirectPeer { + addr: SocketAddr, + outbound_pending: VecDeque, + resend_pending: VecDeque<(Instant, NetMessageNormal)>, + confirmed: RingSet, + rate_limit: RateLimiter, + seq_counter: AtomicU16, + recent_seq: RingSet, + pending_confirms: VecDeque, + last_confirm_sent: Instant, + last_seen: Instant, +} + +#[derive(Default)] +pub struct RemotePeer {} + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +pub enum Destination { + One(PeerId), + Broadcast, +} + +#[derive(Serialize, Deserialize, Clone)] +enum NetMessageVariant { + Login, + Normal(NetMessageNormal), +} + +/// Tells how reliable a message is. +#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Debug)] +pub enum Reliability { + /// Message will be delivered at most once. + Unreliable, + /// Message will be resent untill is's arrival will be confirmed. + /// Will be delivered at most once. + Reliable, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +struct NetMessageNormal { + // Source that generated sequence id. + // Initially the same as origin_src, but can be changed when packet is retransmitted not as-is, e. g. when it is broadcasted. + src: PeerId, + // Original source. + origin_src: PeerId, + dst: Destination, + seq_id: SeqId, + reliability: Reliability, + inner: NetMessageInner, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +enum NetMessageInner { + RegDone { addr: SocketAddr }, + AddPeer { id: PeerId }, + DelPeer { id: PeerId }, + Confirm { confirmed_ids: Vec }, + Payload { data: Vec }, +} + +impl TryFrom for NetMessageVariant { + type Error = bincode::Error; + + fn try_from(datagram: Datagram) -> Result { + bincode::deserialize(&datagram.data[..datagram.size]) + } +} + +impl TryFrom<&NetMessageVariant> for Datagram { + type Error = bincode::Error; + + fn try_from(value: &NetMessageVariant) -> Result { + let mut data = Cursor::new([0; DATAGRAM_MAX_LEN]); + bincode::serialize_into(&mut data, value)?; + let data = data.into_inner(); + Ok(Datagram { + data, + size: data.len(), + }) + } +} + +pub(crate) struct Reactor { + shared: Arc, + direct_peers: HashMap, +} + +type AddrDatagram = (SocketAddr, Datagram); + +impl Reactor { + fn add_peer(&self, id: PeerId) -> Result<(), NetError> { + self.shared.remote_peers.insert(id, RemotePeer::default()); + self.shared + .inbound_channel + .0 + .send(NetworkEvent::PeerConnected(id))?; + Ok(()) + } + + fn direct_broadcast( + &mut self, + src_id: PeerId, + msg: NetMessageInner, + reliability: Reliability, + ) -> Result<(), NetError> { + for (&peer_id, peer) in self.direct_peers.iter_mut() { + let new_seq_id = peer.seq_counter.fetch_add(1, SeqCst); + let new_msg = Self::wrap_packet_seq_id( + src_id, + src_id, + new_seq_id, + Destination::One(peer_id), + msg.clone(), + reliability, + )?; + Self::direct_send_peer(peer, new_msg)?; + } + Ok(()) + } + + fn direct_send(&mut self, id: PeerId, msg: NetMessageVariant) -> Result<(), NetError> { + let peer = self + .direct_peers + .get_mut(&id) + .ok_or(NetError::UnknownPeer)?; + Self::direct_send_peer(peer, msg) + } + + fn direct_send_peer(peer: &mut DirectPeer, msg: NetMessageVariant) -> Result<(), NetError> { + peer.outbound_pending.push_back(msg); + Ok(()) + } + + fn gen_peer_id(&mut self) -> Option { + (1..=u16::MAX) + .map(PeerId) + .find(|i| !self.shared.remote_peers.contains_key(i)) + } + + fn handle_inbound(&mut self, (incoming_addr, msg_raw): AddrDatagram) { + let msg = match NetMessageVariant::try_from(msg_raw) { + Ok(msg) => msg, + Err(err) => { + warn!("Error when converting to NetMessage: {}", err); + return; + } + }; + match self.shared.my_id.load() { + Some(id) => { + match msg { + NetMessageVariant::Login => { + if self.is_host() { + //TODO check this addr is not already registered + match self.gen_peer_id() { + Some(new_id) => { + self.add_peer(new_id).ok(); + let mut peer = DirectPeer::new( + incoming_addr, + self.shared.max_packets_per_second, + ); + peer.outbound_pending.push_back(NetMessageVariant::Normal( + NetMessageNormal { + src: id, + origin_src: id, + dst: Destination::One(new_id), + seq_id: u16::MAX, + inner: NetMessageInner::RegDone { + addr: incoming_addr, + }, + reliability: Reliability::Reliable, + }, + )); + self.direct_peers.insert(new_id, peer); + self.direct_broadcast( + id, + NetMessageInner::AddPeer { id: new_id }, + Reliability::Reliable, + ) + .ok(); + let shared = self.shared.clone(); + for re in shared.remote_peers.iter() { + let id = *re.key(); + if id != new_id { + self.wrap_packet( + id, + Destination::One(new_id), + NetMessageInner::AddPeer { id }, + Reliability::Reliable, + ) + .and_then(|msg| self.direct_send(new_id, msg)) + .ok(); + } + } + } + None => warn!("Out of ids"), + } + } else { + warn!("Not a host, registration attempt ignored"); + } + } + NetMessageVariant::Normal(msg) => { + match self.handle_inbound_normal(msg, incoming_addr, id) { + Ok(_) => {} + Err(NetError::Dropped) => {} + Err(err) => { + info!("Error while handling normal inbound message: {}", err) + } + } + } + } + } + None => match msg { + NetMessageVariant::Normal(NetMessageNormal { + inner: NetMessageInner::RegDone { addr: _ }, + dst, + src, + .. + }) => { + let expected_host_addr = self + .shared + .host_addr + .expect("Can't have both my_id and host_addr be None"); + if incoming_addr == expected_host_addr && src == PeerId(0) { + if let Destination::One(id) = dst { + self.shared.my_id.store(Some(id)); + self.add_peer(PeerId(0)).ok(); + self.shared.peer_state.store(PeerState::Connected); + } else { + warn!("Malformed registration message"); + } + } else { + warn!("Registration message recieved not from the right address ({}, {} expected)", incoming_addr, expected_host_addr); + } + } + _ => warn!("Message ignored as registration is not done yet"), + }, + } + } + + fn handle_inbound_normal( + &mut self, + msg: NetMessageNormal, + _incoming_addr: SocketAddr, + my_id: PeerId, + ) -> Result<(), NetError> { + let peer = self.direct_peers.get_mut(&msg.src); + if peer + .as_ref() + .map_or(true, |peer| peer.recent_seq.contains(&msg.seq_id)) + { + return Err(NetError::Dropped); + } + { + let peer = peer.expect("Expected to exist"); + peer.recent_seq.add(msg.seq_id); //TODO backpressure + peer.pending_confirms.push_back(msg.seq_id); + peer.last_seen = Instant::now() + } + + if Destination::One(my_id) == msg.dst || msg.dst == Destination::Broadcast { + // TODO eliminate this clone + match msg.inner.clone() { + NetMessageInner::RegDone { addr: _ } => { + warn!("Already registered, request ignored"); + } + NetMessageInner::AddPeer { id } => { + if !self.is_host() { + self.add_peer(id).ok(); + info!("Peer {} added", id); + } + } + NetMessageInner::DelPeer { id } => { + if !self.is_host() { + self.del_peer(id).ok(); + info!("Peer {} removed", id); + } + } + NetMessageInner::Confirm { confirmed_ids } => { + if let Some(peer) = self.direct_peers.get_mut(&msg.src) { + for id in confirmed_ids { + peer.confirmed.add(id); + } + } + } + NetMessageInner::Payload { data } => { + self.shared + .inbound_channel + .0 + .send(NetworkEvent::Message(Message { + src: msg.origin_src, + data, + }))?; + } + } + } + if self.is_host() && Destination::One(my_id) != msg.dst { + match msg.dst { + Destination::One(dst) => { + let new_msg = + self.wrap_packet(dst, Destination::One(dst), msg.inner, msg.reliability)?; + self.direct_send(dst, new_msg)?; + } + Destination::Broadcast => { + let mut buf = Vec::new(); + for peer in &self.direct_peers { + if *peer.0 == msg.src { + continue; + } + let seq_id = self.next_seq_id_for_peer(*peer.0)?; + if let Ok(wrapped_msg) = Self::wrap_packet_seq_id( + PeerId(0), + msg.origin_src, + seq_id, + Destination::One(*peer.0), + msg.inner.clone(), + msg.reliability, + ) { + buf.push((*peer.0, wrapped_msg)); + } + } + for (peer_id, wrapped_msg) in buf { + self.direct_send(peer_id, wrapped_msg).ok(); + } + } + } + } + + Ok(()) + } + + fn del_peer(&mut self, id: PeerId) -> Result<(), NetError> { + self.shared.remote_peers.remove(&id); + self.shared + .inbound_channel + .0 + .send(NetworkEvent::PeerDisconnected(id))?; + Ok(()) + } + + fn handle_outbound(&mut self, msg: OutboundMessage) -> Result<(), NetError> { + let dst = msg.dst; + if self.is_host() { + match dst { + Destination::One(id) => { + let net_msg = self.wrap_packet( + id, + dst, + NetMessageInner::Payload { data: msg.data }, + msg.reliability, + )?; + self.direct_send(id, net_msg)?; + } + Destination::Broadcast => self.direct_broadcast( + PeerId(0), + NetMessageInner::Payload { data: msg.data }, + msg.reliability, + )?, + } + } else { + let net_msg = self.wrap_packet( + PeerId(0), + dst, + NetMessageInner::Payload { data: msg.data }, + msg.reliability, + )?; + self.direct_send(PeerId(0), net_msg)?; + } + Ok(()) + } + + pub fn is_host(&self) -> bool { + self.shared.host_addr.is_none() + } + + pub fn next_seq_id_for_peer(&self, peer_id: PeerId) -> Result { + Ok(self + .direct_peers + .get(&peer_id) + .or_else(|| { + if !self.is_host() { + self.direct_peers.get(&PeerId(0)) + } else { + None + } + }) + .ok_or(NetError::UnknownPeer)? + .seq_counter + .fetch_add(1, SeqCst)) + } + + fn run(mut self, inbound_r: Receiver) -> Result<(), Box> { + while self.shared.keep_alive.load(SeqCst) { + select! { + recv(inbound_r) -> addr_msg => self.handle_inbound(addr_msg?), + recv(self.shared.outbound_channel.1) -> msg => {self.handle_outbound(msg?).ok();} + default => {thread::sleep(Duration::from_micros(100));} + } + let mut dc = Vec::new(); + self.direct_peers.retain(|&k, v| { + let stays = v.last_seen.elapsed() < self.shared.settings.connection_timeout; + if !stays { + dc.push(k); + } + stays + }); + if self.is_host() { + for peer_id in dc { + let src_id = self.shared.my_id.load().unwrap(); // Should always be PeerId(0) + assert_eq!(src_id, PeerId(0)); + self.direct_broadcast( + src_id, + NetMessageInner::DelPeer { id: peer_id }, + Reliability::Reliable, + )?; + self.del_peer(peer_id).ok(); + info!("[Host] Peer {} removed", peer_id); + } + } + if !self.is_host() && self.direct_peers.is_empty() { + self.shared.peer_state.store(PeerState::Disconnected); + self.shared.keep_alive.store(false, SeqCst); + } + 'peers: for (&id, peer) in self.direct_peers.iter_mut() { + let resend_in = Instant::now() + Duration::from_secs(1); + + if let Some(my_id) = self.shared.my_id.load() { + if peer.last_confirm_sent.elapsed() > self.shared.settings.confirm_max_period + || peer.pending_confirms.len() + > self.shared.settings.confirm_max_per_message + { + peer.last_confirm_sent = Instant::now(); + let max_per_message = self.shared.settings.confirm_max_per_message; + let mut confirmed_ids = Vec::with_capacity(max_per_message); + while let Some(confirm) = peer.pending_confirms.pop_front() { + confirmed_ids.push(confirm); + if confirmed_ids.len() == max_per_message { + break; + } + } + peer.resend_pending.push_front(( + Instant::now(), + NetMessageNormal { + src: my_id, + origin_src: my_id, + dst: Destination::One(id), + seq_id: peer.seq_counter.fetch_add(1, SeqCst), + reliability: Reliability::Reliable, + inner: NetMessageInner::Confirm { confirmed_ids }, + }, + )) + } + } + + while peer + .resend_pending + .front() + .map_or(false, |x| x.0 < Instant::now()) + { + let (moment, msg) = peer + .resend_pending + .pop_front() + .expect("Checked that deque is not empty"); + + if !peer.confirmed.contains(&msg.seq_id) { + if !peer.rate_limit.get_token() { + peer.resend_pending.push_front((moment, msg)); + continue 'peers; + } + peer.resend_pending.push_back((resend_in, msg.clone())); + trace!("Sent {:?} to {}", msg, peer.addr); + let datagram = Datagram::try_from(&NetMessageVariant::Normal(msg)).unwrap(); + self.shared + .socket + .send_to(&datagram.data[..datagram.size], peer.addr) + .expect("Could not send"); + } + } + + while !peer.outbound_pending.is_empty() && peer.rate_limit.get_token() { + let msg = peer + .outbound_pending + .pop_front() + .expect("Checked that deque is not empty"); + if let NetMessageVariant::Normal(ref msg) = msg { + if msg.reliability == Reliability::Reliable { + peer.resend_pending.push_back((resend_in, msg.clone())); + } + } + let datagram = Datagram::try_from(&msg).unwrap(); + self.shared + .socket + .send_to(&datagram.data[..datagram.size], peer.addr) + .expect("Could not send"); + } + } + } + Ok(()) + } + + fn run_pipe( + shared: Arc, + sender: Sender<(SocketAddr, Datagram)>, + ) -> Result<(), Box> { + while shared.keep_alive.load(SeqCst) { + let mut buf = [0u8; DATAGRAM_MAX_LEN]; + match shared.socket.recv_from(&mut buf) { + Ok((len, addr)) => sender + .send(( + addr, + Datagram { + size: len, + data: buf, + }, + )) + .map_err(Box::new)?, + //Err(err) + // if err.kind() == ErrorKind::WouldBlock || err.kind() == ErrorKind::TimedOut => { + //} + Err(err) => return Err(Box::new(err)), + } + } + Ok(()) + } + + pub(crate) fn start(shared: Arc) { + let mut me = Reactor { + shared, + direct_peers: Default::default(), + }; + if !me.is_host() { + me.direct_peers.insert( + PeerId(0), + DirectPeer::new( + me.shared + .host_addr + .expect("Can't be a client without a host addr"), + me.shared.max_packets_per_second, + ), + ); + me.direct_send(PeerId(0), NetMessageVariant::Login).unwrap(); + } + if me.is_host() { + me.shared.peer_state.store(PeerState::Connected); + } + let shared_c = Arc::clone(&me.shared); + let (inbound_s, inbound_r) = bounded(16); + thread::spawn(move || { + let shared_c_2 = Arc::clone(&shared_c); + if let Err(err) = Self::run_pipe(shared_c_2, inbound_s) { + shared_c.keep_alive.store(false, SeqCst); + shared_c.peer_state.store(PeerState::Disconnected); + error!("Reactor pipe error: {}", err); + } + }); + let shared_c = Arc::clone(&me.shared); + thread::spawn(move || { + if let Err(err) = me.run(inbound_r) { + shared_c.keep_alive.store(false, SeqCst); + shared_c.peer_state.store(PeerState::Disconnected); + error!("Reactor error: {}", err); + } + }); + } + + fn wrap_packet( + &self, + id: PeerId, + dst: Destination, + msg: NetMessageInner, + reliability: Reliability, + ) -> Result { + let seq_id = self.next_seq_id_for_peer(id)?; + let src = self.shared.my_id.load().expect("Should know own id by now"); + Self::wrap_packet_seq_id(src, src, seq_id, dst, msg, reliability) + } + + fn wrap_packet_seq_id( + src: PeerId, + origin_src: PeerId, + seq_id: SeqId, + dst: Destination, + msg: NetMessageInner, + reliability: Reliability, + ) -> Result { + Ok(NetMessageVariant::Normal(NetMessageNormal { + src, + origin_src, + dst, + seq_id, + inner: msg, + reliability, + })) + } +} + +impl DirectPeer { + fn new(incoming_addr: SocketAddr, rate_limit: usize) -> DirectPeer { + let now = Instant::now(); + DirectPeer { + addr: incoming_addr, + outbound_pending: Default::default(), + resend_pending: Default::default(), + confirmed: RingSet::new(1024), + rate_limit: RateLimiter::new(rate_limit, Duration::from_secs(1)), + seq_counter: AtomicU16::new(0), + recent_seq: RingSet::new(1024), + pending_confirms: VecDeque::new(), + last_confirm_sent: now, + last_seen: now, + } + } +} diff --git a/noita-proxy/tangled/src/util.rs b/noita-proxy/tangled/src/util.rs new file mode 100644 index 00000000..9f077d70 --- /dev/null +++ b/noita-proxy/tangled/src/util.rs @@ -0,0 +1,103 @@ +use std::{ + collections::{HashSet, VecDeque}, hash::Hash, time::{Duration, Instant} +}; + +pub struct RateLimiter { + moments: VecDeque, + time: Duration, + limit: usize, +} + +impl RateLimiter { + pub fn new(limit: usize, time: Duration) -> Self { + Self { + moments: VecDeque::with_capacity(limit), + time, + limit, + } + } + pub fn get_token(&mut self) -> bool { + let now = Instant::now(); + while self + .moments + .front() + .map_or(false, |moment| now - *moment > self.time) + { + self.moments.pop_front(); + } + if self.moments.len() < self.limit { + self.moments.push_back(now); + true + } else { + false + } + } +} + +pub struct RingSet { + set: HashSet, + ring: VecDeque, + limit: usize, +} + +impl RingSet { + pub fn new(limit: usize) -> Self { + assert!(limit > 0); + Self { + set: HashSet::new(), + ring: VecDeque::with_capacity(limit), + limit, + } + } + + pub fn add(&mut self, key: Key) { + if !self.contains(&key) { + if self.ring.len() >= self.limit { + let element = self.ring.pop_front().expect("Deque has elements"); + self.set.remove(&element); + } + self.set.insert(key.clone()); + self.ring.push_back(key); + } + } + + pub fn contains(&self, key: &Key) -> bool { + self.set.contains(key) + } +} + +#[cfg(test)] +mod tests { + use std::{thread, time::Duration}; + + use super::{RateLimiter, RingSet}; + + #[test] + fn rate_limit() { + let duration = Duration::from_micros(100); + let mut limiter = RateLimiter::new(4, duration); + + for _ in 0..4 { + assert!(limiter.get_token()) + } + assert!(!limiter.get_token()); + thread::sleep(duration * 2); + assert!(limiter.get_token()); + } + + #[test] + fn ring_set() { + let mut set = RingSet::new(3); + set.add(1); + assert!(set.contains(&1)); + set.add(2); + assert!(set.contains(&1)); + assert!(set.contains(&2)); + assert!(!set.contains(&3)); + set.add(3); + set.add(3); + set.add(4); + assert!(!set.contains(&1)); + assert!(set.contains(&4)); + } +}