WIP steam networking rewrite.

This commit is contained in:
IQuant 2024-07-22 17:41:45 +03:00
parent 0e86a2ca0a
commit 3a07b8eaf6
7 changed files with 263 additions and 29 deletions

View file

@ -2,9 +2,11 @@ extract_steam_redist:
python scripts/extract_steam_redist.py
add_dylib_debug: extract_steam_redist
mkdir noita-proxy/target/debug/ -p
cp target/tmp/libsteam_api.so noita-proxy/target/debug/
add_dylib_release: extract_steam_redist
mkdir noita-proxy/target/release/ -p
cp target/tmp/libsteam_api.so noita-proxy/target/release/
build:

17
noita-proxy/Cargo.lock generated
View file

@ -778,6 +778,20 @@ dependencies = [
"parking_lot_core",
]
[[package]]
name = "dashmap"
version = "6.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
@ -2132,6 +2146,7 @@ dependencies = [
"clipboard",
"crc",
"crossbeam",
"dashmap 6.0.1",
"eframe",
"egui-file-dialog",
"egui_extras",
@ -3401,7 +3416,7 @@ version = "0.2.0"
dependencies = [
"bincode",
"crossbeam",
"dashmap",
"dashmap 5.5.3",
"serde",
"test-log",
"tracing",

View file

@ -46,6 +46,7 @@ crc = "3.2.1"
argh = "0.1.12"
shlex = "1.3.0"
quick-xml = { version = "0.36.0", features = ["serialize"] }
dashmap = "6.0.1"
[build-dependencies]
winresource = "0.1.17"

View file

@ -1,10 +1,14 @@
use std::{fmt::Display, sync::Mutex};
use std::{fmt::Display, mem, sync::Mutex};
use crossbeam::channel;
use dashmap::DashMap;
use fluent_bundle::FluentValue;
use steamworks::{
networking_types::{NetworkingIdentity, SendFlags},
CallbackHandle, LobbyChatUpdate, LobbyId, LobbyType, SteamError, SteamId,
networking_sockets::{ListenSocket, NetPollGroup},
networking_types::{
ListenSocketEvent, NetworkingConnectionState, NetworkingIdentity, SendFlags,
},
CallbackHandle, ClientManager, LobbyChatUpdate, LobbyId, LobbyType, SteamError, SteamId,
};
use tangled::{PeerState, Reliability};
use tracing::{info, warn};
@ -16,7 +20,7 @@ use crate::{
use super::omni::{self, OmniNetworkEvent};
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectError {
VersionMismatch { remote_version: Version },
VersionMissing,
@ -49,17 +53,18 @@ impl Display for ConnectError {
}
enum SteamEvent {
LobbyCreated(LobbyId),
LobbyCreatedOrJoined(LobbyId),
LobbyError(SteamError),
LobbyJoinError(ConnectError),
PeerConnected(SteamId),
PeerDisconnected(SteamId),
PeerConnectedToLobby(SteamId),
PeerDisconnectedFromLobby(SteamId),
PeerStateChanged,
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtraPeerState {
Tangled(PeerState),
CreatingMesh,
CouldNotConnect(ConnectError),
}
@ -70,6 +75,187 @@ pub struct InnerState {
state: ExtraPeerState,
}
enum ConnectionState {
AwaitingIncoming,
NetConnectionPending(steamworks::networking_sockets::NetConnection<ClientManager>),
NetConnection(steamworks::networking_sockets::NetConnection<ClientManager>),
}
impl ConnectionState {
fn switch_to_connected(&mut self) {
let current = mem::replace(self, ConnectionState::AwaitingIncoming);
if let ConnectionState::NetConnectionPending(conn) = current {
*self = ConnectionState::NetConnection(conn);
}
}
fn connection(&self) -> Option<&steamworks::networking_sockets::NetConnection<ClientManager>> {
if let ConnectionState::NetConnection(conn) = self {
Some(conn)
} else {
None
}
}
}
struct Connections {
client: steamworks::Client,
my_id: SteamId,
listen_socket: ListenSocket<ClientManager>,
poll_group: Mutex<NetPollGroup<ClientManager>>,
peers: DashMap<SteamId, ConnectionState>,
connected: Mutex<Vec<SteamId>>,
}
impl Connections {
fn new(client: &steamworks::Client) -> Self {
let my_id = client.user().steam_id();
let networking_sockets = client.networking_sockets();
let listen_socket = networking_sockets
.create_listen_socket_p2p(0, None)
.expect("handle to be valid"); // Unclear in what cases this can fail.
let poll_group = networking_sockets.create_poll_group().into();
Connections {
client: client.clone(),
my_id,
listen_socket,
poll_group,
peers: Default::default(),
connected: Default::default(),
}
}
fn poll_listener(&self) {
while let Some(event) = self.listen_socket.try_receive_event() {
match event {
ListenSocketEvent::Connecting(event) => {
info!("Peer {:?} connecting", event.remote());
event
.accept()
.inspect_err(|e| warn!("Error when accepting connection: {}", e))
.ok();
}
ListenSocketEvent::Connected(event) => {
if let Some(mut connection) = event
.remote()
.steam_id()
.and_then(|x| self.peers.get_mut(&x))
{
info!("Peer {:?} got connected event", event.remote());
*connection =
ConnectionState::NetConnectionPending(event.take_connection());
}
}
ListenSocketEvent::Disconnected(event) => {
info!(
"Peer {:?} disconnected, reason: {:?}",
event.remote(),
event.end_reason(),
)
}
}
}
}
fn poll_status(&self) -> bool {
let networking_sockets = self.client.networking_sockets();
let mut all_connected = true;
for mut state in self.peers.iter_mut() {
match state.value() {
ConnectionState::AwaitingIncoming => {
all_connected = false;
}
ConnectionState::NetConnectionPending(connection) => {
let info = networking_sockets
.get_connection_info(connection)
.expect("handle to be valid");
match info.state().expect("assuming state is always valid") {
// Wait.
NetworkingConnectionState::None
| NetworkingConnectionState::Connecting
| NetworkingConnectionState::FindingRoute => {
all_connected = false;
}
// Switch to connected state.
NetworkingConnectionState::Connected => {
info!(
"Connection of peer {:?} swithed to connected state",
*state.key()
);
self.connected.lock().unwrap().push(*state.key());
state.value_mut().switch_to_connected();
all_connected = false;
}
NetworkingConnectionState::ClosedByPeer
| NetworkingConnectionState::ProblemDetectedLocally => {
info!(
"Some problem happened for peer {:?}. Will try to connect again.",
*state.key()
);
self.connect(*state.key());
all_connected = false;
}
}
}
ConnectionState::NetConnection(_) => {}
}
}
all_connected
}
/// Returns true if everyone is connected.
fn poll(&self) -> bool {
self.poll_listener();
self.poll_status()
}
fn connect(&self, peer: SteamId) {
let networking_sockets = self.client.networking_sockets();
let peer_identity = NetworkingIdentity::new_steam_id(peer);
if peer > self.my_id {
info!("Awaiting incoming connection from {:?}", peer);
self.peers.insert(peer, ConnectionState::AwaitingIncoming);
} else {
info!("Initiating connection to {:?}", peer);
let connection = networking_sockets
.connect_p2p(peer_identity, 0, None)
.expect("handle to be valid");
connection.set_poll_group(&self.poll_group.lock().unwrap());
self.peers
.insert(peer, ConnectionState::NetConnectionPending(connection));
}
}
fn disconnect(&self, id: SteamId) {
info!("Removing connection to peer {:?}", id);
self.peers.remove(&id);
}
fn recv(&self) -> Vec<steamworks::networking_types::NetworkingMessage<ClientManager>> {
self.poll_group.lock().unwrap().receive_messages(1024)
}
fn send_message(
&self,
peer: SteamId,
send_flags: SendFlags,
msg: &[u8],
) -> Result<(), SteamError> {
if let Some(peer) = self.peers.get(&peer) {
if let Some(connection) = peer.value().connection() {
connection.send_message(msg, send_flags)?;
Ok(())
} else {
warn!("Couldn't send a message, connection is in invalid state");
Err(SteamError::Generic)
}
} else {
Err(SteamError::InvalidSteamID)
}
}
}
pub struct SteamPeer {
client: steamworks::Client,
events: channel::Receiver<SteamEvent>,
@ -78,11 +264,16 @@ pub struct SteamPeer {
is_host: bool,
inner: Mutex<InnerState>,
_cbs: Vec<CallbackHandle>,
connections: Connections,
}
impl SteamPeer {
pub fn new_host(lobby_type: LobbyType, client: steamworks::Client) -> Self {
let (sender, events) = channel::unbounded();
let connections = Connections::new(&client);
let matchmaking = client.matchmaking();
{
let sender = sender.clone();
@ -97,7 +288,7 @@ impl SteamPeer {
"ew_version",
&Version::current().to_string(),
);
SteamEvent::LobbyCreated(id)
SteamEvent::LobbyCreatedOrJoined(id)
}
Err(err) => SteamEvent::LobbyError(err),
};
@ -122,11 +313,13 @@ impl SteamPeer {
state: ExtraPeerState::Tangled(PeerState::PendingConnection),
}),
_cbs,
connections,
}
}
pub fn new_connect(lobby: LobbyId, client: steamworks::Client) -> Self {
let (sender, events) = channel::unbounded();
let connections = Connections::new(&client);
let matchmaking = client.matchmaking();
{
let sender = sender.clone();
@ -151,7 +344,7 @@ impl SteamPeer {
);
SteamEvent::LobbyJoinError(ConnectError::VersionMismatch { remote_version })
} else {
SteamEvent::LobbyCreated(id)
SteamEvent::LobbyCreatedOrJoined(id)
}
}
None => {
@ -184,6 +377,7 @@ impl SteamPeer {
state: ExtraPeerState::Tangled(PeerState::PendingConnection),
}),
_cbs,
connections,
}
}
@ -193,9 +387,10 @@ impl SteamPeer {
} else {
SendFlags::UNRELIABLE
};
let networking = self.client.networking_messages();
let res = networking
.send_message_to_user(NetworkingIdentity::new_steam_id(peer), send_type, msg, 0)
let res = self
.connections
.send_message(peer, send_type, msg)
.inspect_err(|err| {
warn!(
"Couldn't send a packet to {:?}, st {:?}, err {}",
@ -213,10 +408,15 @@ impl SteamPeer {
}
pub fn recv(&self) -> Vec<OmniNetworkEvent> {
let all_connected = self.connections.poll();
if all_connected && self.inner.lock().unwrap().state == ExtraPeerState::CreatingMesh {
info!("Switched to `all connected` state");
self.inner.lock().unwrap().state = ExtraPeerState::Tangled(PeerState::Connected);
}
let mut returned_events = Vec::new();
for event in self.events.try_iter() {
match event {
SteamEvent::LobbyCreated(id) => {
SteamEvent::LobbyCreatedOrJoined(id) => {
info!("Lobby ready");
self.inner.lock().unwrap().lobby_id = Some(id);
if !self.is_host {
@ -224,9 +424,9 @@ impl SteamPeer {
self.inner.lock().unwrap().host_id = host_id;
info!("Got host id: {:?}", host_id)
}
self.update_remote_peers();
self.inner.lock().unwrap().state =
ExtraPeerState::Tangled(PeerState::Connected);
self.update_lobby_list();
info!("Switched to `creating mesh` state");
self.inner.lock().unwrap().state = ExtraPeerState::CreatingMesh;
}
SteamEvent::LobbyError(err) => {
warn!("Could not create lobby: {}", err);
@ -237,17 +437,18 @@ impl SteamPeer {
warn!("Could not join lobby");
self.inner.lock().unwrap().state = ExtraPeerState::CouldNotConnect(err);
}
SteamEvent::PeerConnected(id) => {
returned_events.push(omni::OmniNetworkEvent::PeerConnected(id.into()))
SteamEvent::PeerConnectedToLobby(id) => {
self.connections.connect(id);
}
SteamEvent::PeerDisconnected(id) => {
SteamEvent::PeerDisconnectedFromLobby(id) => {
self.connections.disconnect(id);
returned_events.push(omni::OmniNetworkEvent::PeerDisconnected(id.into()))
}
SteamEvent::PeerStateChanged => self.update_remote_peers(),
SteamEvent::PeerStateChanged => self.update_lobby_list(),
}
}
let networking = self.client.networking_messages();
let messages = networking.receive_messages_on_channel(0, 1024);
let messages = self.connections.recv();
for message in messages {
let steam_id = message
.identity_peer()
@ -258,10 +459,17 @@ impl SteamPeer {
data: message.data().to_vec(), // TODO eliminate clone here.
})
}
let mut fully_connected = self.connections.connected.lock().unwrap();
for steam_id in fully_connected.iter() {
returned_events.push(omni::OmniNetworkEvent::PeerConnected((*steam_id).into()))
}
fully_connected.clear();
returned_events
}
fn update_remote_peers(&self) {
fn update_lobby_list(&self) {
info!("Updating peer list");
let matchmaking = self.client.matchmaking();
let lobby = self.inner.lock().unwrap().lobby_id;
let mut peers = match lobby {
@ -275,12 +483,16 @@ impl SteamPeer {
// TODO: could be done more efficiently
for peer in &peers {
if !current_peers.contains(peer) {
self.sender.send(SteamEvent::PeerConnected(*peer)).ok();
self.sender
.send(SteamEvent::PeerConnectedToLobby(*peer))
.ok();
}
}
for peer in &mut *current_peers {
if !peers.contains(peer) {
self.sender.send(SteamEvent::PeerDisconnected(*peer)).ok();
self.sender
.send(SteamEvent::PeerDisconnectedFromLobby(*peer))
.ok();
}
}

View file

@ -2,7 +2,7 @@ use std::{collections::HashMap, env, thread, time::Duration};
use eframe::egui::{self, ColorImage, RichText, TextureHandle, TextureOptions, Ui};
use steamworks::{SteamAPIInitError, SteamId};
use tracing::info;
use tracing::{error, info};
pub struct SteamUserAvatar {
avatar: TextureHandle,
@ -38,6 +38,9 @@ impl SteamState {
}
let app_id = env::var("NP_APPID").ok().and_then(|x| x.parse().ok());
let (client, single) = steamworks::Client::init_app(app_id.unwrap_or(881100))?;
if let Err(err) = client.networking_sockets().init_authentication() {
error!("Failed to init_authentication: {}", err)
}
thread::spawn(move || {
info!("Spawned steam callback thread");
loop {

View file

@ -76,7 +76,7 @@ struct OutboundMessage {
}
/// Current peer state
#[derive(Default, Clone, Copy, Debug)]
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq)]
pub enum PeerState {
/// Waiting for connection. Switches to `Connected` right after id from the host has been acquired.
/// Note: hosts switches to 'Connected' basically instantly.

View file

@ -61,6 +61,7 @@ def get_pull_requests_from(date):
return [PullRequest(entry["number"], entry["author"]["login"], entry["title"]) for entry in parsed]
def extract_steam_redist():
os.makedirs("target/tmp", exist_ok=True)
with ZipFile("redist/steam_dylib.zip", "r") as steam_dylib_zip:
steam_dylib_zip.extractall("target/tmp")