From edf42d8fad9107d1a422c26b0a1aa5cf29bbdbeb Mon Sep 17 00:00:00 2001 From: daiimus Date: Sun, 15 Feb 2026 08:10:54 -0800 Subject: [PATCH] Add WebSocket subprotocol negotiation per MUD Standards proposal Implement RFC 6455 Sec-WebSocket-Protocol negotiation for Evennia's WebSocket server, following the MUD Standards WebSocket specification (https://mudstandards.org/websocket/). New subprotocols: - json.mudstandards.org: JSON envelope format with structured OOB - gmcp.mudstandards.org: GMCP over WebSocket (TEXT + BINARY frames) - terminal.mudstandards.org: Raw ANSI/UTF-8 in BINARY frames Architecture: Wire format strategy pattern via composition. A single WebSocketClient delegates encoding/decoding to a WireFormat instance selected during onConnect(). Adding a new format means adding a single file to the wire_formats/ package with zero changes to webclient.py. Backward compatibility: Clients that send no Sec-WebSocket-Protocol header (including the built-in Evennia webclient) get v1.evennia.com format automatically. Existing behavior is fully preserved. Additional changes: - Extract shared GMCP encode/decode into gmcp_utils.py (used by both telnet and websocket GMCP paths) - Add WEBSOCKET_SUBPROTOCOLS setting for server-side format control - Built-in JS webclient now sends Sec-WebSocket-Protocol: v1.evennia.com - Comprehensive test suite for all wire formats (~870 lines) --- evennia/server/portal/gmcp_utils.py | 143 +++ evennia/server/portal/telnet_oob.py | 111 +-- evennia/server/portal/test_wire_formats.py | 883 ++++++++++++++++++ evennia/server/portal/tests.py | 11 +- evennia/server/portal/webclient.py | 279 +++++- .../server/portal/wire_formats/__init__.py | 44 + evennia/server/portal/wire_formats/base.py | 194 ++++ .../server/portal/wire_formats/evennia_v1.py | 131 +++ .../portal/wire_formats/gmcp_standard.py | 126 +++ .../portal/wire_formats/json_standard.py | 221 +++++ .../server/portal/wire_formats/terminal.py | 103 ++ evennia/settings_default.py | 20 + evennia/web/static/webclient/js/evennia.js | 4 +- 13 files changed, 2141 insertions(+), 129 deletions(-) create mode 100644 evennia/server/portal/gmcp_utils.py create mode 100644 evennia/server/portal/test_wire_formats.py create mode 100644 evennia/server/portal/wire_formats/__init__.py create mode 100644 evennia/server/portal/wire_formats/base.py create mode 100644 evennia/server/portal/wire_formats/evennia_v1.py create mode 100644 evennia/server/portal/wire_formats/gmcp_standard.py create mode 100644 evennia/server/portal/wire_formats/json_standard.py create mode 100644 evennia/server/portal/wire_formats/terminal.py diff --git a/evennia/server/portal/gmcp_utils.py b/evennia/server/portal/gmcp_utils.py new file mode 100644 index 0000000000..69fc93f58c --- /dev/null +++ b/evennia/server/portal/gmcp_utils.py @@ -0,0 +1,143 @@ +""" +Shared GMCP (Generic MUD Communication Protocol) utilities. + +This module provides encoding and decoding functions for GMCP messages, +shared between telnet OOB and WebSocket wire format implementations. + +GMCP messages follow the format: "Package.Subpackage json_payload" + +The mapping dictionaries translate between Evennia's internal command +names and standard GMCP package names. +""" + +import json + +from evennia.utils.utils import is_iter + +# Mapping from Evennia internal names to standard GMCP package names. +EVENNIA_TO_GMCP = { + "client_options": "Core.Supports.Get", + "get_inputfuncs": "Core.Commands.Get", + "get_value": "Char.Value.Get", + "repeat": "Char.Repeat.Update", + "monitor": "Char.Monitor.Update", +} + +# Reverse mapping from GMCP package names to Evennia internal names. +GMCP_TO_EVENNIA = {v: k for k, v in EVENNIA_TO_GMCP.items()} + + +def encode_gmcp(cmdname, *args, **kwargs): + """ + Encode an Evennia command into a GMCP message string. + + Args: + cmdname (str): Evennia OOB command name. + *args: Command arguments. + **kwargs: Command keyword arguments. + + Returns: + str: A GMCP-formatted string like "Package.Name json_data" + + Notes: + GMCP messages are formatted as: + [cmdname, [], {}] -> Cmd.Name + [cmdname, [arg], {}] -> Cmd.Name arg + [cmdname, [args], {}] -> Cmd.Name [args] + [cmdname, [], {kwargs}] -> Cmd.Name {kwargs} + [cmdname, [arg], {kwargs}] -> Cmd.Name [arg, {kwargs}] + [cmdname, [args], {kwargs}] -> Cmd.Name [[args], {kwargs}] + + Note: When there is exactly one positional argument, it is + collapsed (encoded directly rather than wrapped in a list). + This applies both with and without keyword arguments. This + is inherited behavior from the original telnet_oob.py. + + If cmdname has a direct mapping in EVENNIA_TO_GMCP, that + mapped name is used. Otherwise, underscores are converted to + dots with initial capitalization. Names without underscores + are placed in the Core package. + + """ + if cmdname in EVENNIA_TO_GMCP: + gmcp_cmdname = EVENNIA_TO_GMCP[cmdname] + elif "_" in cmdname: + gmcp_cmdname = ".".join( + word.capitalize() if not word.isupper() else word + for word in cmdname.split("_") + ) + else: + gmcp_cmdname = "Core.%s" % ( + cmdname if cmdname.istitle() else cmdname.capitalize() + ) + + if not (args or kwargs): + return gmcp_cmdname + elif args: + if len(args) == 1: + args = args[0] + if kwargs: + return "%s %s" % (gmcp_cmdname, json.dumps([args, kwargs])) + else: + return "%s %s" % (gmcp_cmdname, json.dumps(args)) + else: + return "%s %s" % (gmcp_cmdname, json.dumps(kwargs)) + + +def decode_gmcp(data): + """ + Decode a GMCP message string into Evennia command format. + + Args: + data (str or bytes): GMCP data in the form "Module.Submodule.Cmdname structure" + + Returns: + dict: A dict suitable for data_in(), e.g. {"cmdname": [[args], {kwargs}]} + Returns empty dict if data cannot be parsed. + + Notes: + Incoming GMCP is parsed as: + Core.Name -> {"name": [[], {}]} + Core.Name string -> {"name": [["string"], {}]} + Core.Name [arg, arg, ...] -> {"name": [[args], {}]} + Core.Name {key:val, ...} -> {"name": [[], {kwargs}]} + Core.Name [[args], {kwargs}] -> {"name": [[args], {kwargs}]} + + """ + if isinstance(data, bytes): + data = data.decode("utf-8", errors="replace") + + if not data: + return {} + + try: + cmdname, structure = data.split(None, 1) + except ValueError: + cmdname, structure = data, "" + + # Check if this is a known GMCP package name + if cmdname in GMCP_TO_EVENNIA: + evennia_cmdname = GMCP_TO_EVENNIA[cmdname] + else: + # Convert Package.Name to package_name + evennia_cmdname = cmdname.replace(".", "_") + if evennia_cmdname.lower().startswith("core_"): + evennia_cmdname = evennia_cmdname[5:] + evennia_cmdname = evennia_cmdname.lower() + + try: + structure = json.loads(structure) + except (json.JSONDecodeError, ValueError): + # structure is not JSON — treat as plain string + pass + + args, kwargs = [], {} + if is_iter(structure): + if isinstance(structure, dict): + kwargs = {key: value for key, value in structure.items() if key} + else: + args = list(structure) + elif structure: + args = [structure] + + return {evennia_cmdname: [args, kwargs]} diff --git a/evennia/server/portal/telnet_oob.py b/evennia/server/portal/telnet_oob.py index c8c3365ab0..aa1aef13e5 100644 --- a/evennia/server/portal/telnet_oob.py +++ b/evennia/server/portal/telnet_oob.py @@ -24,14 +24,16 @@ This implements the following telnet OOB communication protocols: """ -import json import re import weakref # General Telnet from twisted.conch.telnet import IAC, SB, SE -from evennia.utils.utils import is_iter +from .gmcp_utils import ( + decode_gmcp as _decode_gmcp, + encode_gmcp as _encode_gmcp_str, +) # MSDP-relevant telnet cmd/opt-codes MSDP = bytes([69]) @@ -59,14 +61,6 @@ msdp_regex_array = re.compile( msdp_regex_var = re.compile(rb"%s" % MSDP_VAR) msdp_regex_val = re.compile(rb"%s" % MSDP_VAL) -EVENNIA_TO_GMCP = { - "client_options": "Core.Supports.Get", - "get_inputfuncs": "Core.Commands.Get", - "get_value": "Char.Value.Get", - "repeat": "Char.Repeat.Update", - "monitor": "Char.Monitor.Update", -} - # MSDP/GMCP communication handler @@ -224,61 +218,16 @@ class TelnetOOB: cmdname (str): GMCP OOB command name. args, kwargs (any): Arguments to OOB command. - Notes: - GMCP messages will be outgoing on the following - form (the non-JSON cmdname at the start is what - IRE games use, supposedly, and what clients appear - to have adopted). A cmdname without Package will end - up in the Core package, while Core package names will - be stripped on the Evennia side. - :: - - [cmd_name, [], {}] -> Cmd.Name - [cmd_name, [arg], {}] -> Cmd.Name arg - [cmd_name, [args],{}] -> Cmd.Name [args] - [cmd_name, [], {kwargs}] -> Cmd.Name {kwargs} - [cmdname, [args, {kwargs}] -> Core.Cmdname [[args],{kwargs}] - - For more flexibility with certain clients, if `cmd_name` is capitalized, - Evennia will leave its current capitalization (So CMD_nAmE would be sent - as CMD.nAmE but cMD_Name would be Cmd.Name) + Returns: + bytes: GMCP-encoded byte string for telnet subnegotiation. Notes: - There are also a few default mappings between evennia outputcmds and GMCP: - :: - - client_options -> Core.Supports.Get - get_inputfuncs -> Core.Commands.Get - get_value -> Char.Value.Get - repeat -> Char.Repeat.Update - monitor -> Char.Monitor.Update + Delegates to the shared ``gmcp_utils.encode_gmcp`` and encodes + the resulting string to bytes for telnet transport. See + ``gmcp_utils.encode_gmcp`` for full format documentation. """ - - if cmdname in EVENNIA_TO_GMCP: - gmcp_cmdname = EVENNIA_TO_GMCP[cmdname] - elif "_" in cmdname: - # enforce initial capitalization of each command part, leaving fully-capitalized sections intact - gmcp_cmdname = ".".join( - word.capitalize() if not word.isupper() else word for word in cmdname.split("_") - ) - else: - gmcp_cmdname = "Core.%s" % (cmdname if cmdname.istitle() else cmdname.capitalize()) - - if not (args or kwargs): - gmcp_string = gmcp_cmdname - elif args: - if len(args) == 1: - args = args[0] - if kwargs: - gmcp_string = "%s %s" % (gmcp_cmdname, json.dumps([args, kwargs])) - else: - gmcp_string = "%s %s" % (gmcp_cmdname, json.dumps(args)) - else: # only kwargs - gmcp_string = "%s %s" % (gmcp_cmdname, json.dumps(kwargs)) - - # print("gmcp string", gmcp_string) # DEBUG - return gmcp_string.encode() + return _encode_gmcp_str(cmdname, *args, **kwargs).encode() def decode_msdp(self, data): """ @@ -386,46 +335,18 @@ class TelnetOOB: data (str or list): GMCP data. Notes: - Clients send data on the form "Module.Submodule.Cmdname ". - We assume the structure is valid JSON. - - The following is parsed into Evennia's formal structure: - :: - - Core.Name -> [name, [], {}] - Core.Name string -> [name, [string], {}] - Core.Name [arg, arg,...] -> [name, [args], {}] - Core.Name {key:arg, key:arg, ...} -> [name, [], {kwargs}] - Core.Name [[args], {kwargs}] -> [name, [args], {kwargs}] + Delegates to ``gmcp_utils.decode_gmcp`` for parsing, then + passes the result to ``self.protocol().data_in()``. + See ``gmcp_utils.decode_gmcp`` for full format documentation. """ if isinstance(data, list): data = b"".join(data) - # print("decode_gmcp in:", data) # DEBUG if data: - try: - cmdname, structure = data.split(None, 1) - except ValueError: - cmdname, structure = data, b"" - cmdname = cmdname.replace(b".", b"_") - try: - structure = json.loads(structure) - except ValueError: - # maybe the structure is not json-serialized at all - pass - args, kwargs = [], {} - if is_iter(structure): - if isinstance(structure, dict): - kwargs = {key: value for key, value in structure.items() if key} - else: - args = list(structure) - else: - args = (structure,) - if cmdname.lower().startswith(b"core_"): - # if Core.cmdname, then use cmdname - cmdname = cmdname[5:] - self.protocol().data_in(**{cmdname.lower().decode(): [args, kwargs]}) + cmds = _decode_gmcp(data) + if cmds: + self.protocol().data_in(**cmds) # access methods diff --git a/evennia/server/portal/test_wire_formats.py b/evennia/server/portal/test_wire_formats.py new file mode 100644 index 0000000000..eb47203e4d --- /dev/null +++ b/evennia/server/portal/test_wire_formats.py @@ -0,0 +1,883 @@ +""" +Tests for WebSocket wire formats and subprotocol negotiation. + +Tests cover: + - gmcp_utils.py: encode_gmcp / decode_gmcp + - Wire format codecs: EvenniaV1, Terminal, JsonStandard, GmcpStandard + - WebSocket subprotocol negotiation in webclient.py +""" + +import json + +try: + from django.utils.unittest import TestCase +except ImportError: + from django.test import TestCase + +from mock import MagicMock, Mock + + +# --------------------------------------------------------------------------- +# GMCP utilities +# --------------------------------------------------------------------------- + + +class TestGmcpEncode(TestCase): + """Tests for gmcp_utils.encode_gmcp().""" + + def setUp(self): + from evennia.server.portal.gmcp_utils import encode_gmcp + + self.encode = encode_gmcp + + def test_known_mapping(self): + """Commands in EVENNIA_TO_GMCP should use the mapped name.""" + result = self.encode("client_options") + self.assertEqual(result, "Core.Supports.Get") + + def test_known_mapping_with_args(self): + result = self.encode("get_value", "hp") + self.assertEqual(result, 'Char.Value.Get "hp"') + + def test_underscore_to_dotted(self): + """Underscored names should become dotted with capitalization.""" + result = self.encode("char_vitals") + self.assertEqual(result, "Char.Vitals") + + def test_no_underscore_gets_core_prefix(self): + """Single-word commands get Core. prefix.""" + result = self.encode("ping") + self.assertEqual(result, "Core.Ping") + + def test_already_title_case_preserved(self): + result = self.encode("Ping") + self.assertEqual(result, "Core.Ping") + + def test_fully_uppercase_preserved(self): + """Fully uppercase segments should stay uppercase.""" + result = self.encode("char_HP") + self.assertEqual(result, "Char.HP") + + def test_no_args_no_kwargs(self): + result = self.encode("ping") + self.assertEqual(result, "Core.Ping") + + def test_single_arg(self): + result = self.encode("ping", "test") + self.assertEqual(result, 'Core.Ping "test"') + + def test_multiple_args(self): + result = self.encode("ping", "a", "b") + self.assertEqual(result, 'Core.Ping ["a", "b"]') + + def test_kwargs_only(self): + result = self.encode("ping", hp=100) + self.assertEqual(result, 'Core.Ping {"hp": 100}') + + def test_args_and_kwargs(self): + result = self.encode("ping", "a", hp=100) + self.assertEqual(result, 'Core.Ping ["a", {"hp": 100}]') + + +class TestGmcpDecode(TestCase): + """Tests for gmcp_utils.decode_gmcp().""" + + def setUp(self): + from evennia.server.portal.gmcp_utils import decode_gmcp + + self.decode = decode_gmcp + + def test_known_mapping(self): + """Known GMCP package names should map to Evennia names.""" + result = self.decode("Core.Supports.Get") + self.assertIn("client_options", result) + self.assertEqual(result["client_options"], [[], {}]) + + def test_package_to_underscore(self): + """Unknown package names should become lowercase underscore.""" + result = self.decode("Char.Vitals") + self.assertIn("char_vitals", result) + + def test_core_prefix_stripped(self): + """Core. prefix should be stripped from the command name.""" + result = self.decode("Core.Ping") + self.assertIn("ping", result) + self.assertEqual(result["ping"], [[], {}]) + + def test_string_arg(self): + result = self.decode('Core.Ping "test"') + self.assertIn("ping", result) + self.assertEqual(result["ping"], [["test"], {}]) + + def test_array_arg(self): + result = self.decode('Core.Ping ["a", "b"]') + self.assertIn("ping", result) + self.assertEqual(result["ping"], [["a", "b"], {}]) + + def test_dict_arg(self): + result = self.decode('Core.Ping {"hp": 100}') + self.assertIn("ping", result) + self.assertEqual(result["ping"], [[], {"hp": 100}]) + + def test_bytes_input(self): + result = self.decode(b"Core.Ping") + self.assertIn("ping", result) + + def test_empty_input(self): + result = self.decode("") + self.assertEqual(result, {}) + + def test_non_json_structure(self): + """Non-JSON data after command name should be treated as string.""" + result = self.decode("Core.Ping hello world") + self.assertIn("ping", result) + self.assertEqual(result["ping"], [["hello world"], {}]) + + +# --------------------------------------------------------------------------- +# Wire format base +# --------------------------------------------------------------------------- + + +class TestWireFormatBase(TestCase): + """Tests for the WireFormat abstract base class.""" + + def test_abstract_methods_raise(self): + from evennia.server.portal.wire_formats.base import WireFormat + + fmt = WireFormat() + with self.assertRaises(NotImplementedError): + fmt.decode_incoming(b"", False) + with self.assertRaises(NotImplementedError): + fmt.encode_text("hello") + with self.assertRaises(NotImplementedError): + fmt.encode_default("cmd") + + def test_encode_prompt_delegates_to_encode_text(self): + """Default encode_prompt should call encode_text with send_prompt=True.""" + from evennia.server.portal.wire_formats.base import WireFormat + + fmt = WireFormat() + # Monkey-patch encode_text to verify delegation + fmt.encode_text = MagicMock(return_value=(b"test", False)) + result = fmt.encode_prompt("hello", options={"raw": False}) + fmt.encode_text.assert_called_once() + call_kwargs = fmt.encode_text.call_args + self.assertTrue(call_kwargs.kwargs["options"]["send_prompt"]) + + +# --------------------------------------------------------------------------- +# Evennia V1 wire format +# --------------------------------------------------------------------------- + + +class TestEvenniaV1Format(TestCase): + """Tests for the v1.evennia.com wire format.""" + + def setUp(self): + from evennia.server.portal.wire_formats.evennia_v1 import EvenniaV1Format + + self.fmt = EvenniaV1Format() + + def test_name(self): + self.assertEqual(self.fmt.name, "v1.evennia.com") + + def test_supports_oob(self): + self.assertTrue(self.fmt.supports_oob) + + # --- decode --- + + def test_decode_text(self): + payload = json.dumps(["text", ["look"], {}]).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertEqual(result, {"text": [["look"], {}]}) + + def test_decode_oob_command(self): + payload = json.dumps(["logged_in", [], {}]).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertEqual(result, {"logged_in": [[], {}]}) + + def test_decode_invalid_json(self): + result = self.fmt.decode_incoming(b"not json", is_binary=False) + self.assertIsNone(result) + + def test_decode_short_array(self): + payload = json.dumps(["text"]).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertIsNone(result) + + # --- encode text --- + + def test_encode_text_basic(self): + result = self.fmt.encode_text("Hello |rworld|n", protocol_flags={}) + self.assertIsNotNone(result) + data, is_binary = result + self.assertFalse(is_binary) + parsed = json.loads(data) + self.assertEqual(parsed[0], "text") + # Should contain HTML (from parse_html) + self.assertIsInstance(parsed[1][0], str) + + def test_encode_text_none(self): + result = self.fmt.encode_text(None, protocol_flags={}) + self.assertIsNone(result) + + def test_encode_text_no_args(self): + result = self.fmt.encode_text(protocol_flags={}) + self.assertIsNone(result) + + def test_encode_text_prompt(self): + result = self.fmt.encode_text( + "HP: 100", protocol_flags={}, options={"send_prompt": True} + ) + data, is_binary = result + parsed = json.loads(data) + self.assertEqual(parsed[0], "prompt") + + def test_encode_text_nocolor(self): + result = self.fmt.encode_text( + "Hello |rworld|n", protocol_flags={}, options={"nocolor": True} + ) + data, _ = result + parsed = json.loads(data) + # With nocolor, ANSI should be stripped + self.assertNotIn("|r", parsed[1][0]) + + # --- encode default (OOB) --- + + def test_encode_default(self): + result = self.fmt.encode_default("custom_cmd", "arg1", protocol_flags={}, key="val") + data, is_binary = result + self.assertFalse(is_binary) + parsed = json.loads(data) + self.assertEqual(parsed[0], "custom_cmd") + + def test_encode_default_options_skipped(self): + """The 'options' command should be silently dropped.""" + result = self.fmt.encode_default("options", protocol_flags={}) + self.assertIsNone(result) + + +# --------------------------------------------------------------------------- +# Terminal wire format +# --------------------------------------------------------------------------- + + +class TestTerminalFormat(TestCase): + """Tests for the terminal.mudstandards.org wire format.""" + + def setUp(self): + from evennia.server.portal.wire_formats.terminal import TerminalFormat + + self.fmt = TerminalFormat() + + def test_name(self): + self.assertEqual(self.fmt.name, "terminal.mudstandards.org") + + def test_no_oob(self): + self.assertFalse(self.fmt.supports_oob) + + # --- decode --- + + def test_decode_binary(self): + payload = "look".encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=True) + self.assertEqual(result, {"text": [["look"], {}]}) + + def test_decode_strips_whitespace(self): + payload = " look \n".encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=True) + self.assertEqual(result, {"text": [["look"], {}]}) + + def test_decode_empty(self): + result = self.fmt.decode_incoming(b" ", is_binary=True) + self.assertIsNone(result) + + def test_decode_invalid_utf8(self): + result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=True) + self.assertIsNone(result) + + # --- encode text --- + + def test_encode_text_binary_frame(self): + result = self.fmt.encode_text("Hello world", protocol_flags={}) + self.assertIsNotNone(result) + data, is_binary = result + self.assertTrue(is_binary) + self.assertIsInstance(data, bytes) + + def test_encode_text_preserves_ansi(self): + """Terminal format should output real ANSI escape sequences.""" + result = self.fmt.encode_text("Hello |rworld|n", protocol_flags={}) + data, _ = result + text = data.decode("utf-8") + # parse_ansi converts |r to ESC[31m (or similar) + self.assertIn("\033[", text) + + def test_encode_text_none(self): + result = self.fmt.encode_text(None, protocol_flags={}) + self.assertIsNone(result) + + # --- encode default (OOB) --- + + def test_encode_default_returns_none(self): + """OOB should be silently dropped for terminal format.""" + result = self.fmt.encode_default("custom_cmd", "arg1", protocol_flags={}) + self.assertIsNone(result) + + +# --------------------------------------------------------------------------- +# JSON MUD Standards wire format +# --------------------------------------------------------------------------- + + +class TestJsonStandardFormat(TestCase): + """Tests for the json.mudstandards.org wire format.""" + + def setUp(self): + from evennia.server.portal.wire_formats.json_standard import JsonStandardFormat + + self.fmt = JsonStandardFormat() + + def test_name(self): + self.assertEqual(self.fmt.name, "json.mudstandards.org") + + def test_supports_oob(self): + self.assertTrue(self.fmt.supports_oob) + + # --- decode --- + + def test_decode_binary_as_text(self): + """BINARY frames should be treated as raw text input.""" + payload = "look".encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=True) + self.assertEqual(result, {"text": [["look"], {}]}) + + def test_decode_binary_empty(self): + result = self.fmt.decode_incoming(b" ", is_binary=True) + self.assertIsNone(result) + + def test_decode_text_envelope(self): + """TEXT frames should be parsed as JSON envelopes.""" + envelope = {"proto": "text", "id": "", "data": "look around"} + payload = json.dumps(envelope).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertEqual(result, {"text": [["look around"], {}]}) + + def test_decode_gmcp_envelope(self): + """GMCP-in-JSON envelopes should be decoded.""" + envelope = {"proto": "gmcp", "id": "Core.Ping", "data": ""} + payload = json.dumps(envelope).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertIn("ping", result) + + def test_decode_gmcp_envelope_with_data(self): + envelope = {"proto": "gmcp", "id": "Char.Vitals", "data": '{"hp": 100}'} + payload = json.dumps(envelope).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertIn("char_vitals", result) + + def test_decode_websocket_close(self): + envelope = {"proto": "websocket_close", "id": "", "data": ""} + payload = json.dumps(envelope).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertIn("websocket_close", result) + + def test_decode_invalid_json_text_frame(self): + result = self.fmt.decode_incoming(b"not json", is_binary=False) + self.assertIsNone(result) + + def test_decode_generic_proto(self): + """Unknown proto should pass through as-is.""" + envelope = {"proto": "custom", "id": "my_cmd", "data": '{"key": "val"}'} + payload = json.dumps(envelope).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertIn("my_cmd", result) + self.assertEqual(result["my_cmd"], [[], {"key": "val"}]) + + # --- encode text --- + + def test_encode_text_binary_frame(self): + """Text should be sent as BINARY frames with raw ANSI.""" + result = self.fmt.encode_text("Hello world", protocol_flags={}) + data, is_binary = result + self.assertTrue(is_binary) + self.assertIsInstance(data, bytes) + + def test_encode_text_preserves_ansi(self): + result = self.fmt.encode_text("Hello |rworld|n", protocol_flags={}) + data, _ = result + text = data.decode("utf-8") + self.assertIn("\033[", text) + + # --- encode prompt --- + + def test_encode_prompt_as_json_text_frame(self): + """Prompts should be JSON envelopes in TEXT frames.""" + result = self.fmt.encode_prompt("HP: 100>", protocol_flags={}) + data, is_binary = result + self.assertFalse(is_binary) + envelope = json.loads(data.decode("utf-8")) + self.assertEqual(envelope["proto"], "prompt") + + # --- encode default (OOB) --- + + def test_encode_default_gmcp_in_json(self): + """OOB should be encoded as GMCP-in-JSON envelope.""" + result = self.fmt.encode_default("ping", protocol_flags={}) + data, is_binary = result + self.assertFalse(is_binary) + envelope = json.loads(data.decode("utf-8")) + self.assertEqual(envelope["proto"], "gmcp") + self.assertEqual(envelope["id"], "Core.Ping") + + def test_encode_default_options_skipped(self): + result = self.fmt.encode_default("options", protocol_flags={}) + self.assertIsNone(result) + + +# --------------------------------------------------------------------------- +# GMCP MUD Standards wire format +# --------------------------------------------------------------------------- + + +class TestGmcpStandardFormat(TestCase): + """Tests for the gmcp.mudstandards.org wire format.""" + + def setUp(self): + from evennia.server.portal.wire_formats.gmcp_standard import GmcpStandardFormat + + self.fmt = GmcpStandardFormat() + + def test_name(self): + self.assertEqual(self.fmt.name, "gmcp.mudstandards.org") + + def test_supports_oob(self): + self.assertTrue(self.fmt.supports_oob) + + # --- decode --- + + def test_decode_binary_as_text(self): + payload = "look".encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=True) + self.assertEqual(result, {"text": [["look"], {}]}) + + def test_decode_text_as_gmcp(self): + """TEXT frames should be parsed as raw GMCP strings.""" + payload = "Core.Ping".encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertIn("ping", result) + + def test_decode_gmcp_with_data(self): + payload = 'Char.Vitals {"hp": 100}'.encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertIn("char_vitals", result) + self.assertEqual(result["char_vitals"], [[], {"hp": 100}]) + + def test_decode_binary_invalid_utf8(self): + result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=True) + self.assertIsNone(result) + + def test_decode_text_invalid_utf8(self): + result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=False) + self.assertIsNone(result) + + # --- encode text --- + + def test_encode_text_binary_frame(self): + result = self.fmt.encode_text("Hello world", protocol_flags={}) + data, is_binary = result + self.assertTrue(is_binary) + self.assertIsInstance(data, bytes) + + # --- encode prompt --- + + def test_encode_prompt_binary_frame(self): + """GMCP format sends prompts as BINARY frames like regular text.""" + result = self.fmt.encode_prompt("HP: 100>", protocol_flags={}) + data, is_binary = result + self.assertTrue(is_binary) + + # --- encode default (OOB) --- + + def test_encode_default_gmcp_text_frame(self): + """OOB should be raw GMCP strings in TEXT frames.""" + result = self.fmt.encode_default("ping", protocol_flags={}) + data, is_binary = result + self.assertFalse(is_binary) + text = data.decode("utf-8") + self.assertTrue(text.startswith("Core.Ping")) + + def test_encode_default_with_args(self): + result = self.fmt.encode_default("ping", "test", protocol_flags={}) + data, _ = result + text = data.decode("utf-8") + self.assertIn("Core.Ping", text) + self.assertIn("test", text) + + def test_encode_default_options_skipped(self): + result = self.fmt.encode_default("options", protocol_flags={}) + self.assertIsNone(result) + + +# --------------------------------------------------------------------------- +# Wire format registry +# --------------------------------------------------------------------------- + + +class TestWireFormatRegistry(TestCase): + """Tests for the wire_formats package registry.""" + + def test_registry_has_all_formats(self): + from evennia.server.portal.wire_formats import WIRE_FORMATS + + self.assertIn("v1.evennia.com", WIRE_FORMATS) + self.assertIn("json.mudstandards.org", WIRE_FORMATS) + self.assertIn("gmcp.mudstandards.org", WIRE_FORMATS) + self.assertIn("terminal.mudstandards.org", WIRE_FORMATS) + + def test_registry_order_prefers_json(self): + """json.mudstandards.org should be first (highest priority).""" + from evennia.server.portal.wire_formats import WIRE_FORMATS + + keys = list(WIRE_FORMATS.keys()) + self.assertEqual(keys[0], "json.mudstandards.org") + + def test_registry_instances_are_correct_types(self): + from evennia.server.portal.wire_formats import ( + WIRE_FORMATS, + EvenniaV1Format, + GmcpStandardFormat, + JsonStandardFormat, + TerminalFormat, + ) + + self.assertIsInstance(WIRE_FORMATS["v1.evennia.com"], EvenniaV1Format) + self.assertIsInstance(WIRE_FORMATS["json.mudstandards.org"], JsonStandardFormat) + self.assertIsInstance(WIRE_FORMATS["gmcp.mudstandards.org"], GmcpStandardFormat) + self.assertIsInstance(WIRE_FORMATS["terminal.mudstandards.org"], TerminalFormat) + + +# --------------------------------------------------------------------------- +# WebSocket subprotocol negotiation (integration tests) +# --------------------------------------------------------------------------- + + +class TestWebSocketSubprotocolNegotiation(TestCase): + """ + Tests for the onConnect() subprotocol negotiation in WebSocketClient. + + These test the negotiation logic in isolation without starting a full + Twisted reactor, by directly calling onConnect() with mock request objects. + """ + + def _make_client(self): + """Create a WebSocketClient without connecting it.""" + from evennia.server.portal.webclient import WebSocketClient + + client = WebSocketClient() + return client + + def _make_request(self, protocols=None): + """Create a mock ConnectionRequest with the given protocols list.""" + request = Mock() + request.protocols = protocols or [] + return request + + def test_no_subprotocol_offered(self): + """Client sends no Sec-WebSocket-Protocol → v1 fallback, returns None.""" + client = self._make_client() + request = self._make_request(protocols=[]) + result = client.onConnect(request) + self.assertIsNone(result) + self.assertIsNotNone(client.wire_format) + self.assertEqual(client.wire_format.name, "v1.evennia.com") + + def test_v1_subprotocol_offered(self): + """Client offers v1.evennia.com → selected and returned.""" + client = self._make_client() + request = self._make_request(protocols=["v1.evennia.com"]) + result = client.onConnect(request) + self.assertEqual(result, "v1.evennia.com") + self.assertEqual(client.wire_format.name, "v1.evennia.com") + + def test_json_subprotocol_offered(self): + client = self._make_client() + request = self._make_request(protocols=["json.mudstandards.org"]) + result = client.onConnect(request) + self.assertEqual(result, "json.mudstandards.org") + self.assertEqual(client.wire_format.name, "json.mudstandards.org") + + def test_gmcp_subprotocol_offered(self): + client = self._make_client() + request = self._make_request(protocols=["gmcp.mudstandards.org"]) + result = client.onConnect(request) + self.assertEqual(result, "gmcp.mudstandards.org") + self.assertEqual(client.wire_format.name, "gmcp.mudstandards.org") + + def test_terminal_subprotocol_offered(self): + client = self._make_client() + request = self._make_request(protocols=["terminal.mudstandards.org"]) + result = client.onConnect(request) + self.assertEqual(result, "terminal.mudstandards.org") + self.assertEqual(client.wire_format.name, "terminal.mudstandards.org") + + def test_server_preference_wins(self): + """When client offers multiple, server's preference order wins.""" + client = self._make_client() + # Client offers terminal first, but server prefers json + request = self._make_request( + protocols=["terminal.mudstandards.org", "json.mudstandards.org"] + ) + result = client.onConnect(request) + # Server preference: json > gmcp > terminal > v1 + self.assertEqual(result, "json.mudstandards.org") + + def test_unknown_subprotocol_falls_back(self): + """Client offers only unknown protocols → v1 fallback.""" + client = self._make_client() + request = self._make_request(protocols=["unknown.protocol"]) + result = client.onConnect(request) + self.assertIsNone(result) + self.assertEqual(client.wire_format.name, "v1.evennia.com") + + def test_mixed_known_and_unknown(self): + """Client offers unknown + known → known is selected.""" + client = self._make_client() + request = self._make_request( + protocols=["unknown.protocol", "terminal.mudstandards.org"] + ) + result = client.onConnect(request) + self.assertEqual(result, "terminal.mudstandards.org") + + def test_empty_subprotocols_setting(self): + """WEBSOCKET_SUBPROTOCOLS=[] disables negotiation; clients fall back to v1.""" + from evennia.server.portal import webclient + + original = webclient._get_supported_subprotocols + webclient._get_supported_subprotocols = lambda: [] + try: + client = self._make_client() + request = self._make_request(protocols=["json.mudstandards.org"]) + result = client.onConnect(request) + # No match possible — falls back to v1, returns None + self.assertIsNone(result) + self.assertEqual(client.wire_format.name, "v1.evennia.com") + finally: + webclient._get_supported_subprotocols = original + + +# --------------------------------------------------------------------------- +# Integration: full message round-trip through wire formats +# --------------------------------------------------------------------------- + + +class TestEvenniaV1RoundTrip(TestCase): + """Test encode → decode round-trip for v1.evennia.com.""" + + def setUp(self): + from evennia.server.portal.wire_formats.evennia_v1 import EvenniaV1Format + + self.fmt = EvenniaV1Format() + + def test_oob_roundtrip(self): + """Encode an OOB command and decode the result.""" + encoded = self.fmt.encode_default("custom_cmd", "arg1", protocol_flags={}) + data, is_binary = encoded + # Decode it back + result = self.fmt.decode_incoming(data, is_binary=is_binary) + self.assertIn("custom_cmd", result) + + +class TestGmcpRoundTrip(TestCase): + """Test encode → decode round-trip for gmcp.mudstandards.org.""" + + def setUp(self): + from evennia.server.portal.wire_formats.gmcp_standard import GmcpStandardFormat + + self.fmt = GmcpStandardFormat() + + def test_oob_roundtrip_no_args(self): + """Encode a command without args and decode.""" + encoded = self.fmt.encode_default("ping", protocol_flags={}) + data, is_binary = encoded + result = self.fmt.decode_incoming(data, is_binary=is_binary) + self.assertIn("ping", result) + + def test_oob_roundtrip_with_kwargs(self): + """Encode a command with kwargs and decode.""" + encoded = self.fmt.encode_default("get_value", protocol_flags={}, hp=100) + data, is_binary = encoded + result = self.fmt.decode_incoming(data, is_binary=is_binary) + self.assertIn("get_value", result) + + +class TestJsonStandardRoundTrip(TestCase): + """Test encode → decode round-trip for json.mudstandards.org.""" + + def setUp(self): + from evennia.server.portal.wire_formats.json_standard import JsonStandardFormat + + self.fmt = JsonStandardFormat() + + def test_oob_roundtrip(self): + """Encode an OOB command as GMCP-in-JSON and decode.""" + encoded = self.fmt.encode_default("ping", protocol_flags={}) + data, is_binary = encoded + result = self.fmt.decode_incoming(data, is_binary=is_binary) + self.assertIn("ping", result) + + def test_prompt_roundtrip(self): + """Encode a prompt and verify the envelope.""" + encoded = self.fmt.encode_prompt("HP: 100>", protocol_flags={}) + data, is_binary = encoded + self.assertFalse(is_binary) + envelope = json.loads(data.decode("utf-8")) + self.assertEqual(envelope["proto"], "prompt") + self.assertIn("HP:", envelope["data"]) + + +# --------------------------------------------------------------------------- +# Edge-case tests +# --------------------------------------------------------------------------- + + +class TestTerminalEdgeCases(TestCase): + """Edge-case tests for TerminalFormat.""" + + def setUp(self): + from evennia.server.portal.wire_formats.terminal import TerminalFormat + + self.fmt = TerminalFormat() + + def test_decode_text_frame_treated_as_text(self): + """TEXT frames should still be decoded as text input.""" + payload = "look".encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + self.assertEqual(result, {"text": [["look"], {}]}) + + def test_decode_empty_bytes(self): + """Empty bytes should return None.""" + result = self.fmt.decode_incoming(b"", is_binary=True) + self.assertIsNone(result) + + def test_decode_empty_bytes_text_frame(self): + """Empty TEXT frame bytes should return None.""" + result = self.fmt.decode_incoming(b"", is_binary=False) + self.assertIsNone(result) + + +class TestJsonStandardEdgeCases(TestCase): + """Edge-case tests for JsonStandardFormat.""" + + def setUp(self): + from evennia.server.portal.wire_formats.json_standard import JsonStandardFormat + + self.fmt = JsonStandardFormat() + + def test_decode_envelope_missing_proto(self): + """JSON envelope with missing proto should use empty string default.""" + envelope = {"id": "my_cmd", "data": "test"} + payload = json.dumps(envelope).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + # Empty proto, cmd_id="my_cmd" → funcname="my_cmd" + self.assertIn("my_cmd", result) + + def test_decode_envelope_missing_all_fields(self): + """JSON envelope with no recognized fields returns None (empty funcname).""" + payload = json.dumps({}).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + # proto="", cmd_id="", data="" → generic handler, funcname="" → None + self.assertIsNone(result) + + def test_decode_envelope_missing_id_and_data(self): + """Envelope with only proto should work.""" + envelope = {"proto": "text"} + payload = json.dumps(envelope).encode("utf-8") + result = self.fmt.decode_incoming(payload, is_binary=False) + # proto="text", data="" → text input with empty string + self.assertEqual(result, {"text": [[""], {}]}) + + def test_decode_binary_invalid_utf8(self): + """Invalid UTF-8 in BINARY frame should return None.""" + result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=True) + self.assertIsNone(result) + + +class TestGmcpStandardEdgeCases(TestCase): + """Edge-case tests for GmcpStandardFormat.""" + + def setUp(self): + from evennia.server.portal.wire_formats.gmcp_standard import GmcpStandardFormat + + self.fmt = GmcpStandardFormat() + + def test_decode_empty_binary(self): + """Empty BINARY frame should return None.""" + result = self.fmt.decode_incoming(b"", is_binary=True) + self.assertIsNone(result) + + def test_decode_binary_whitespace_only(self): + """Whitespace-only BINARY frame should return None.""" + result = self.fmt.decode_incoming(b" \n ", is_binary=True) + self.assertIsNone(result) + + +class TestBaseWireFormatHelpers(TestCase): + """Tests for the shared helper methods on WireFormat base class.""" + + def setUp(self): + from evennia.server.portal.wire_formats.base import WireFormat + + self.base = WireFormat() + + def test_extract_text_and_flags_basic(self): + """Basic extraction with no options or flags.""" + kwargs = {} + result = self.base._extract_text_and_flags(("hello",), kwargs, {}) + self.assertEqual(result, ("hello", False, False)) + + def test_extract_text_and_flags_none_text(self): + """None as text should return None.""" + kwargs = {} + result = self.base._extract_text_and_flags((None,), kwargs, {}) + self.assertIsNone(result) + + def test_extract_text_and_flags_no_args(self): + """Empty args should return None.""" + kwargs = {} + result = self.base._extract_text_and_flags((), kwargs, {}) + self.assertIsNone(result) + + def test_extract_text_and_flags_options_override(self): + """Options should override protocol_flags.""" + kwargs = {"options": {"nocolor": True, "screenreader": True}} + result = self.base._extract_text_and_flags( + ("hello",), kwargs, {"NOCOLOR": False, "SCREENREADER": False} + ) + self.assertEqual(result, ("hello", True, True)) + + def test_extract_text_and_flags_from_protocol_flags(self): + """Protocol flags should be used when options are absent.""" + kwargs = {} + result = self.base._extract_text_and_flags( + ("hello",), kwargs, {"NOCOLOR": True} + ) + self.assertEqual(result, ("hello", True, False)) + + def test_process_ansi_normal(self): + """Normal mode should produce ANSI escape sequences.""" + text = self.base._process_ansi("Hello |rworld|n", False, False) + self.assertIn("\033[", text) + + def test_process_ansi_nocolor(self): + """Nocolor mode should strip all ANSI.""" + text = self.base._process_ansi("Hello |rworld|n", True, False) + self.assertNotIn("\033[", text) + self.assertNotIn("|r", text) + + def test_process_ansi_screenreader(self): + """Screenreader mode should strip ANSI and apply regex.""" + text = self.base._process_ansi("Hello |rworld|n", False, True) + self.assertNotIn("\033[", text) + self.assertNotIn("|r", text) diff --git a/evennia/server/portal/tests.py b/evennia/server/portal/tests.py index b26cf0644b..efe16fc3df 100644 --- a/evennia/server/portal/tests.py +++ b/evennia/server/portal/tests.py @@ -330,7 +330,12 @@ class TestWebSocket(BaseEvenniaTest): @mock.patch("evennia.server.portal.portalsessionhandler.reactor", new=MagicMock()) def test_data_out(self): self.proto.onOpen() - self.proto.sendLine = MagicMock() - msg = json.dumps(["logged_in", (), {}]) + self.proto.sendEncoded = MagicMock() self.proto.sessionhandler.data_out(self.proto, text=[["Excepting Alice"], {}]) - self.proto.sendLine.assert_called_with(json.dumps(["text", ["Excepting Alice"], {}])) + self.proto.sendEncoded.assert_called_once() + call_args = self.proto.sendEncoded.call_args + data = call_args[0][0] + # EvenniaV1Format encodes as JSON TEXT frame + parsed = json.loads(data) + self.assertEqual(parsed[0], "text") + self.assertEqual(parsed[1], ["Excepting Alice"]) diff --git a/evennia/server/portal/webclient.py b/evennia/server/portal/webclient.py index 1f35d473ee..051b45adbc 100644 --- a/evennia/server/portal/webclient.py +++ b/evennia/server/portal/webclient.py @@ -1,11 +1,28 @@ """ -Webclient based on websockets. +Webclient based on websockets with MUD Standards subprotocol support. This implements a webclient with WebSockets (http://en.wikipedia.org/wiki/WebSocket) by use of the autobahn-python package's implementation (https://github.com/crossbario/autobahn-python). It is used together with evennia/web/media/javascript/evennia_websocket_webclient.js. -All data coming into the webclient is in the form of valid JSON on the form +Subprotocol Negotiation (RFC 6455 Sec-WebSocket-Protocol): + When a client connects, it may offer one or more WebSocket subprotocols + via the Sec-WebSocket-Protocol header. This module negotiates the best + match from the server's supported list (configured via + settings.WEBSOCKET_SUBPROTOCOLS) and selects the appropriate wire format + codec for the connection's lifetime. + + Supported subprotocols (per https://mudstandards.org/websocket/): + - v1.evennia.com: Evennia's legacy JSON array format + - json.mudstandards.org: MUD Standards JSON envelope format + - gmcp.mudstandards.org: GMCP over WebSocket + - terminal.mudstandards.org: Raw ANSI terminal over WebSocket + + If no subprotocol is negotiated (legacy client with no header), + the v1.evennia.com format is used as the default. + +All data coming into the webclient via the v1.evennia.com format is in the +form of valid JSON on the form `["inputfunc_name", [args], {kwarg}]` @@ -15,21 +32,14 @@ from the command line and interprets it as an Evennia Command: `["text", ["look" """ -import html import json -import re from autobahn.exception import Disconnected from autobahn.twisted.websocket import WebSocketServerProtocol from django.conf import settings -from evennia.utils.ansi import parse_ansi -from evennia.utils.text2html import parse_html from evennia.utils.utils import class_from_module, mod_import -_RE_SCREENREADER_REGEX = re.compile( - r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE -) _CLIENT_SESSIONS = mod_import(settings.SESSION_ENGINE).SessionStore _UPSTREAM_IPS = settings.UPSTREAM_IPS @@ -43,11 +53,62 @@ GOING_AWAY = WebSocketServerProtocol.CLOSE_STATUS_CODE_GOING_AWAY _BASE_SESSION_CLASS = class_from_module(settings.BASE_SESSION_CLASS) +# --- Wire format support --- +# Import wire formats lazily to avoid circular imports at module level. +# The WIRE_FORMATS dict and format instances are created on first use. +_wire_formats = None + + +def _get_wire_formats(): + """ + Lazily load and return the wire format registry. + + Returns: + dict: Mapping of subprotocol name -> WireFormat instance. + + """ + global _wire_formats + if _wire_formats is None: + try: + from evennia.server.portal.wire_formats import WIRE_FORMATS + + _wire_formats = WIRE_FORMATS + except Exception: + from evennia.utils import logger + + logger.log_trace("Failed to load wire format registry") + _wire_formats = {} + return _wire_formats + + +def _get_supported_subprotocols(): + """ + Get the ordered list of supported subprotocol names from settings. + + Falls back to all available wire formats if the setting is not defined. + + Returns: + list: Ordered list of subprotocol name strings. + + """ + configured = getattr(settings, "WEBSOCKET_SUBPROTOCOLS", None) + if configured is not None: + return list(configured) + return list(_get_wire_formats().keys()) + class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): """ Implements the server-side of the Websocket connection. + Supports multiple wire formats via RFC 6455 subprotocol negotiation. + The wire format is selected during the WebSocket handshake in onConnect() + and determines how all subsequent messages are encoded and decoded. + + Attributes: + wire_format (WireFormat): The selected wire format codec for this + connection. Set during onConnect(). + """ # nonce value, used to prevent the webclient from erasing the @@ -58,6 +119,56 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): super().__init__(*args, **kwargs) self.protocol_key = "webclient/websocket" self.browserstr = "" + self.wire_format = None + + def onConnect(self, request): + """ + Called during the WebSocket opening handshake, before onOpen(). + + This is where we negotiate the WebSocket subprotocol. The client + sends a list of subprotocols it supports via Sec-WebSocket-Protocol. + We select the best match from our supported list. + + Args: + request (ConnectionRequest): The WebSocket connection request, + containing request.protocols (list of offered subprotocols). + + Returns: + str or None: The selected subprotocol name to echo back in the + Sec-WebSocket-Protocol response header, or None if no + subprotocol was negotiated (legacy client with no header, + or client offered protocols that don't match). + + """ + wire_formats = _get_wire_formats() + supported = _get_supported_subprotocols() + + if request.protocols: + # Client offered subprotocols — pick the first one we support + # (order follows the server's preference from settings) + for proto_name in supported: + if proto_name in request.protocols and proto_name in wire_formats: + self.wire_format = wire_formats[proto_name] + return proto_name + + # Client offered protocols but none matched. Per RFC 6455, if we + # don't echo a subprotocol, a well-behaved client should close the + # connection. We still set a wire format so the connection doesn't + # crash if the client proceeds anyway. + if "v1.evennia.com" in wire_formats: + self.wire_format = wire_formats["v1.evennia.com"] + elif wire_formats: + self.wire_format = next(iter(wire_formats.values())) + return None + + # No Sec-WebSocket-Protocol header at all — legacy client. + # Always use v1 format regardless of WEBSOCKET_SUBPROTOCOLS. + if "v1.evennia.com" in wire_formats: + self.wire_format = wire_formats["v1.evennia.com"] + elif wire_formats: + self.wire_format = next(iter(wire_formats.values())) + + return None def get_client_session(self): """ @@ -131,10 +242,31 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): self.sessid = old_session.sessid self.sessionhandler.disconnect(old_session) + # Ensure wire_format is set (it should be from onConnect, but + # in testing scenarios onConnect may not have been called) + if self.wire_format is None: + wire_formats = _get_wire_formats() + self.wire_format = wire_formats.get( + "v1.evennia.com", next(iter(wire_formats.values()), None) + ) + + if self.wire_format is None: + from evennia.utils import logger + + logger.log_err( + "WebSocketClient: No wire formats available. " + "Closing connection." + ) + self.sendClose(CLOSE_NORMAL, "No wire formats available") + return + browserstr = f":{self.browserstr}" if self.browserstr else "" - self.protocol_flags["CLIENTNAME"] = f"Evennia Webclient (websocket{browserstr})" + proto_name = self.wire_format.name + self.protocol_flags["CLIENTNAME"] = ( + f"Evennia Webclient (websocket{browserstr} [{proto_name}])" + ) self.protocol_flags["UTF-8"] = True - self.protocol_flags["OOB"] = True + self.protocol_flags["OOB"] = self.wire_format.supports_oob self.protocol_flags["TRUECOLOR"] = True self.protocol_flags["XTERM256"] = True self.protocol_flags["ANSI"] = True @@ -196,15 +328,29 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): """ Callback fired when a complete WebSocket message was received. + Delegates to the active wire format's decode_incoming() method + to parse the message into kwargs for data_in(). + Args: payload (bytes): The WebSocket message received. isBinary (bool): Flag indicating whether payload is binary or UTF-8 encoded text. """ - cmdarray = json.loads(str(payload, "utf-8")) - if cmdarray: - self.data_in(**{cmdarray[0]: [cmdarray[1], cmdarray[2]]}) + if self.wire_format: + kwargs = self.wire_format.decode_incoming( + payload, isBinary, protocol_flags=self.protocol_flags + ) + if kwargs: + self.data_in(**kwargs) + else: + # Fallback: try legacy JSON parsing + try: + cmdarray = json.loads(str(payload, "utf-8")) + if cmdarray: + self.data_in(**{cmdarray[0]: [cmdarray[1], cmdarray[2]]}) + except (json.JSONDecodeError, UnicodeDecodeError, IndexError): + pass def sendLine(self, line): """ @@ -221,6 +367,24 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): # it means this link is actually already closed. self.disconnect(reason="Browser already closed.") + def sendEncoded(self, data, is_binary=False): + """ + Send pre-encoded data to the client. + + This is used by wire formats that return raw bytes with a + binary/text frame indicator. + + Args: + data (bytes): The encoded data to send. + is_binary (bool): If True, send as a BINARY frame. + If False, send as a TEXT frame. + + """ + try: + return self.sendMessage(data, isBinary=is_binary) + except Disconnected: + self.disconnect(reason="Browser already closed.") + def at_login(self): csession = self.get_client_session() if csession: @@ -256,20 +420,47 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): def send_text(self, *args, **kwargs): """ - Send text data. This will pre-process the text for - color-replacement, conversion to html etc. + Send text data. Delegates to the active wire format's encode_text() + method, which handles ANSI processing and framing. The exact output + depends on the negotiated subprotocol (e.g., HTML for v1.evennia.com, + raw ANSI for MUD Standards formats). Args: text (str): Text to send. Keyword Args: options (dict): Options-dict with the following keys understood: - - raw (bool): No parsing at all (leave ansi-to-html markers unparsed). + - raw (bool): No parsing at all (leave ansi markers unparsed). - nocolor (bool): Clean out all color. - screenreader (bool): Use Screenreader mode. - - send_prompt (bool): Send a prompt with parsed html + - send_prompt (bool): Send as a prompt instead of regular text. """ + if self.wire_format: + result = self.wire_format.encode_text( + *args, protocol_flags=self.protocol_flags, **kwargs + ) + if result is not None: + data, is_binary = result + self.sendEncoded(data, is_binary=is_binary) + else: + # Fallback: legacy behavior + self._send_text_legacy(*args, **kwargs) + + def _send_text_legacy(self, *args, **kwargs): + """ + Legacy send_text fallback for when no wire format is set. + + Performs the original Evennia HTML conversion (parse_html) and + sends a JSON array ``["text", [html_string], {}]`` via sendLine. + + """ + import html as html_lib + import re + + from evennia.utils.ansi import parse_ansi + from evennia.utils.text2html import parse_html + if args: args = list(args) text = args[0] @@ -277,35 +468,54 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): return else: return - flags = self.protocol_flags - options = kwargs.pop("options", {}) raw = options.get("raw", flags.get("RAW", False)) client_raw = options.get("client_raw", False) nocolor = options.get("nocolor", flags.get("NOCOLOR", False)) screenreader = options.get("screenreader", flags.get("SCREENREADER", False)) prompt = options.get("send_prompt", False) - + _RE = re.compile( + r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE + ) if screenreader: - # screenreader mode cleans up output text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False) - text = _RE_SCREENREADER_REGEX.sub("", text) + text = _RE.sub("", text) cmd = "prompt" if prompt else "text" if raw: if client_raw: args[0] = text else: - args[0] = html.escape(text) # escape html! + args[0] = html_lib.escape(text) else: args[0] = parse_html(text, strip_ansi=nocolor) - - # send to client on required form [cmdname, args, kwargs] self.sendLine(json.dumps([cmd, args, kwargs])) def send_prompt(self, *args, **kwargs): - kwargs["options"].update({"send_prompt": True}) - self.send_text(*args, **kwargs) + """ + Send a prompt to the client. + + Prompts are handled separately from regular text because some + wire formats (e.g. json.mudstandards.org) send prompts as a + distinct message type that the client can render differently. + + Args: + *args: Prompt text as first arg. + + Keyword Args: + options (dict): Same options as send_text. + + """ + if self.wire_format: + result = self.wire_format.encode_prompt( + *args, protocol_flags=self.protocol_flags, **kwargs + ) + if result is not None: + data, is_binary = result + self.sendEncoded(data, is_binary=is_binary) + else: + kwargs.setdefault("options", {}).update({"send_prompt": True}) + self.send_text(*args, **kwargs) def send_default(self, cmdname, *args, **kwargs): """ @@ -321,5 +531,14 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS): client instead. """ - if not cmdname == "options": - self.sendLine(json.dumps([cmdname, args, kwargs])) + if self.wire_format: + result = self.wire_format.encode_default( + cmdname, *args, protocol_flags=self.protocol_flags, **kwargs + ) + if result is not None: + data, is_binary = result + self.sendEncoded(data, is_binary=is_binary) + else: + # Fallback: legacy behavior + if not cmdname == "options": + self.sendLine(json.dumps([cmdname, args, kwargs])) diff --git a/evennia/server/portal/wire_formats/__init__.py b/evennia/server/portal/wire_formats/__init__.py new file mode 100644 index 0000000000..415ddccd23 --- /dev/null +++ b/evennia/server/portal/wire_formats/__init__.py @@ -0,0 +1,44 @@ +""" +Wire format codecs for WebSocket subprotocol support. + +This package implements the strategy pattern for WebSocket wire formats, +allowing Evennia to support multiple WebSocket subprotocols as defined by +the MUD Standards WebSocket proposal (https://mudstandards.org/websocket/). + +Each wire format is a self-contained codec that handles encoding outgoing +data and decoding incoming data for a specific WebSocket subprotocol. + +Supported subprotocols: + - v1.evennia.com: Evennia's legacy JSON array format + - json.mudstandards.org: MUD Standards JSON envelope format + - gmcp.mudstandards.org: GMCP over WebSocket + - terminal.mudstandards.org: Raw ANSI terminal over WebSocket +""" + +from .base import WireFormat +from .evennia_v1 import EvenniaV1Format +from .gmcp_standard import GmcpStandardFormat +from .json_standard import JsonStandardFormat +from .terminal import TerminalFormat + +# Registry of all available wire formats, keyed by subprotocol name. +# Order matters: the first format matching a client's offered subprotocols +# will be selected during negotiation. +WIRE_FORMATS = { + fmt.name: fmt + for fmt in [ + JsonStandardFormat(), + GmcpStandardFormat(), + TerminalFormat(), + EvenniaV1Format(), + ] +} + +__all__ = [ + "WireFormat", + "EvenniaV1Format", + "JsonStandardFormat", + "GmcpStandardFormat", + "TerminalFormat", + "WIRE_FORMATS", +] diff --git a/evennia/server/portal/wire_formats/base.py b/evennia/server/portal/wire_formats/base.py new file mode 100644 index 0000000000..744165c095 --- /dev/null +++ b/evennia/server/portal/wire_formats/base.py @@ -0,0 +1,194 @@ +""" +Base wire format interface for WebSocket subprotocol codecs. + +All wire format implementations must subclass WireFormat and implement +the encoding/decoding methods. Each format represents a specific +WebSocket subprotocol as defined by RFC 6455 Sec-WebSocket-Protocol +negotiation. +""" + +import re + +from django.conf import settings + +from evennia.utils.ansi import parse_ansi + +_RE_SCREENREADER_REGEX = re.compile( + r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE +) + + +class WireFormat: + """ + Abstract base class for WebSocket wire format codecs. + + A wire format handles the translation between Evennia's internal + message representation and the bytes sent over the WebSocket connection. + + Each subclass corresponds to a specific WebSocket subprotocol name + (e.g., "v1.evennia.com", "json.mudstandards.org"). + + Attributes: + name (str): The subprotocol identifier string, used in + Sec-WebSocket-Protocol negotiation. + supports_oob (bool): Whether this format supports out-of-band + data (structured commands beyond plain text). + + """ + + name = None + supports_oob = True + + @staticmethod + def _extract_text_and_flags(args, kwargs, protocol_flags): + """ + Extract text string and display flags from encode arguments. + + This is a shared helper for encode_text/encode_prompt in formats + that use raw ANSI output (terminal, json, gmcp). The EvenniaV1 + format has its own logic (HTML conversion, raw mode) and does + not use this helper. + + Args: + args (tuple): Positional args passed to encode_text/encode_prompt. + args[0] should be the text string. + kwargs (dict): Keyword args. The "options" key is popped and + inspected for "nocolor" and "screenreader" overrides. + protocol_flags (dict or None): Session protocol flags. + + Returns: + tuple or None: (text, nocolor, screenreader) if text is valid, + or None if there is no text to encode. + + """ + if args: + text = args[0] + if text is None: + return None + else: + return None + + flags = protocol_flags or {} + options = kwargs.pop("options", {}) + nocolor = options.get("nocolor", flags.get("NOCOLOR", False)) + screenreader = options.get("screenreader", flags.get("SCREENREADER", False)) + return (text, nocolor, screenreader) + + @staticmethod + def _process_ansi(text, nocolor, screenreader): + """ + Process Evennia ANSI markup into terminal escape sequences. + + Applies screenreader stripping, nocolor stripping, or full ANSI + conversion depending on the flags. This is the shared logic for + all non-HTML wire formats (terminal, json, gmcp). + + Args: + text (str): Text with Evennia ANSI markup (|r, |n, etc.). + nocolor (bool): If True, strip all ANSI codes. + screenreader (bool): If True, strip ANSI and apply + SCREENREADER_REGEX_STRIP. + + Returns: + str: Processed text with real ANSI escape sequences or + stripped text. + + """ + if screenreader: + text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False) + text = _RE_SCREENREADER_REGEX.sub("", text) + elif nocolor: + text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False) + else: + text = parse_ansi(text, xterm256=True, mxp=False) + return text + + def decode_incoming(self, payload, is_binary, protocol_flags=None): + """ + Decode an incoming WebSocket message into kwargs for data_in(). + + Args: + payload (bytes): Raw WebSocket message payload. + is_binary (bool): True if this was a BINARY frame (opcode 2), + False if it was a TEXT frame (opcode 1). + protocol_flags (dict, optional): The session's protocol flags, + which may affect decoding behavior. + + Returns: + dict or None: A dict of kwargs to pass to session.data_in(), + where each key is an inputfunc name and value is [args, kwargs]. + Returns None if the message should be silently ignored. + + """ + raise NotImplementedError( + f"{self.__class__.__name__} must implement decode_incoming()" + ) + + def encode_text(self, *args, protocol_flags=None, **kwargs): + """ + Encode text output for sending to the client. + + This handles the "text" outputfunc — the primary game output. + + Args: + *args: Text arguments. args[0] is typically the text string. + protocol_flags (dict, optional): Session protocol flags that + may affect encoding (e.g., NOCOLOR, SCREENREADER, RAW). + **kwargs: Additional keyword arguments. May include an + "options" dict with keys like "raw", "nocolor", + "screenreader", "send_prompt". + + Returns: + tuple or None: A (data_bytes, is_binary) tuple for sendMessage(), + or None if nothing should be sent. + + """ + raise NotImplementedError( + f"{self.__class__.__name__} must implement encode_text()" + ) + + def encode_prompt(self, *args, protocol_flags=None, **kwargs): + """ + Encode a prompt for sending to the client. + + Default implementation delegates to encode_text with the + send_prompt option set. + + Args: + *args: Prompt arguments. + protocol_flags (dict, optional): Session protocol flags. + **kwargs: Additional keyword arguments. May include an "options" + dict; if absent, one is created with "send_prompt" set to True. + + Returns: + tuple or None: A (data_bytes, is_binary) tuple for sendMessage(), + or None if nothing should be sent. + + """ + options = kwargs.get("options", {}) + options["send_prompt"] = True + kwargs["options"] = options + return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs) + + def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs): + """ + Encode a non-text OOB command for sending to the client. + + This handles all outputfuncs that don't have a specific send_* + method, including custom OOB commands. + + Args: + cmdname (str): The OOB command name. + *args: Command arguments. + protocol_flags (dict, optional): Session protocol flags. + **kwargs: Additional keyword arguments. + + Returns: + tuple or None: A (data_bytes, is_binary) tuple for sendMessage(), + or None if nothing should be sent (e.g., if the format + doesn't support OOB). + + """ + raise NotImplementedError( + f"{self.__class__.__name__} must implement encode_default()" + ) diff --git a/evennia/server/portal/wire_formats/evennia_v1.py b/evennia/server/portal/wire_formats/evennia_v1.py new file mode 100644 index 0000000000..a2c4a06841 --- /dev/null +++ b/evennia/server/portal/wire_formats/evennia_v1.py @@ -0,0 +1,131 @@ +""" +Evennia V1 wire format (v1.evennia.com). + +This is Evennia's legacy WebSocket wire format. All messages are UTF-8 +JSON text frames in the form: + + ["cmdname", [args], {kwargs}] + +Text output is HTML-converted from ANSI before sending. This format +is used by Evennia's built-in webclient and is the default when no +WebSocket subprotocol is negotiated. +""" + +import html +import json + +from evennia.utils.ansi import parse_ansi +from evennia.utils.text2html import parse_html + +from .base import WireFormat, _RE_SCREENREADER_REGEX + + +class EvenniaV1Format(WireFormat): + """ + Evennia's legacy wire format: JSON arrays over TEXT frames. + + Wire format: + All frames are TEXT (UTF-8 JSON). + Structure: ["cmdname", [args], {kwargs}] + + Text handling: + Outgoing text is converted from ANSI to HTML via parse_html(). + + OOB: + All commands are effectively OOB — the cmdname field can be + any string, not just "text". + """ + + name = "v1.evennia.com" + supports_oob = True + + def decode_incoming(self, payload, is_binary, protocol_flags=None): + """ + Decode incoming JSON array message. + + Args: + payload (bytes): UTF-8 encoded JSON: ["cmdname", [args], {kwargs}] + is_binary (bool): Should be False for this format. + protocol_flags (dict, optional): Not used by this format. + + Returns: + dict or None: kwargs for data_in(), e.g. {"text": [["look"], {}]} + + """ + try: + cmdarray = json.loads(str(payload, "utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError): + return None + if cmdarray and len(cmdarray) >= 3: + return {cmdarray[0]: [cmdarray[1], cmdarray[2]]} + return None + + def encode_text(self, *args, protocol_flags=None, **kwargs): + """ + Encode text output as HTML-converted JSON. + + Converts ANSI color codes to HTML spans, applies screenreader + and raw text options. + + Returns: + tuple or None: (json_bytes, False) where False means TEXT frame. + + """ + if args: + args = list(args) + text = args[0] + if text is None: + return None + else: + return None + + flags = protocol_flags or {} + options = kwargs.pop("options", {}) + raw = options.get("raw", flags.get("RAW", False)) + client_raw = options.get("client_raw", False) + nocolor = options.get("nocolor", flags.get("NOCOLOR", False)) + screenreader = options.get("screenreader", flags.get("SCREENREADER", False)) + prompt = options.get("send_prompt", False) + + if screenreader: + text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False) + text = _RE_SCREENREADER_REGEX.sub("", text) + + cmd = "prompt" if prompt else "text" + if raw: + if client_raw: + args[0] = text + else: + args[0] = html.escape(text) + else: + args[0] = parse_html(text, strip_ansi=nocolor) + + return (json.dumps([cmd, args, kwargs]).encode("utf-8"), False) + + def encode_prompt(self, *args, protocol_flags=None, **kwargs): + """ + Encode a prompt as HTML-converted JSON with send_prompt flag. + + Returns: + tuple or None: (json_bytes, False) for TEXT frame. + + """ + options = kwargs.get("options", {}) + options["send_prompt"] = True + kwargs["options"] = options + return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs) + + def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs): + """ + Encode any OOB command as a JSON array. + + Skips the "options" command (legacy behavior). + + Returns: + tuple or None: (json_bytes, False) for TEXT frame, or None + if cmdname is "options". + + """ + if cmdname == "options": + return None + return (json.dumps([cmdname, args, kwargs]).encode("utf-8"), False) diff --git a/evennia/server/portal/wire_formats/gmcp_standard.py b/evennia/server/portal/wire_formats/gmcp_standard.py new file mode 100644 index 0000000000..414f28cbdb --- /dev/null +++ b/evennia/server/portal/wire_formats/gmcp_standard.py @@ -0,0 +1,126 @@ +""" +GMCP MUD Standards wire format (gmcp.mudstandards.org). + +This implements the GMCP subprotocol from the MUD Standards WebSocket +proposal (https://mudstandards.org/websocket/). + +Per the standard: + - BINARY frames contain regular ANSI in- and output (UTF-8 encoded) + - TEXT frames contain UTF-8 encoded GMCP commands in the standard + format: "Package.Name json_payload" + +This is a good match for MUD clients that natively speak GMCP, such as +Mudlet, as it maps directly to their existing GMCP handling without +the extra JSON envelope layer. +""" + +from evennia.server.portal.gmcp_utils import decode_gmcp, encode_gmcp + +from .base import WireFormat + + +class GmcpStandardFormat(WireFormat): + """ + GMCP-native wire format over WebSocket. + + Wire format: + BINARY frames: Raw ANSI text (UTF-8), used for game text I/O. + TEXT frames: GMCP commands in standard format + "Package.Name json_payload" + + Text handling: + Outgoing text retains ANSI escape codes (no HTML conversion). + Text is sent as BINARY frames. + + OOB: + Supported via TEXT frames carrying GMCP messages. The GMCP + format is: "Package.Name optional_json_payload" + """ + + name = "gmcp.mudstandards.org" + supports_oob = True + + def decode_incoming(self, payload, is_binary, protocol_flags=None): + """ + Decode incoming WebSocket message. + + BINARY frames are raw text input. + TEXT frames are GMCP messages. + + Args: + payload (bytes): The raw frame payload. + is_binary (bool): True for BINARY frames, False for TEXT. + protocol_flags (dict, optional): Not used. + + Returns: + dict or None: kwargs for data_in(). + + """ + if is_binary: + # BINARY frame = raw text input + try: + text = payload.decode("utf-8").strip() + except UnicodeDecodeError: + return None + if not text: + return None + return {"text": [[text], {}]} + else: + # TEXT frame = GMCP command + try: + gmcp_data = payload.decode("utf-8") + except UnicodeDecodeError: + return None + return decode_gmcp(gmcp_data) + + def encode_text(self, *args, protocol_flags=None, **kwargs): + """ + Encode text output as raw ANSI in a BINARY frame. + + Returns: + tuple or None: (ansi_bytes, True) where True means BINARY frame. + + """ + extracted = self._extract_text_and_flags(args, kwargs, protocol_flags) + if extracted is None: + return None + text, nocolor, screenreader = extracted + text = self._process_ansi(text, nocolor, screenreader) + return (text.encode("utf-8"), True) + + def encode_prompt(self, *args, protocol_flags=None, **kwargs): + """ + Encode a prompt. + + For GMCP format, prompts are sent as BINARY frames (raw ANSI) + just like regular text — the client can detect prompts via + GMCP if needed. + + Returns: + tuple or None: (ansi_bytes, True) for BINARY frame. + + """ + return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs) + + def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs): + """ + Encode an OOB command as a GMCP message in a TEXT frame. + + Args: + cmdname (str): The OOB command name. + *args: Command arguments. + protocol_flags (dict, optional): Not used. + **kwargs: Command keyword arguments. + + Returns: + tuple or None: (gmcp_bytes, False) for TEXT frame, or None + if cmdname is "options". + + """ + if cmdname == "options": + return None + + kwargs.pop("options", None) + + gmcp_string = encode_gmcp(cmdname, *args, **kwargs) + return (gmcp_string.encode("utf-8"), False) diff --git a/evennia/server/portal/wire_formats/json_standard.py b/evennia/server/portal/wire_formats/json_standard.py new file mode 100644 index 0000000000..e474532da6 --- /dev/null +++ b/evennia/server/portal/wire_formats/json_standard.py @@ -0,0 +1,221 @@ +""" +JSON MUD Standards wire format (json.mudstandards.org). + +This implements the JSON subprotocol from the MUD Standards WebSocket +proposal (https://mudstandards.org/websocket/). + +Per the standard: + - BINARY frames contain regular ANSI in- and output (UTF-8 encoded) + - TEXT frames contain JSON payloads with the structure: + {"proto": "", "id": "", "data": ""} + +This is the most flexible standard format, supporting GMCP, custom +protocols, and any future structured data through the JSON envelope. +""" + +import json + +from evennia.server.portal.gmcp_utils import decode_gmcp, encode_gmcp + +from .base import WireFormat + + +class JsonStandardFormat(WireFormat): + """ + MUD Standards JSON envelope wire format. + + Wire format: + BINARY frames: Raw ANSI text (UTF-8), used for game text I/O. + TEXT frames: JSON envelope {"proto", "id", "data"} for + structured/OOB data. + + Text handling: + Outgoing text retains ANSI escape codes (no HTML conversion). + Text is sent as BINARY frames. + + OOB: + Supported via TEXT frames. The "proto" field identifies the + protocol (e.g., "gmcp"), "id" identifies the command, and + "data" carries the JSON payload. + """ + + name = "json.mudstandards.org" + supports_oob = True + + def decode_incoming(self, payload, is_binary, protocol_flags=None): + """ + Decode incoming WebSocket message. + + BINARY frames are treated as raw text input. + TEXT frames are parsed as JSON envelopes. + + Args: + payload (bytes): The raw frame payload. + is_binary (bool): True for BINARY frames, False for TEXT. + protocol_flags (dict, optional): Not used. + + Returns: + dict or None: kwargs for data_in(). + + """ + if is_binary: + # BINARY frame = raw text input + try: + text = payload.decode("utf-8").strip() + except UnicodeDecodeError: + return None + if not text: + return None + return {"text": [[text], {}]} + else: + # TEXT frame = JSON envelope + try: + envelope = json.loads(payload.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError): + return None + + proto = envelope.get("proto", "") + cmd_id = envelope.get("id", "") + data = envelope.get("data", "") + + # Validate envelope field types — malformed envelopes are dropped + if not isinstance(proto, str): + return None + if not isinstance(cmd_id, str): + cmd_id = str(cmd_id) if cmd_id is not None else "" + if not isinstance(data, str): + try: + data = json.dumps(data) + except (TypeError, ValueError): + data = str(data) + + return self._decode_envelope(proto, cmd_id, data) + + def _decode_envelope(self, proto, cmd_id, data): + """ + Decode a JSON envelope into Evennia inputfunc kwargs. + + Args: + proto (str): The protocol identifier (e.g., "gmcp", "text"). + cmd_id (str): The command identifier (e.g., GMCP package name). + data (str): The payload string. + + Returns: + dict or None: kwargs for data_in(). + + """ + if proto == "gmcp": + # GMCP: id is the package name, data is the JSON payload + cmd_id = cmd_id.strip() + if not cmd_id: + return None + gmcp_string = "%s %s" % (cmd_id, data) if data else cmd_id + return decode_gmcp(gmcp_string) + + elif proto == "text": + # Text input sent via JSON envelope + return {"text": [[data], {}]} + + elif proto == "websocket_close": + return {"websocket_close": [[], {}]} + + else: + # Generic protocol — pass through as-is. + # Prefer cmd_id as the inputfunc name, fall back to proto. + try: + parsed_data = json.loads(data) if data else {} + except (json.JSONDecodeError, ValueError): + parsed_data = data + + args = [] + kwargs = {} + if isinstance(parsed_data, dict): + kwargs = parsed_data + elif isinstance(parsed_data, list): + args = parsed_data + else: + args = [parsed_data] if parsed_data else [] + + funcname = cmd_id if cmd_id else proto + if not funcname: + return None + return {funcname: [args, kwargs]} + + def encode_text(self, *args, protocol_flags=None, **kwargs): + """ + Encode text output as raw ANSI in a BINARY frame. + + Returns: + tuple or None: (ansi_bytes, True) where True means BINARY frame. + + """ + extracted = self._extract_text_and_flags(args, kwargs, protocol_flags) + if extracted is None: + return None + text, nocolor, screenreader = extracted + text = self._process_ansi(text, nocolor, screenreader) + return (text.encode("utf-8"), True) + + def encode_prompt(self, *args, protocol_flags=None, **kwargs): + """ + Encode a prompt. + + For the JSON standard format, prompts are sent as a JSON envelope + in a TEXT frame with proto="prompt", allowing the client to + distinguish prompts from regular text. + + Returns: + tuple or None: (json_bytes, False) for TEXT frame. + + """ + extracted = self._extract_text_and_flags(args, kwargs, protocol_flags) + if extracted is None: + return None + text, nocolor, screenreader = extracted + text = self._process_ansi(text, nocolor, screenreader) + + envelope = { + "proto": "prompt", + "id": "", + "data": text, + } + return (json.dumps(envelope).encode("utf-8"), False) + + def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs): + """ + Encode an OOB command as a GMCP-in-JSON envelope. + + OOB commands are sent as TEXT frames with the JSON envelope format. + The command is translated to GMCP naming conventions and wrapped + in a {"proto": "gmcp", "id": "Package.Name", "data": "..."} envelope. + + Args: + cmdname (str): The OOB command name. + *args: Command arguments. + protocol_flags (dict, optional): Not used. + **kwargs: Command keyword arguments. + + Returns: + tuple or None: (json_bytes, False) for TEXT frame, or None + if cmdname is "options". + + """ + if cmdname == "options": + return None + + kwargs.pop("options", None) + + # Encode as GMCP string first, then wrap in JSON envelope + gmcp_string = encode_gmcp(cmdname, *args, **kwargs) + + # Split the GMCP string into package name and payload + parts = gmcp_string.split(None, 1) + gmcp_package = parts[0] + gmcp_data = parts[1] if len(parts) > 1 else "" + + envelope = { + "proto": "gmcp", + "id": gmcp_package, + "data": gmcp_data, + } + return (json.dumps(envelope).encode("utf-8"), False) diff --git a/evennia/server/portal/wire_formats/terminal.py b/evennia/server/portal/wire_formats/terminal.py new file mode 100644 index 0000000000..44ce59a6e6 --- /dev/null +++ b/evennia/server/portal/wire_formats/terminal.py @@ -0,0 +1,103 @@ +""" +Terminal wire format (terminal.mudstandards.org). + +This implements the simplest MUD Standards WebSocket subprotocol: +raw ANSI/UTF-8 text in BINARY frames. No OOB support. + +Per the MUD Standards proposal: + "BINARY frames contain input/output and ANSI control codes. + Encoded as UTF-8" + +This format is suitable for basic terminal-style MUD clients that +want raw ANSI output without any structured data channel. +""" + +from .base import WireFormat + + +class TerminalFormat(WireFormat): + """ + Raw ANSI terminal wire format over BINARY WebSocket frames. + + Wire format: + All frames are BINARY, containing UTF-8 ANSI text. + No TEXT frames are used. + + Text handling: + Outgoing text retains ANSI escape codes (no HTML conversion). + ANSI is rendered by the client. + + OOB: + Not supported. This format has no structured data channel. + """ + + name = "terminal.mudstandards.org" + supports_oob = False + + def decode_incoming(self, payload, is_binary, protocol_flags=None): + """ + Decode incoming WebSocket frame as raw text input. + + Both BINARY and TEXT frames are treated identically as UTF-8 text. + + Args: + payload (bytes): Raw UTF-8 text from the client. + is_binary (bool): True for BINARY frames, False for TEXT. + Both are handled identically. + protocol_flags (dict, optional): Not used. + + Returns: + dict or None: {"text": [[text_string], {}]} + + """ + try: + text = payload.decode("utf-8") + except UnicodeDecodeError: + return None + + text = text.strip() + if not text: + return None + + return {"text": [[text], {}]} + + def encode_text(self, *args, protocol_flags=None, **kwargs): + """ + Encode text output as raw ANSI in a BINARY frame. + + No HTML conversion is performed. ANSI color codes are preserved + for the client to render. + + Returns: + tuple or None: (ansi_bytes, True) where True means BINARY frame. + + """ + extracted = self._extract_text_and_flags(args, kwargs, protocol_flags) + if extracted is None: + return None + text, nocolor, screenreader = extracted + text = self._process_ansi(text, nocolor, screenreader) + return (text.encode("utf-8"), True) + + def encode_prompt(self, *args, protocol_flags=None, **kwargs): + """ + Encode a prompt as raw ANSI. + + For terminal mode, prompts are just text — there's no way + to distinguish them from regular output at the wire level. + + Returns: + tuple or None: (ansi_bytes, True) for BINARY frame. + + """ + return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs) + + def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs): + """ + OOB commands are not supported in terminal mode. + + Returns: + None: Always returns None (OOB data is silently dropped). + + """ + return None diff --git a/evennia/settings_default.py b/evennia/settings_default.py index 0371b4bd07..5a025dde94 100644 --- a/evennia/settings_default.py +++ b/evennia/settings_default.py @@ -121,6 +121,26 @@ WEBSOCKET_CLIENT_INTERFACE = "0.0.0.0" # the client will itself figure out this url based on the server's hostname. # e.g. ws://external.example.com or wss://external.example.com:443 WEBSOCKET_CLIENT_URL = None +# Ordered list of WebSocket subprotocols the server will accept during +# RFC 6455 Sec-WebSocket-Protocol negotiation with third-party clients. +# The first protocol in this list that matches a client's offered protocols +# will be selected. Removing a subprotocol from this list disables it for +# clients that explicitly negotiate via Sec-WebSocket-Protocol. +# Set to None to accept all built-in formats. Set to [] to disable all +# subprotocol negotiation (only legacy no-header clients will connect). +# Note: Evennia's built-in webclient sends Sec-WebSocket-Protocol: +# v1.evennia.com, so "v1.evennia.com" must remain in this list for the +# built-in webclient to connect successfully. Legacy clients that send +# no Sec-WebSocket-Protocol header always receive v1.evennia.com format +# regardless of this setting. +# See https://mudstandards.org/websocket/ for details on the standard +# subprotocols. +WEBSOCKET_SUBPROTOCOLS = [ + "json.mudstandards.org", + "gmcp.mudstandards.org", + "terminal.mudstandards.org", + "v1.evennia.com", +] # This determine's whether Evennia's custom admin page is used, or if the # standard Django admin is used. EVENNIA_ADMIN = True diff --git a/evennia/web/static/webclient/js/evennia.js b/evennia/web/static/webclient/js/evennia.js index f3dbb5419c..8cae6a8374 100644 --- a/evennia/web/static/webclient/js/evennia.js +++ b/evennia/web/static/webclient/js/evennia.js @@ -228,7 +228,9 @@ An "emitter" object must have a function return; } // Important - we pass csessid tacked on the url - websocket = new WebSocket(wsurl + '?' + csessid + '&' + cuid + '&' + browser); + var wsquery = wsurl + '?' + csessid + '&' + cuid + '&' + browser; + var protocols = ["v1.evennia.com"]; + websocket = new WebSocket(wsquery, protocols); // Handle Websocket open event websocket.onopen = function (event) {