diff --git a/argh/node.py b/argh/node.py index b2e11ac..91460b5 100644 --- a/argh/node.py +++ b/argh/node.py @@ -38,39 +38,39 @@ class Node: while len(self._known_hashes) > self.max_known_hashes_size: self._known_hashes.pop(0) - async def _receiver_connection(self, r: NodeAddr): + async def _conn_to_receiver(self, r: NodeAddr): identifier = r.identifier() + log("Attempting to connect to receiver", identifier) async for ws in connect(r.ws_uri()): - log(identifier) + log("Connected to receiver", identifier) self._connections[identifier] = ws try: async for msg in ws: - log(msg) self._relay(msg) except (ConnectionClosed, ConnectionClosedError): del self._connections[identifier] - log(f"Connection from {identifier} closed", 1) + log(f"Lost connection to receiver", identifier, level=1) continue - async def _server_connection(self, ws: ServerConnection): + async def _conn_from_node_or_client(self, ws: ServerConnection): host, port = ws.remote_address addr = NodeAddr(host, port) identifier = addr.identifier() self._connections[identifier] = ws - log(identifier) + log("Accepted connection", identifier) async for msg in ws: - log(msg) self._relay(msg) + log("Lost connection", identifier) del self._connections[identifier] async def run(self): # connect to receivers receiver_tasks = [] for r in self.receivers: - t = asyncio.create_task(self._receiver_connection(r)) + t = asyncio.create_task(self._conn_to_receiver(r)) receiver_tasks.append(t) # server loop - async with serve(self._server_connection, self.listen_address.host, self.listen_address.port) as server: + async with serve(self._conn_from_node_or_client, self.listen_address.host, self.listen_address.port) as server: await server.serve_forever() # wait for receivers await asyncio.gather(receiver_tasks)