mirror of
https://github.com/evennia/evennia.git
synced 2026-03-16 21:06:30 +01:00
Add WebSocket subprotocol negotiation per MUD Standards proposal
Implement RFC 6455 Sec-WebSocket-Protocol negotiation for Evennia's WebSocket server, following the MUD Standards WebSocket specification (https://mudstandards.org/websocket/). New subprotocols: - json.mudstandards.org: JSON envelope format with structured OOB - gmcp.mudstandards.org: GMCP over WebSocket (TEXT + BINARY frames) - terminal.mudstandards.org: Raw ANSI/UTF-8 in BINARY frames Architecture: Wire format strategy pattern via composition. A single WebSocketClient delegates encoding/decoding to a WireFormat instance selected during onConnect(). Adding a new format means adding a single file to the wire_formats/ package with zero changes to webclient.py. Backward compatibility: Clients that send no Sec-WebSocket-Protocol header (including the built-in Evennia webclient) get v1.evennia.com format automatically. Existing behavior is fully preserved. Additional changes: - Extract shared GMCP encode/decode into gmcp_utils.py (used by both telnet and websocket GMCP paths) - Add WEBSOCKET_SUBPROTOCOLS setting for server-side format control - Built-in JS webclient now sends Sec-WebSocket-Protocol: v1.evennia.com - Comprehensive test suite for all wire formats (~870 lines)
This commit is contained in:
parent
3761a7cb21
commit
b80d1fd4ed
13 changed files with 2249 additions and 129 deletions
145
evennia/server/portal/gmcp_utils.py
Normal file
145
evennia/server/portal/gmcp_utils.py
Normal file
|
|
@ -0,0 +1,145 @@
|
|||
"""
|
||||
Shared GMCP (Generic MUD Communication Protocol) utilities.
|
||||
|
||||
This module provides encoding and decoding functions for GMCP messages,
|
||||
shared between telnet OOB and WebSocket wire format implementations.
|
||||
|
||||
GMCP messages follow the format: "Package.Subpackage json_payload"
|
||||
|
||||
The mapping dictionaries translate between Evennia's internal command
|
||||
names and standard GMCP package names.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from evennia.utils.utils import is_iter
|
||||
|
||||
# Mapping from Evennia internal names to standard GMCP package names.
|
||||
EVENNIA_TO_GMCP = {
|
||||
"client_options": "Core.Supports.Get",
|
||||
"get_inputfuncs": "Core.Commands.Get",
|
||||
"get_value": "Char.Value.Get",
|
||||
"repeat": "Char.Repeat.Update",
|
||||
"monitor": "Char.Monitor.Update",
|
||||
}
|
||||
|
||||
# Reverse mapping from GMCP package names to Evennia internal names.
|
||||
GMCP_TO_EVENNIA = {v: k for k, v in EVENNIA_TO_GMCP.items()}
|
||||
|
||||
|
||||
def encode_gmcp(cmdname, *args, **kwargs):
|
||||
"""
|
||||
Encode an Evennia command into a GMCP message string.
|
||||
|
||||
Args:
|
||||
cmdname (str): Evennia OOB command name.
|
||||
*args: Command arguments.
|
||||
**kwargs: Command keyword arguments.
|
||||
|
||||
Returns:
|
||||
str: A GMCP-formatted string like "Package.Name json_data"
|
||||
|
||||
Notes:
|
||||
GMCP messages are formatted as:
|
||||
[cmdname, [], {}] -> Cmd.Name
|
||||
[cmdname, [arg], {}] -> Cmd.Name arg
|
||||
[cmdname, [args], {}] -> Cmd.Name [args]
|
||||
[cmdname, [], {kwargs}] -> Cmd.Name {kwargs}
|
||||
[cmdname, [arg], {kwargs}] -> Cmd.Name [arg, {kwargs}]
|
||||
[cmdname, [args], {kwargs}] -> Cmd.Name [[args], {kwargs}]
|
||||
|
||||
Note: When there is exactly one positional argument, it is
|
||||
collapsed (encoded directly rather than wrapped in a list).
|
||||
This applies both with and without keyword arguments. This
|
||||
is inherited behavior from the original telnet_oob.py.
|
||||
|
||||
If cmdname has a direct mapping in EVENNIA_TO_GMCP, that
|
||||
mapped name is used. Otherwise, underscores are converted to
|
||||
dots with initial capitalization. Names without underscores
|
||||
are placed in the Core package.
|
||||
|
||||
"""
|
||||
if cmdname in EVENNIA_TO_GMCP:
|
||||
gmcp_cmdname = EVENNIA_TO_GMCP[cmdname]
|
||||
elif "_" in cmdname:
|
||||
gmcp_cmdname = ".".join(
|
||||
word.capitalize() if not word.isupper() else word
|
||||
for word in cmdname.split("_")
|
||||
)
|
||||
else:
|
||||
gmcp_cmdname = "Core.%s" % (
|
||||
cmdname if cmdname.istitle() else cmdname.capitalize()
|
||||
)
|
||||
|
||||
if not (args or kwargs):
|
||||
return gmcp_cmdname
|
||||
elif args:
|
||||
if len(args) == 1:
|
||||
args = args[0]
|
||||
if kwargs:
|
||||
return "%s %s" % (gmcp_cmdname, json.dumps([args, kwargs]))
|
||||
else:
|
||||
return "%s %s" % (gmcp_cmdname, json.dumps(args))
|
||||
else:
|
||||
return "%s %s" % (gmcp_cmdname, json.dumps(kwargs))
|
||||
|
||||
|
||||
def decode_gmcp(data):
|
||||
"""
|
||||
Decode a GMCP message string into Evennia command format.
|
||||
|
||||
Args:
|
||||
data (str or bytes): GMCP data in the form "Module.Submodule.Cmdname structure"
|
||||
|
||||
Returns:
|
||||
dict: A dict suitable for data_in(), e.g. {"cmdname": [[args], {kwargs}]}
|
||||
Returns empty dict if data cannot be parsed.
|
||||
|
||||
Notes:
|
||||
Incoming GMCP is parsed as:
|
||||
Core.Name -> {"name": [[], {}]}
|
||||
Core.Name string -> {"name": [["string"], {}]}
|
||||
Core.Name [arg, arg, ...] -> {"name": [[args], {}]}
|
||||
Core.Name {key:val, ...} -> {"name": [[], {kwargs}]}
|
||||
Core.Name [[args], {kwargs}] -> {"name": [[args], {kwargs}]}
|
||||
|
||||
"""
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode("utf-8", errors="replace")
|
||||
|
||||
if not data:
|
||||
return {}
|
||||
|
||||
has_payload = True
|
||||
try:
|
||||
cmdname, structure = data.split(None, 1)
|
||||
except ValueError:
|
||||
cmdname, structure = data, ""
|
||||
has_payload = False
|
||||
|
||||
# Check if this is a known GMCP package name
|
||||
if cmdname in GMCP_TO_EVENNIA:
|
||||
evennia_cmdname = GMCP_TO_EVENNIA[cmdname]
|
||||
else:
|
||||
# Convert Package.Name to package_name
|
||||
evennia_cmdname = cmdname.replace(".", "_")
|
||||
if evennia_cmdname.lower().startswith("core_"):
|
||||
evennia_cmdname = evennia_cmdname[5:]
|
||||
evennia_cmdname = evennia_cmdname.lower()
|
||||
|
||||
try:
|
||||
structure = json.loads(structure)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
# structure is not JSON — treat as plain string
|
||||
pass
|
||||
|
||||
args, kwargs = [], {}
|
||||
if is_iter(structure):
|
||||
if isinstance(structure, dict):
|
||||
kwargs = {key: value for key, value in structure.items() if key}
|
||||
else:
|
||||
args = list(structure)
|
||||
elif has_payload:
|
||||
args = [structure]
|
||||
|
||||
return {evennia_cmdname: [args, kwargs]}
|
||||
|
|
@ -24,14 +24,16 @@ This implements the following telnet OOB communication protocols:
|
|||
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import weakref
|
||||
|
||||
# General Telnet
|
||||
from twisted.conch.telnet import IAC, SB, SE
|
||||
|
||||
from evennia.utils.utils import is_iter
|
||||
from .gmcp_utils import (
|
||||
decode_gmcp as _decode_gmcp,
|
||||
encode_gmcp as _encode_gmcp_str,
|
||||
)
|
||||
|
||||
# MSDP-relevant telnet cmd/opt-codes
|
||||
MSDP = bytes([69])
|
||||
|
|
@ -59,14 +61,6 @@ msdp_regex_array = re.compile(
|
|||
msdp_regex_var = re.compile(rb"%s" % MSDP_VAR)
|
||||
msdp_regex_val = re.compile(rb"%s" % MSDP_VAL)
|
||||
|
||||
EVENNIA_TO_GMCP = {
|
||||
"client_options": "Core.Supports.Get",
|
||||
"get_inputfuncs": "Core.Commands.Get",
|
||||
"get_value": "Char.Value.Get",
|
||||
"repeat": "Char.Repeat.Update",
|
||||
"monitor": "Char.Monitor.Update",
|
||||
}
|
||||
|
||||
|
||||
# MSDP/GMCP communication handler
|
||||
|
||||
|
|
@ -224,61 +218,16 @@ class TelnetOOB:
|
|||
cmdname (str): GMCP OOB command name.
|
||||
args, kwargs (any): Arguments to OOB command.
|
||||
|
||||
Notes:
|
||||
GMCP messages will be outgoing on the following
|
||||
form (the non-JSON cmdname at the start is what
|
||||
IRE games use, supposedly, and what clients appear
|
||||
to have adopted). A cmdname without Package will end
|
||||
up in the Core package, while Core package names will
|
||||
be stripped on the Evennia side.
|
||||
::
|
||||
|
||||
[cmd_name, [], {}] -> Cmd.Name
|
||||
[cmd_name, [arg], {}] -> Cmd.Name arg
|
||||
[cmd_name, [args],{}] -> Cmd.Name [args]
|
||||
[cmd_name, [], {kwargs}] -> Cmd.Name {kwargs}
|
||||
[cmdname, [args, {kwargs}] -> Core.Cmdname [[args],{kwargs}]
|
||||
|
||||
For more flexibility with certain clients, if `cmd_name` is capitalized,
|
||||
Evennia will leave its current capitalization (So CMD_nAmE would be sent
|
||||
as CMD.nAmE but cMD_Name would be Cmd.Name)
|
||||
Returns:
|
||||
bytes: GMCP-encoded byte string for telnet subnegotiation.
|
||||
|
||||
Notes:
|
||||
There are also a few default mappings between evennia outputcmds and GMCP:
|
||||
::
|
||||
|
||||
client_options -> Core.Supports.Get
|
||||
get_inputfuncs -> Core.Commands.Get
|
||||
get_value -> Char.Value.Get
|
||||
repeat -> Char.Repeat.Update
|
||||
monitor -> Char.Monitor.Update
|
||||
Delegates to the shared ``gmcp_utils.encode_gmcp`` and encodes
|
||||
the resulting string to bytes for telnet transport. See
|
||||
``gmcp_utils.encode_gmcp`` for full format documentation.
|
||||
|
||||
"""
|
||||
|
||||
if cmdname in EVENNIA_TO_GMCP:
|
||||
gmcp_cmdname = EVENNIA_TO_GMCP[cmdname]
|
||||
elif "_" in cmdname:
|
||||
# enforce initial capitalization of each command part, leaving fully-capitalized sections intact
|
||||
gmcp_cmdname = ".".join(
|
||||
word.capitalize() if not word.isupper() else word for word in cmdname.split("_")
|
||||
)
|
||||
else:
|
||||
gmcp_cmdname = "Core.%s" % (cmdname if cmdname.istitle() else cmdname.capitalize())
|
||||
|
||||
if not (args or kwargs):
|
||||
gmcp_string = gmcp_cmdname
|
||||
elif args:
|
||||
if len(args) == 1:
|
||||
args = args[0]
|
||||
if kwargs:
|
||||
gmcp_string = "%s %s" % (gmcp_cmdname, json.dumps([args, kwargs]))
|
||||
else:
|
||||
gmcp_string = "%s %s" % (gmcp_cmdname, json.dumps(args))
|
||||
else: # only kwargs
|
||||
gmcp_string = "%s %s" % (gmcp_cmdname, json.dumps(kwargs))
|
||||
|
||||
# print("gmcp string", gmcp_string) # DEBUG
|
||||
return gmcp_string.encode()
|
||||
return _encode_gmcp_str(cmdname, *args, **kwargs).encode()
|
||||
|
||||
def decode_msdp(self, data):
|
||||
"""
|
||||
|
|
@ -386,46 +335,18 @@ class TelnetOOB:
|
|||
data (str or list): GMCP data.
|
||||
|
||||
Notes:
|
||||
Clients send data on the form "Module.Submodule.Cmdname <structure>".
|
||||
We assume the structure is valid JSON.
|
||||
|
||||
The following is parsed into Evennia's formal structure:
|
||||
::
|
||||
|
||||
Core.Name -> [name, [], {}]
|
||||
Core.Name string -> [name, [string], {}]
|
||||
Core.Name [arg, arg,...] -> [name, [args], {}]
|
||||
Core.Name {key:arg, key:arg, ...} -> [name, [], {kwargs}]
|
||||
Core.Name [[args], {kwargs}] -> [name, [args], {kwargs}]
|
||||
Delegates to ``gmcp_utils.decode_gmcp`` for parsing, then
|
||||
passes the result to ``self.protocol().data_in()``.
|
||||
See ``gmcp_utils.decode_gmcp`` for full format documentation.
|
||||
|
||||
"""
|
||||
if isinstance(data, list):
|
||||
data = b"".join(data)
|
||||
|
||||
# print("decode_gmcp in:", data) # DEBUG
|
||||
if data:
|
||||
try:
|
||||
cmdname, structure = data.split(None, 1)
|
||||
except ValueError:
|
||||
cmdname, structure = data, b""
|
||||
cmdname = cmdname.replace(b".", b"_")
|
||||
try:
|
||||
structure = json.loads(structure)
|
||||
except ValueError:
|
||||
# maybe the structure is not json-serialized at all
|
||||
pass
|
||||
args, kwargs = [], {}
|
||||
if is_iter(structure):
|
||||
if isinstance(structure, dict):
|
||||
kwargs = {key: value for key, value in structure.items() if key}
|
||||
else:
|
||||
args = list(structure)
|
||||
else:
|
||||
args = (structure,)
|
||||
if cmdname.lower().startswith(b"core_"):
|
||||
# if Core.cmdname, then use cmdname
|
||||
cmdname = cmdname[5:]
|
||||
self.protocol().data_in(**{cmdname.lower().decode(): [args, kwargs]})
|
||||
cmds = _decode_gmcp(data)
|
||||
if cmds:
|
||||
self.protocol().data_in(**cmds)
|
||||
|
||||
# access methods
|
||||
|
||||
|
|
|
|||
933
evennia/server/portal/test_wire_formats.py
Normal file
933
evennia/server/portal/test_wire_formats.py
Normal file
|
|
@ -0,0 +1,933 @@
|
|||
"""
|
||||
Tests for WebSocket wire formats and subprotocol negotiation.
|
||||
|
||||
Tests cover:
|
||||
- gmcp_utils.py: encode_gmcp / decode_gmcp
|
||||
- Wire format codecs: EvenniaV1, Terminal, JsonStandard, GmcpStandard
|
||||
- WebSocket subprotocol negotiation in webclient.py
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
try:
|
||||
from django.utils.unittest import TestCase
|
||||
except ImportError:
|
||||
from django.test import TestCase
|
||||
|
||||
from mock import MagicMock, Mock
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GMCP utilities
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGmcpEncode(TestCase):
|
||||
"""Tests for gmcp_utils.encode_gmcp()."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.gmcp_utils import encode_gmcp
|
||||
|
||||
self.encode = encode_gmcp
|
||||
|
||||
def test_known_mapping(self):
|
||||
"""Commands in EVENNIA_TO_GMCP should use the mapped name."""
|
||||
result = self.encode("client_options")
|
||||
self.assertEqual(result, "Core.Supports.Get")
|
||||
|
||||
def test_known_mapping_with_args(self):
|
||||
result = self.encode("get_value", "hp")
|
||||
self.assertEqual(result, 'Char.Value.Get "hp"')
|
||||
|
||||
def test_underscore_to_dotted(self):
|
||||
"""Underscored names should become dotted with capitalization."""
|
||||
result = self.encode("char_vitals")
|
||||
self.assertEqual(result, "Char.Vitals")
|
||||
|
||||
def test_no_underscore_gets_core_prefix(self):
|
||||
"""Single-word commands get Core. prefix."""
|
||||
result = self.encode("ping")
|
||||
self.assertEqual(result, "Core.Ping")
|
||||
|
||||
def test_already_title_case_preserved(self):
|
||||
result = self.encode("Ping")
|
||||
self.assertEqual(result, "Core.Ping")
|
||||
|
||||
def test_fully_uppercase_preserved(self):
|
||||
"""Fully uppercase segments should stay uppercase."""
|
||||
result = self.encode("char_HP")
|
||||
self.assertEqual(result, "Char.HP")
|
||||
|
||||
def test_no_args_no_kwargs(self):
|
||||
result = self.encode("ping")
|
||||
self.assertEqual(result, "Core.Ping")
|
||||
|
||||
def test_single_arg(self):
|
||||
result = self.encode("ping", "test")
|
||||
self.assertEqual(result, 'Core.Ping "test"')
|
||||
|
||||
def test_multiple_args(self):
|
||||
result = self.encode("ping", "a", "b")
|
||||
self.assertEqual(result, 'Core.Ping ["a", "b"]')
|
||||
|
||||
def test_kwargs_only(self):
|
||||
result = self.encode("ping", hp=100)
|
||||
self.assertEqual(result, 'Core.Ping {"hp": 100}')
|
||||
|
||||
def test_args_and_kwargs(self):
|
||||
result = self.encode("ping", "a", hp=100)
|
||||
self.assertEqual(result, 'Core.Ping ["a", {"hp": 100}]')
|
||||
|
||||
|
||||
class TestGmcpDecode(TestCase):
|
||||
"""Tests for gmcp_utils.decode_gmcp()."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.gmcp_utils import decode_gmcp
|
||||
|
||||
self.decode = decode_gmcp
|
||||
|
||||
def test_known_mapping(self):
|
||||
"""Known GMCP package names should map to Evennia names."""
|
||||
result = self.decode("Core.Supports.Get")
|
||||
self.assertIn("client_options", result)
|
||||
self.assertEqual(result["client_options"], [[], {}])
|
||||
|
||||
def test_package_to_underscore(self):
|
||||
"""Unknown package names should become lowercase underscore."""
|
||||
result = self.decode("Char.Vitals")
|
||||
self.assertIn("char_vitals", result)
|
||||
|
||||
def test_core_prefix_stripped(self):
|
||||
"""Core. prefix should be stripped from the command name."""
|
||||
result = self.decode("Core.Ping")
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [[], {}])
|
||||
|
||||
def test_string_arg(self):
|
||||
result = self.decode('Core.Ping "test"')
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [["test"], {}])
|
||||
|
||||
def test_array_arg(self):
|
||||
result = self.decode('Core.Ping ["a", "b"]')
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [["a", "b"], {}])
|
||||
|
||||
def test_dict_arg(self):
|
||||
result = self.decode('Core.Ping {"hp": 100}')
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [[], {"hp": 100}])
|
||||
|
||||
def test_bytes_input(self):
|
||||
result = self.decode(b"Core.Ping")
|
||||
self.assertIn("ping", result)
|
||||
|
||||
def test_empty_input(self):
|
||||
result = self.decode("")
|
||||
self.assertEqual(result, {})
|
||||
|
||||
def test_non_json_structure(self):
|
||||
"""Non-JSON data after command name should be treated as string."""
|
||||
result = self.decode("Core.Ping hello world")
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [["hello world"], {}])
|
||||
|
||||
def test_falsy_scalar_zero(self):
|
||||
"""GMCP payloads of 0 should not be dropped."""
|
||||
result = self.decode("Core.Ping 0")
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [[0], {}])
|
||||
|
||||
def test_falsy_scalar_false(self):
|
||||
"""GMCP payloads of false should not be dropped."""
|
||||
result = self.decode("Core.Ping false")
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [[False], {}])
|
||||
|
||||
def test_null_payload(self):
|
||||
"""GMCP payloads of null should be passed through."""
|
||||
result = self.decode("Core.Ping null")
|
||||
self.assertIn("ping", result)
|
||||
self.assertEqual(result["ping"], [[None], {}])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Wire format base
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWireFormatBase(TestCase):
|
||||
"""Tests for the WireFormat abstract base class."""
|
||||
|
||||
def test_abstract_methods_raise(self):
|
||||
from evennia.server.portal.wire_formats.base import WireFormat
|
||||
|
||||
fmt = WireFormat()
|
||||
with self.assertRaises(NotImplementedError):
|
||||
fmt.decode_incoming(b"", False)
|
||||
with self.assertRaises(NotImplementedError):
|
||||
fmt.encode_text("hello")
|
||||
with self.assertRaises(NotImplementedError):
|
||||
fmt.encode_default("cmd")
|
||||
|
||||
def test_encode_prompt_delegates_to_encode_text(self):
|
||||
"""Default encode_prompt should call encode_text with send_prompt=True."""
|
||||
from evennia.server.portal.wire_formats.base import WireFormat
|
||||
|
||||
fmt = WireFormat()
|
||||
# Monkey-patch encode_text to verify delegation
|
||||
fmt.encode_text = MagicMock(return_value=(b"test", False))
|
||||
result = fmt.encode_prompt("hello", options={"raw": False})
|
||||
fmt.encode_text.assert_called_once()
|
||||
call_kwargs = fmt.encode_text.call_args
|
||||
self.assertTrue(call_kwargs.kwargs["options"]["send_prompt"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Evennia V1 wire format
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEvenniaV1Format(TestCase):
|
||||
"""Tests for the v1.evennia.com wire format."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.evennia_v1 import EvenniaV1Format
|
||||
|
||||
self.fmt = EvenniaV1Format()
|
||||
|
||||
def test_name(self):
|
||||
self.assertEqual(self.fmt.name, "v1.evennia.com")
|
||||
|
||||
def test_supports_oob(self):
|
||||
self.assertTrue(self.fmt.supports_oob)
|
||||
|
||||
# --- decode ---
|
||||
|
||||
def test_decode_text(self):
|
||||
payload = json.dumps(["text", ["look"], {}]).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertEqual(result, {"text": [["look"], {}]})
|
||||
|
||||
def test_decode_oob_command(self):
|
||||
payload = json.dumps(["logged_in", [], {}]).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertEqual(result, {"logged_in": [[], {}]})
|
||||
|
||||
def test_decode_invalid_json(self):
|
||||
result = self.fmt.decode_incoming(b"not json", is_binary=False)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_short_array(self):
|
||||
payload = json.dumps(["text"]).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertIsNone(result)
|
||||
|
||||
# --- encode text ---
|
||||
|
||||
def test_encode_text_basic(self):
|
||||
result = self.fmt.encode_text("Hello |rworld|n", protocol_flags={})
|
||||
self.assertIsNotNone(result)
|
||||
data, is_binary = result
|
||||
self.assertFalse(is_binary)
|
||||
parsed = json.loads(data)
|
||||
self.assertEqual(parsed[0], "text")
|
||||
# Should contain HTML (from parse_html)
|
||||
self.assertIsInstance(parsed[1][0], str)
|
||||
|
||||
def test_encode_text_none(self):
|
||||
result = self.fmt.encode_text(None, protocol_flags={})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_encode_text_no_args(self):
|
||||
result = self.fmt.encode_text(protocol_flags={})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_encode_text_prompt(self):
|
||||
result = self.fmt.encode_text(
|
||||
"HP: 100", protocol_flags={}, options={"send_prompt": True}
|
||||
)
|
||||
data, is_binary = result
|
||||
parsed = json.loads(data)
|
||||
self.assertEqual(parsed[0], "prompt")
|
||||
|
||||
def test_encode_text_nocolor(self):
|
||||
result = self.fmt.encode_text(
|
||||
"Hello |rworld|n", protocol_flags={}, options={"nocolor": True}
|
||||
)
|
||||
data, _ = result
|
||||
parsed = json.loads(data)
|
||||
# With nocolor, ANSI should be stripped
|
||||
self.assertNotIn("|r", parsed[1][0])
|
||||
|
||||
# --- encode default (OOB) ---
|
||||
|
||||
def test_encode_default(self):
|
||||
result = self.fmt.encode_default("custom_cmd", "arg1", protocol_flags={}, key="val")
|
||||
data, is_binary = result
|
||||
self.assertFalse(is_binary)
|
||||
parsed = json.loads(data)
|
||||
self.assertEqual(parsed[0], "custom_cmd")
|
||||
|
||||
def test_encode_default_options_skipped(self):
|
||||
"""The 'options' command should be silently dropped."""
|
||||
result = self.fmt.encode_default("options", protocol_flags={})
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Terminal wire format
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestTerminalFormat(TestCase):
|
||||
"""Tests for the terminal.mudstandards.org wire format."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.terminal import TerminalFormat
|
||||
|
||||
self.fmt = TerminalFormat()
|
||||
|
||||
def test_name(self):
|
||||
self.assertEqual(self.fmt.name, "terminal.mudstandards.org")
|
||||
|
||||
def test_no_oob(self):
|
||||
self.assertFalse(self.fmt.supports_oob)
|
||||
|
||||
# --- decode ---
|
||||
|
||||
def test_decode_binary(self):
|
||||
payload = "look".encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=True)
|
||||
self.assertEqual(result, {"text": [["look"], {}]})
|
||||
|
||||
def test_decode_strips_whitespace(self):
|
||||
payload = " look \n".encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=True)
|
||||
self.assertEqual(result, {"text": [["look"], {}]})
|
||||
|
||||
def test_decode_empty(self):
|
||||
result = self.fmt.decode_incoming(b" ", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_invalid_utf8(self):
|
||||
result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
# --- encode text ---
|
||||
|
||||
def test_encode_text_binary_frame(self):
|
||||
result = self.fmt.encode_text("Hello world", protocol_flags={})
|
||||
self.assertIsNotNone(result)
|
||||
data, is_binary = result
|
||||
self.assertTrue(is_binary)
|
||||
self.assertIsInstance(data, bytes)
|
||||
|
||||
def test_encode_text_preserves_ansi(self):
|
||||
"""Terminal format should output real ANSI escape sequences."""
|
||||
result = self.fmt.encode_text("Hello |rworld|n", protocol_flags={})
|
||||
data, _ = result
|
||||
text = data.decode("utf-8")
|
||||
# parse_ansi converts |r to ESC[31m (or similar)
|
||||
self.assertIn("\033[", text)
|
||||
|
||||
def test_encode_text_none(self):
|
||||
result = self.fmt.encode_text(None, protocol_flags={})
|
||||
self.assertIsNone(result)
|
||||
|
||||
# --- encode default (OOB) ---
|
||||
|
||||
def test_encode_default_returns_none(self):
|
||||
"""OOB should be silently dropped for terminal format."""
|
||||
result = self.fmt.encode_default("custom_cmd", "arg1", protocol_flags={})
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# JSON MUD Standards wire format
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestJsonStandardFormat(TestCase):
|
||||
"""Tests for the json.mudstandards.org wire format."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.json_standard import JsonStandardFormat
|
||||
|
||||
self.fmt = JsonStandardFormat()
|
||||
|
||||
def test_name(self):
|
||||
self.assertEqual(self.fmt.name, "json.mudstandards.org")
|
||||
|
||||
def test_supports_oob(self):
|
||||
self.assertTrue(self.fmt.supports_oob)
|
||||
|
||||
# --- decode ---
|
||||
|
||||
def test_decode_binary_as_text(self):
|
||||
"""BINARY frames should be treated as raw text input."""
|
||||
payload = "look".encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=True)
|
||||
self.assertEqual(result, {"text": [["look"], {}]})
|
||||
|
||||
def test_decode_binary_empty(self):
|
||||
result = self.fmt.decode_incoming(b" ", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_text_envelope(self):
|
||||
"""TEXT frames should be parsed as JSON envelopes."""
|
||||
envelope = {"proto": "text", "id": "", "data": "look around"}
|
||||
payload = json.dumps(envelope).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertEqual(result, {"text": [["look around"], {}]})
|
||||
|
||||
def test_decode_gmcp_envelope(self):
|
||||
"""GMCP-in-JSON envelopes should be decoded."""
|
||||
envelope = {"proto": "gmcp", "id": "Core.Ping", "data": ""}
|
||||
payload = json.dumps(envelope).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertIn("ping", result)
|
||||
|
||||
def test_decode_gmcp_envelope_with_data(self):
|
||||
envelope = {"proto": "gmcp", "id": "Char.Vitals", "data": '{"hp": 100}'}
|
||||
payload = json.dumps(envelope).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertIn("char_vitals", result)
|
||||
|
||||
def test_decode_websocket_close(self):
|
||||
envelope = {"proto": "websocket_close", "id": "", "data": ""}
|
||||
payload = json.dumps(envelope).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertIn("websocket_close", result)
|
||||
|
||||
def test_decode_invalid_json_text_frame(self):
|
||||
result = self.fmt.decode_incoming(b"not json", is_binary=False)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_generic_proto(self):
|
||||
"""Unknown proto should pass through as-is."""
|
||||
envelope = {"proto": "custom", "id": "my_cmd", "data": '{"key": "val"}'}
|
||||
payload = json.dumps(envelope).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertIn("my_cmd", result)
|
||||
self.assertEqual(result["my_cmd"], [[], {"key": "val"}])
|
||||
|
||||
# --- encode text ---
|
||||
|
||||
def test_encode_text_binary_frame(self):
|
||||
"""Text should be sent as BINARY frames with raw ANSI."""
|
||||
result = self.fmt.encode_text("Hello world", protocol_flags={})
|
||||
data, is_binary = result
|
||||
self.assertTrue(is_binary)
|
||||
self.assertIsInstance(data, bytes)
|
||||
|
||||
def test_encode_text_preserves_ansi(self):
|
||||
result = self.fmt.encode_text("Hello |rworld|n", protocol_flags={})
|
||||
data, _ = result
|
||||
text = data.decode("utf-8")
|
||||
self.assertIn("\033[", text)
|
||||
|
||||
# --- encode prompt ---
|
||||
|
||||
def test_encode_prompt_as_json_text_frame(self):
|
||||
"""Prompts should be JSON envelopes in TEXT frames."""
|
||||
result = self.fmt.encode_prompt("HP: 100>", protocol_flags={})
|
||||
data, is_binary = result
|
||||
self.assertFalse(is_binary)
|
||||
envelope = json.loads(data.decode("utf-8"))
|
||||
self.assertEqual(envelope["proto"], "prompt")
|
||||
|
||||
# --- encode default (OOB) ---
|
||||
|
||||
def test_encode_default_gmcp_in_json(self):
|
||||
"""OOB should be encoded as GMCP-in-JSON envelope."""
|
||||
result = self.fmt.encode_default("ping", protocol_flags={})
|
||||
data, is_binary = result
|
||||
self.assertFalse(is_binary)
|
||||
envelope = json.loads(data.decode("utf-8"))
|
||||
self.assertEqual(envelope["proto"], "gmcp")
|
||||
self.assertEqual(envelope["id"], "Core.Ping")
|
||||
|
||||
def test_encode_default_options_skipped(self):
|
||||
result = self.fmt.encode_default("options", protocol_flags={})
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GMCP MUD Standards wire format
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGmcpStandardFormat(TestCase):
|
||||
"""Tests for the gmcp.mudstandards.org wire format."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.gmcp_standard import GmcpStandardFormat
|
||||
|
||||
self.fmt = GmcpStandardFormat()
|
||||
|
||||
def test_name(self):
|
||||
self.assertEqual(self.fmt.name, "gmcp.mudstandards.org")
|
||||
|
||||
def test_supports_oob(self):
|
||||
self.assertTrue(self.fmt.supports_oob)
|
||||
|
||||
# --- decode ---
|
||||
|
||||
def test_decode_binary_as_text(self):
|
||||
payload = "look".encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=True)
|
||||
self.assertEqual(result, {"text": [["look"], {}]})
|
||||
|
||||
def test_decode_text_as_gmcp(self):
|
||||
"""TEXT frames should be parsed as raw GMCP strings."""
|
||||
payload = "Core.Ping".encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertIn("ping", result)
|
||||
|
||||
def test_decode_gmcp_with_data(self):
|
||||
payload = 'Char.Vitals {"hp": 100}'.encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertIn("char_vitals", result)
|
||||
self.assertEqual(result["char_vitals"], [[], {"hp": 100}])
|
||||
|
||||
def test_decode_binary_invalid_utf8(self):
|
||||
result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_text_invalid_utf8(self):
|
||||
result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=False)
|
||||
self.assertIsNone(result)
|
||||
|
||||
# --- encode text ---
|
||||
|
||||
def test_encode_text_binary_frame(self):
|
||||
result = self.fmt.encode_text("Hello world", protocol_flags={})
|
||||
data, is_binary = result
|
||||
self.assertTrue(is_binary)
|
||||
self.assertIsInstance(data, bytes)
|
||||
|
||||
# --- encode prompt ---
|
||||
|
||||
def test_encode_prompt_binary_frame(self):
|
||||
"""GMCP format sends prompts as BINARY frames like regular text."""
|
||||
result = self.fmt.encode_prompt("HP: 100>", protocol_flags={})
|
||||
data, is_binary = result
|
||||
self.assertTrue(is_binary)
|
||||
|
||||
# --- encode default (OOB) ---
|
||||
|
||||
def test_encode_default_gmcp_text_frame(self):
|
||||
"""OOB should be raw GMCP strings in TEXT frames."""
|
||||
result = self.fmt.encode_default("ping", protocol_flags={})
|
||||
data, is_binary = result
|
||||
self.assertFalse(is_binary)
|
||||
text = data.decode("utf-8")
|
||||
self.assertTrue(text.startswith("Core.Ping"))
|
||||
|
||||
def test_encode_default_with_args(self):
|
||||
result = self.fmt.encode_default("ping", "test", protocol_flags={})
|
||||
data, _ = result
|
||||
text = data.decode("utf-8")
|
||||
self.assertIn("Core.Ping", text)
|
||||
self.assertIn("test", text)
|
||||
|
||||
def test_encode_default_options_skipped(self):
|
||||
result = self.fmt.encode_default("options", protocol_flags={})
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Wire format registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWireFormatRegistry(TestCase):
|
||||
"""Tests for the wire_formats package registry."""
|
||||
|
||||
def test_registry_has_all_formats(self):
|
||||
from evennia.server.portal.wire_formats import WIRE_FORMATS
|
||||
|
||||
self.assertIn("v1.evennia.com", WIRE_FORMATS)
|
||||
self.assertIn("json.mudstandards.org", WIRE_FORMATS)
|
||||
self.assertIn("gmcp.mudstandards.org", WIRE_FORMATS)
|
||||
self.assertIn("terminal.mudstandards.org", WIRE_FORMATS)
|
||||
|
||||
def test_registry_order_prefers_json(self):
|
||||
"""json.mudstandards.org should be first (highest priority)."""
|
||||
from evennia.server.portal.wire_formats import WIRE_FORMATS
|
||||
|
||||
keys = list(WIRE_FORMATS.keys())
|
||||
self.assertEqual(keys[0], "json.mudstandards.org")
|
||||
|
||||
def test_registry_instances_are_correct_types(self):
|
||||
from evennia.server.portal.wire_formats import (
|
||||
WIRE_FORMATS,
|
||||
EvenniaV1Format,
|
||||
GmcpStandardFormat,
|
||||
JsonStandardFormat,
|
||||
TerminalFormat,
|
||||
)
|
||||
|
||||
self.assertIsInstance(WIRE_FORMATS["v1.evennia.com"], EvenniaV1Format)
|
||||
self.assertIsInstance(WIRE_FORMATS["json.mudstandards.org"], JsonStandardFormat)
|
||||
self.assertIsInstance(WIRE_FORMATS["gmcp.mudstandards.org"], GmcpStandardFormat)
|
||||
self.assertIsInstance(WIRE_FORMATS["terminal.mudstandards.org"], TerminalFormat)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WebSocket subprotocol negotiation (integration tests)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWebSocketSubprotocolNegotiation(TestCase):
|
||||
"""
|
||||
Tests for the onConnect() subprotocol negotiation in WebSocketClient.
|
||||
|
||||
These test the negotiation logic in isolation without starting a full
|
||||
Twisted reactor, by directly calling onConnect() with mock request objects.
|
||||
"""
|
||||
|
||||
def _make_client(self):
|
||||
"""Create a WebSocketClient without connecting it."""
|
||||
from evennia.server.portal.webclient import WebSocketClient
|
||||
|
||||
client = WebSocketClient()
|
||||
return client
|
||||
|
||||
def _make_request(self, protocols=None):
|
||||
"""Create a mock ConnectionRequest with the given protocols list."""
|
||||
request = Mock()
|
||||
request.protocols = protocols or []
|
||||
return request
|
||||
|
||||
def test_no_subprotocol_offered(self):
|
||||
"""Client sends no Sec-WebSocket-Protocol → v1 fallback, returns None."""
|
||||
client = self._make_client()
|
||||
request = self._make_request(protocols=[])
|
||||
result = client.onConnect(request)
|
||||
self.assertIsNone(result)
|
||||
self.assertIsNotNone(client.wire_format)
|
||||
self.assertEqual(client.wire_format.name, "v1.evennia.com")
|
||||
|
||||
def test_v1_subprotocol_offered(self):
|
||||
"""Client offers v1.evennia.com → selected and returned."""
|
||||
client = self._make_client()
|
||||
request = self._make_request(protocols=["v1.evennia.com"])
|
||||
result = client.onConnect(request)
|
||||
self.assertEqual(result, "v1.evennia.com")
|
||||
self.assertEqual(client.wire_format.name, "v1.evennia.com")
|
||||
|
||||
def test_json_subprotocol_offered(self):
|
||||
client = self._make_client()
|
||||
request = self._make_request(protocols=["json.mudstandards.org"])
|
||||
result = client.onConnect(request)
|
||||
self.assertEqual(result, "json.mudstandards.org")
|
||||
self.assertEqual(client.wire_format.name, "json.mudstandards.org")
|
||||
|
||||
def test_gmcp_subprotocol_offered(self):
|
||||
client = self._make_client()
|
||||
request = self._make_request(protocols=["gmcp.mudstandards.org"])
|
||||
result = client.onConnect(request)
|
||||
self.assertEqual(result, "gmcp.mudstandards.org")
|
||||
self.assertEqual(client.wire_format.name, "gmcp.mudstandards.org")
|
||||
|
||||
def test_terminal_subprotocol_offered(self):
|
||||
client = self._make_client()
|
||||
request = self._make_request(protocols=["terminal.mudstandards.org"])
|
||||
result = client.onConnect(request)
|
||||
self.assertEqual(result, "terminal.mudstandards.org")
|
||||
self.assertEqual(client.wire_format.name, "terminal.mudstandards.org")
|
||||
|
||||
def test_server_preference_wins(self):
|
||||
"""When client offers multiple, server's preference order wins."""
|
||||
client = self._make_client()
|
||||
# Client offers terminal first, but server prefers json
|
||||
request = self._make_request(
|
||||
protocols=["terminal.mudstandards.org", "json.mudstandards.org"]
|
||||
)
|
||||
result = client.onConnect(request)
|
||||
# Server preference: json > gmcp > terminal > v1
|
||||
self.assertEqual(result, "json.mudstandards.org")
|
||||
|
||||
def test_unknown_subprotocol_falls_back(self):
|
||||
"""Client offers only unknown protocols → v1 fallback."""
|
||||
client = self._make_client()
|
||||
request = self._make_request(protocols=["unknown.protocol"])
|
||||
result = client.onConnect(request)
|
||||
self.assertIsNone(result)
|
||||
self.assertEqual(client.wire_format.name, "v1.evennia.com")
|
||||
|
||||
def test_mixed_known_and_unknown(self):
|
||||
"""Client offers unknown + known → known is selected."""
|
||||
client = self._make_client()
|
||||
request = self._make_request(
|
||||
protocols=["unknown.protocol", "terminal.mudstandards.org"]
|
||||
)
|
||||
result = client.onConnect(request)
|
||||
self.assertEqual(result, "terminal.mudstandards.org")
|
||||
|
||||
def test_empty_subprotocols_setting(self):
|
||||
"""WEBSOCKET_SUBPROTOCOLS=[] disables negotiation; clients fall back to v1."""
|
||||
from evennia.server.portal import webclient
|
||||
|
||||
original = webclient._get_supported_subprotocols
|
||||
webclient._get_supported_subprotocols = lambda: []
|
||||
try:
|
||||
client = self._make_client()
|
||||
request = self._make_request(protocols=["json.mudstandards.org"])
|
||||
result = client.onConnect(request)
|
||||
# No match possible — falls back to v1, returns None
|
||||
self.assertIsNone(result)
|
||||
self.assertEqual(client.wire_format.name, "v1.evennia.com")
|
||||
finally:
|
||||
webclient._get_supported_subprotocols = original
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration: full message round-trip through wire formats
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEvenniaV1RoundTrip(TestCase):
|
||||
"""Test encode → decode round-trip for v1.evennia.com."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.evennia_v1 import EvenniaV1Format
|
||||
|
||||
self.fmt = EvenniaV1Format()
|
||||
|
||||
def test_oob_roundtrip(self):
|
||||
"""Encode an OOB command and decode the result."""
|
||||
encoded = self.fmt.encode_default("custom_cmd", "arg1", protocol_flags={})
|
||||
data, is_binary = encoded
|
||||
# Decode it back
|
||||
result = self.fmt.decode_incoming(data, is_binary=is_binary)
|
||||
self.assertIn("custom_cmd", result)
|
||||
|
||||
|
||||
class TestGmcpRoundTrip(TestCase):
|
||||
"""Test encode → decode round-trip for gmcp.mudstandards.org."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.gmcp_standard import GmcpStandardFormat
|
||||
|
||||
self.fmt = GmcpStandardFormat()
|
||||
|
||||
def test_oob_roundtrip_no_args(self):
|
||||
"""Encode a command without args and decode."""
|
||||
encoded = self.fmt.encode_default("ping", protocol_flags={})
|
||||
data, is_binary = encoded
|
||||
result = self.fmt.decode_incoming(data, is_binary=is_binary)
|
||||
self.assertIn("ping", result)
|
||||
|
||||
def test_oob_roundtrip_with_kwargs(self):
|
||||
"""Encode a command with kwargs and decode."""
|
||||
encoded = self.fmt.encode_default("get_value", protocol_flags={}, hp=100)
|
||||
data, is_binary = encoded
|
||||
result = self.fmt.decode_incoming(data, is_binary=is_binary)
|
||||
self.assertIn("get_value", result)
|
||||
|
||||
|
||||
class TestJsonStandardRoundTrip(TestCase):
|
||||
"""Test encode → decode round-trip for json.mudstandards.org."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.json_standard import JsonStandardFormat
|
||||
|
||||
self.fmt = JsonStandardFormat()
|
||||
|
||||
def test_oob_roundtrip(self):
|
||||
"""Encode an OOB command as GMCP-in-JSON and decode."""
|
||||
encoded = self.fmt.encode_default("ping", protocol_flags={})
|
||||
data, is_binary = encoded
|
||||
result = self.fmt.decode_incoming(data, is_binary=is_binary)
|
||||
self.assertIn("ping", result)
|
||||
|
||||
def test_prompt_roundtrip(self):
|
||||
"""Encode a prompt and verify the envelope."""
|
||||
encoded = self.fmt.encode_prompt("HP: 100>", protocol_flags={})
|
||||
data, is_binary = encoded
|
||||
self.assertFalse(is_binary)
|
||||
envelope = json.loads(data.decode("utf-8"))
|
||||
self.assertEqual(envelope["proto"], "prompt")
|
||||
self.assertIn("HP:", envelope["data"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge-case tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestTerminalEdgeCases(TestCase):
|
||||
"""Edge-case tests for TerminalFormat."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.terminal import TerminalFormat
|
||||
|
||||
self.fmt = TerminalFormat()
|
||||
|
||||
def test_decode_text_frame_treated_as_text(self):
|
||||
"""TEXT frames should still be decoded as text input."""
|
||||
payload = "look".encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
self.assertEqual(result, {"text": [["look"], {}]})
|
||||
|
||||
def test_decode_empty_bytes(self):
|
||||
"""Empty bytes should return None."""
|
||||
result = self.fmt.decode_incoming(b"", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_empty_bytes_text_frame(self):
|
||||
"""Empty TEXT frame bytes should return None."""
|
||||
result = self.fmt.decode_incoming(b"", is_binary=False)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestJsonStandardEdgeCases(TestCase):
|
||||
"""Edge-case tests for JsonStandardFormat."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.json_standard import JsonStandardFormat
|
||||
|
||||
self.fmt = JsonStandardFormat()
|
||||
|
||||
def test_decode_envelope_missing_proto(self):
|
||||
"""JSON envelope with missing proto should use empty string default."""
|
||||
envelope = {"id": "my_cmd", "data": "test"}
|
||||
payload = json.dumps(envelope).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
# Empty proto, cmd_id="my_cmd" → funcname="my_cmd"
|
||||
self.assertIn("my_cmd", result)
|
||||
|
||||
def test_decode_envelope_missing_all_fields(self):
|
||||
"""JSON envelope with no recognized fields returns None (empty funcname)."""
|
||||
payload = json.dumps({}).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
# proto="", cmd_id="", data="" → generic handler, funcname="" → None
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_envelope_missing_id_and_data(self):
|
||||
"""Envelope with only proto should work."""
|
||||
envelope = {"proto": "text"}
|
||||
payload = json.dumps(envelope).encode("utf-8")
|
||||
result = self.fmt.decode_incoming(payload, is_binary=False)
|
||||
# proto="text", data="" → text input with empty string
|
||||
self.assertEqual(result, {"text": [[""], {}]})
|
||||
|
||||
def test_decode_binary_invalid_utf8(self):
|
||||
"""Invalid UTF-8 in BINARY frame should return None."""
|
||||
result = self.fmt.decode_incoming(b"\xff\xfe", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestGmcpStandardEdgeCases(TestCase):
|
||||
"""Edge-case tests for GmcpStandardFormat."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.gmcp_standard import GmcpStandardFormat
|
||||
|
||||
self.fmt = GmcpStandardFormat()
|
||||
|
||||
def test_decode_empty_binary(self):
|
||||
"""Empty BINARY frame should return None."""
|
||||
result = self.fmt.decode_incoming(b"", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_decode_binary_whitespace_only(self):
|
||||
"""Whitespace-only BINARY frame should return None."""
|
||||
result = self.fmt.decode_incoming(b" \n ", is_binary=True)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestBaseWireFormatHelpers(TestCase):
|
||||
"""Tests for the shared helper methods on WireFormat base class."""
|
||||
|
||||
def setUp(self):
|
||||
from evennia.server.portal.wire_formats.base import WireFormat
|
||||
|
||||
self.base = WireFormat()
|
||||
|
||||
def test_extract_text_and_flags_basic(self):
|
||||
"""Basic extraction with no options or flags."""
|
||||
kwargs = {}
|
||||
result = self.base._extract_text_and_flags(("hello",), kwargs, {})
|
||||
self.assertEqual(result, ("hello", False, False, False))
|
||||
|
||||
def test_extract_text_and_flags_none_text(self):
|
||||
"""None as text should return None."""
|
||||
kwargs = {}
|
||||
result = self.base._extract_text_and_flags((None,), kwargs, {})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_extract_text_and_flags_no_args(self):
|
||||
"""Empty args should return None."""
|
||||
kwargs = {}
|
||||
result = self.base._extract_text_and_flags((), kwargs, {})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_extract_text_and_flags_options_override(self):
|
||||
"""Options should override protocol_flags."""
|
||||
kwargs = {"options": {"nocolor": True, "screenreader": True}}
|
||||
result = self.base._extract_text_and_flags(
|
||||
("hello",), kwargs, {"NOCOLOR": False, "SCREENREADER": False}
|
||||
)
|
||||
self.assertEqual(result, ("hello", False, True, True))
|
||||
|
||||
def test_extract_text_and_flags_raw_option(self):
|
||||
"""Raw option should be extracted into the result tuple."""
|
||||
kwargs = {"options": {"raw": True}}
|
||||
result = self.base._extract_text_and_flags(("hello",), kwargs, {})
|
||||
self.assertEqual(result, ("hello", True, False, False))
|
||||
|
||||
def test_extract_text_and_flags_raw_protocol_flag(self):
|
||||
"""RAW protocol flag should be used when option is absent."""
|
||||
kwargs = {}
|
||||
result = self.base._extract_text_and_flags(
|
||||
("hello",), kwargs, {"RAW": True}
|
||||
)
|
||||
self.assertEqual(result, ("hello", True, False, False))
|
||||
|
||||
def test_extract_text_and_flags_from_protocol_flags(self):
|
||||
"""Protocol flags should be used when options are absent."""
|
||||
kwargs = {}
|
||||
result = self.base._extract_text_and_flags(
|
||||
("hello",), kwargs, {"NOCOLOR": True}
|
||||
)
|
||||
self.assertEqual(result, ("hello", False, True, False))
|
||||
|
||||
def test_process_ansi_normal(self):
|
||||
"""Normal mode should produce ANSI escape sequences."""
|
||||
text = self.base._process_ansi("Hello |rworld|n", False, False, False)
|
||||
self.assertIn("\033[", text)
|
||||
|
||||
def test_process_ansi_nocolor(self):
|
||||
"""Nocolor mode should strip all ANSI."""
|
||||
text = self.base._process_ansi("Hello |rworld|n", False, True, False)
|
||||
self.assertNotIn("\033[", text)
|
||||
self.assertNotIn("|r", text)
|
||||
|
||||
def test_process_ansi_screenreader(self):
|
||||
"""Screenreader mode should strip ANSI and apply regex."""
|
||||
text = self.base._process_ansi("Hello |rworld|n", False, False, True)
|
||||
self.assertNotIn("\033[", text)
|
||||
self.assertNotIn("|r", text)
|
||||
|
||||
def test_process_ansi_raw(self):
|
||||
"""Raw mode should return text unmodified."""
|
||||
text = self.base._process_ansi("Hello |rworld|n", True, False, False)
|
||||
self.assertEqual(text, "Hello |rworld|n")
|
||||
|
||||
def test_process_ansi_trailing_reset(self):
|
||||
"""Normal mode should append |n to prevent color bleed."""
|
||||
text = self.base._process_ansi("Hello |rworld", False, False, False)
|
||||
# Should end with ANSI reset sequence
|
||||
self.assertTrue(text.endswith("\033[0m"))
|
||||
|
||||
def test_process_ansi_trailing_pipe_preserved(self):
|
||||
"""A trailing literal pipe should be preserved, not stripped."""
|
||||
text = self.base._process_ansi("choice a|b|", False, False, False)
|
||||
# The || escape produces a literal pipe; the |n appends a reset.
|
||||
# The important thing is the literal pipe is not lost.
|
||||
self.assertIn("|", text.replace("\033[0m", ""))
|
||||
|
|
@ -330,7 +330,16 @@ class TestWebSocket(BaseEvenniaTest):
|
|||
@mock.patch("evennia.server.portal.portalsessionhandler.reactor", new=MagicMock())
|
||||
def test_data_out(self):
|
||||
self.proto.onOpen()
|
||||
self.proto.sendLine = MagicMock()
|
||||
msg = json.dumps(["logged_in", (), {}])
|
||||
self.proto.sendEncoded = MagicMock()
|
||||
self.proto.sessionhandler.data_out(self.proto, text=[["Excepting Alice"], {}])
|
||||
self.proto.sendLine.assert_called_with(json.dumps(["text", ["Excepting Alice"], {}]))
|
||||
self.proto.sendEncoded.assert_called_once()
|
||||
call_args = self.proto.sendEncoded.call_args
|
||||
data = call_args[0][0]
|
||||
# EvenniaV1Format encodes as JSON TEXT frame
|
||||
parsed = json.loads(data)
|
||||
self.assertEqual(parsed[0], "text")
|
||||
self.assertEqual(parsed[1], ["Excepting Alice"])
|
||||
# Verify frame is sent as TEXT (not BINARY) — v1 uses JSON TEXT frames
|
||||
args, kwargs = call_args
|
||||
is_binary = kwargs.get("is_binary", args[1] if len(args) > 1 else False)
|
||||
self.assertFalse(is_binary)
|
||||
|
|
|
|||
|
|
@ -1,11 +1,28 @@
|
|||
"""
|
||||
Webclient based on websockets.
|
||||
Webclient based on websockets with MUD Standards subprotocol support.
|
||||
|
||||
This implements a webclient with WebSockets (http://en.wikipedia.org/wiki/WebSocket)
|
||||
by use of the autobahn-python package's implementation (https://github.com/crossbario/autobahn-python).
|
||||
It is used together with evennia/web/media/javascript/evennia_websocket_webclient.js.
|
||||
|
||||
All data coming into the webclient is in the form of valid JSON on the form
|
||||
Subprotocol Negotiation (RFC 6455 Sec-WebSocket-Protocol):
|
||||
When a client connects, it may offer one or more WebSocket subprotocols
|
||||
via the Sec-WebSocket-Protocol header. This module negotiates the best
|
||||
match from the server's supported list (configured via
|
||||
settings.WEBSOCKET_SUBPROTOCOLS) and selects the appropriate wire format
|
||||
codec for the connection's lifetime.
|
||||
|
||||
Supported subprotocols (per https://mudstandards.org/websocket/):
|
||||
- v1.evennia.com: Evennia's legacy JSON array format
|
||||
- json.mudstandards.org: MUD Standards JSON envelope format
|
||||
- gmcp.mudstandards.org: GMCP over WebSocket
|
||||
- terminal.mudstandards.org: Raw ANSI terminal over WebSocket
|
||||
|
||||
If no subprotocol is negotiated (legacy client with no header),
|
||||
the v1.evennia.com format is used as the default.
|
||||
|
||||
All data coming into the webclient via the v1.evennia.com format is in the
|
||||
form of valid JSON on the form
|
||||
|
||||
`["inputfunc_name", [args], {kwarg}]`
|
||||
|
||||
|
|
@ -15,21 +32,14 @@ from the command line and interprets it as an Evennia Command: `["text", ["look"
|
|||
|
||||
"""
|
||||
|
||||
import html
|
||||
import json
|
||||
import re
|
||||
|
||||
from autobahn.exception import Disconnected
|
||||
from autobahn.twisted.websocket import WebSocketServerProtocol
|
||||
from django.conf import settings
|
||||
|
||||
from evennia.utils.ansi import parse_ansi
|
||||
from evennia.utils.text2html import parse_html
|
||||
from evennia.utils.utils import class_from_module, mod_import
|
||||
|
||||
_RE_SCREENREADER_REGEX = re.compile(
|
||||
r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE
|
||||
)
|
||||
_CLIENT_SESSIONS = mod_import(settings.SESSION_ENGINE).SessionStore
|
||||
_UPSTREAM_IPS = settings.UPSTREAM_IPS
|
||||
|
||||
|
|
@ -43,11 +53,91 @@ GOING_AWAY = WebSocketServerProtocol.CLOSE_STATUS_CODE_GOING_AWAY
|
|||
|
||||
_BASE_SESSION_CLASS = class_from_module(settings.BASE_SESSION_CLASS)
|
||||
|
||||
# --- Wire format support ---
|
||||
# Import wire formats lazily to avoid circular imports at module level.
|
||||
# The WIRE_FORMATS dict and format instances are created on first use.
|
||||
_wire_formats = None
|
||||
|
||||
|
||||
def _get_wire_formats():
|
||||
"""
|
||||
Lazily load and return the wire format registry.
|
||||
|
||||
Returns:
|
||||
dict: Mapping of subprotocol name -> WireFormat instance.
|
||||
|
||||
"""
|
||||
global _wire_formats
|
||||
if _wire_formats is None:
|
||||
try:
|
||||
from evennia.server.portal.wire_formats import WIRE_FORMATS
|
||||
|
||||
_wire_formats = WIRE_FORMATS
|
||||
except Exception:
|
||||
from evennia.utils import logger
|
||||
|
||||
logger.log_trace("Failed to load wire format registry")
|
||||
_wire_formats = {}
|
||||
return _wire_formats
|
||||
|
||||
|
||||
def _get_supported_subprotocols():
|
||||
"""
|
||||
Get the ordered list of supported subprotocol names from settings.
|
||||
|
||||
Falls back to all available wire formats if the setting is not defined.
|
||||
|
||||
Returns:
|
||||
list: Ordered list of subprotocol name strings.
|
||||
|
||||
"""
|
||||
configured = getattr(settings, "WEBSOCKET_SUBPROTOCOLS", None)
|
||||
if configured is None:
|
||||
# No explicit configuration; advertise all known wire formats.
|
||||
return list(_get_wire_formats().keys())
|
||||
|
||||
# Allow a single string (common misconfiguration) by coercing to a list.
|
||||
if isinstance(configured, str):
|
||||
protos = [configured]
|
||||
else:
|
||||
try:
|
||||
protos = list(configured)
|
||||
except TypeError as err:
|
||||
raise TypeError(
|
||||
"settings.WEBSOCKET_SUBPROTOCOLS must be a string or an iterable "
|
||||
"of strings (e.g. list/tuple); got %r" % (configured,)
|
||||
) from err
|
||||
|
||||
# Warn about any configured names that don't match a known wire format.
|
||||
# Unknown names are harmlessly skipped during negotiation (onConnect only
|
||||
# selects protocols present in both the client's offer and the registry),
|
||||
# but a typo here is almost certainly unintentional.
|
||||
wire_formats = _get_wire_formats()
|
||||
unknown = [name for name in protos if name not in wire_formats]
|
||||
if unknown:
|
||||
from evennia.utils import logger
|
||||
|
||||
logger.log_warn(
|
||||
"WEBSOCKET_SUBPROTOCOLS contains unknown protocol name(s): %s. "
|
||||
"Known protocols: %s"
|
||||
% (", ".join(repr(n) for n in unknown), ", ".join(repr(n) for n in wire_formats))
|
||||
)
|
||||
|
||||
return protos
|
||||
|
||||
|
||||
class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
||||
"""
|
||||
Implements the server-side of the Websocket connection.
|
||||
|
||||
Supports multiple wire formats via RFC 6455 subprotocol negotiation.
|
||||
The wire format is selected during the WebSocket handshake in onConnect()
|
||||
and determines how all subsequent messages are encoded and decoded.
|
||||
|
||||
Attributes:
|
||||
wire_format (WireFormat): The selected wire format codec for this
|
||||
connection. Set during onConnect().
|
||||
|
||||
"""
|
||||
|
||||
# nonce value, used to prevent the webclient from erasing the
|
||||
|
|
@ -58,6 +148,56 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
|||
super().__init__(*args, **kwargs)
|
||||
self.protocol_key = "webclient/websocket"
|
||||
self.browserstr = ""
|
||||
self.wire_format = None
|
||||
|
||||
def onConnect(self, request):
|
||||
"""
|
||||
Called during the WebSocket opening handshake, before onOpen().
|
||||
|
||||
This is where we negotiate the WebSocket subprotocol. The client
|
||||
sends a list of subprotocols it supports via Sec-WebSocket-Protocol.
|
||||
We select the best match from our supported list.
|
||||
|
||||
Args:
|
||||
request (ConnectionRequest): The WebSocket connection request,
|
||||
containing request.protocols (list of offered subprotocols).
|
||||
|
||||
Returns:
|
||||
str or None: The selected subprotocol name to echo back in the
|
||||
Sec-WebSocket-Protocol response header, or None if no
|
||||
subprotocol was negotiated (legacy client with no header,
|
||||
or client offered protocols that don't match).
|
||||
|
||||
"""
|
||||
wire_formats = _get_wire_formats()
|
||||
supported = _get_supported_subprotocols()
|
||||
|
||||
if request.protocols:
|
||||
# Client offered subprotocols — pick the first one we support
|
||||
# (order follows the server's preference from settings)
|
||||
for proto_name in supported:
|
||||
if proto_name in request.protocols and proto_name in wire_formats:
|
||||
self.wire_format = wire_formats[proto_name]
|
||||
return proto_name
|
||||
|
||||
# Client offered protocols but none matched. Per RFC 6455, if we
|
||||
# don't echo a subprotocol, a well-behaved client should close the
|
||||
# connection. We still set a wire format so the connection doesn't
|
||||
# crash if the client proceeds anyway.
|
||||
if "v1.evennia.com" in wire_formats:
|
||||
self.wire_format = wire_formats["v1.evennia.com"]
|
||||
elif wire_formats:
|
||||
self.wire_format = next(iter(wire_formats.values()))
|
||||
return None
|
||||
|
||||
# No Sec-WebSocket-Protocol header at all — legacy client.
|
||||
# Always use v1 format regardless of WEBSOCKET_SUBPROTOCOLS.
|
||||
if "v1.evennia.com" in wire_formats:
|
||||
self.wire_format = wire_formats["v1.evennia.com"]
|
||||
elif wire_formats:
|
||||
self.wire_format = next(iter(wire_formats.values()))
|
||||
|
||||
return None
|
||||
|
||||
def get_client_session(self):
|
||||
"""
|
||||
|
|
@ -131,10 +271,31 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
|||
self.sessid = old_session.sessid
|
||||
self.sessionhandler.disconnect(old_session)
|
||||
|
||||
# Ensure wire_format is set (it should be from onConnect, but
|
||||
# in testing scenarios onConnect may not have been called)
|
||||
if self.wire_format is None:
|
||||
wire_formats = _get_wire_formats()
|
||||
self.wire_format = wire_formats.get(
|
||||
"v1.evennia.com", next(iter(wire_formats.values()), None)
|
||||
)
|
||||
|
||||
if self.wire_format is None:
|
||||
from evennia.utils import logger
|
||||
|
||||
logger.log_err(
|
||||
"WebSocketClient: No wire formats available. "
|
||||
"Closing connection."
|
||||
)
|
||||
self.sendClose(CLOSE_NORMAL, "No wire formats available")
|
||||
return
|
||||
|
||||
browserstr = f":{self.browserstr}" if self.browserstr else ""
|
||||
self.protocol_flags["CLIENTNAME"] = f"Evennia Webclient (websocket{browserstr})"
|
||||
proto_name = self.wire_format.name
|
||||
self.protocol_flags["CLIENTNAME"] = (
|
||||
f"Evennia Webclient (websocket{browserstr} [{proto_name}])"
|
||||
)
|
||||
self.protocol_flags["UTF-8"] = True
|
||||
self.protocol_flags["OOB"] = True
|
||||
self.protocol_flags["OOB"] = self.wire_format.supports_oob
|
||||
self.protocol_flags["TRUECOLOR"] = True
|
||||
self.protocol_flags["XTERM256"] = True
|
||||
self.protocol_flags["ANSI"] = True
|
||||
|
|
@ -196,15 +357,29 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
|||
"""
|
||||
Callback fired when a complete WebSocket message was received.
|
||||
|
||||
Delegates to the active wire format's decode_incoming() method
|
||||
to parse the message into kwargs for data_in().
|
||||
|
||||
Args:
|
||||
payload (bytes): The WebSocket message received.
|
||||
isBinary (bool): Flag indicating whether payload is binary or
|
||||
UTF-8 encoded text.
|
||||
|
||||
"""
|
||||
cmdarray = json.loads(str(payload, "utf-8"))
|
||||
if cmdarray:
|
||||
self.data_in(**{cmdarray[0]: [cmdarray[1], cmdarray[2]]})
|
||||
if self.wire_format:
|
||||
kwargs = self.wire_format.decode_incoming(
|
||||
payload, isBinary, protocol_flags=self.protocol_flags
|
||||
)
|
||||
if kwargs:
|
||||
self.data_in(**kwargs)
|
||||
else:
|
||||
# Fallback: try legacy JSON parsing
|
||||
try:
|
||||
cmdarray = json.loads(str(payload, "utf-8"))
|
||||
if cmdarray:
|
||||
self.data_in(**{cmdarray[0]: [cmdarray[1], cmdarray[2]]})
|
||||
except (json.JSONDecodeError, UnicodeDecodeError, IndexError):
|
||||
pass
|
||||
|
||||
def sendLine(self, line):
|
||||
"""
|
||||
|
|
@ -221,6 +396,24 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
|||
# it means this link is actually already closed.
|
||||
self.disconnect(reason="Browser already closed.")
|
||||
|
||||
def sendEncoded(self, data, is_binary=False):
|
||||
"""
|
||||
Send pre-encoded data to the client.
|
||||
|
||||
This is used by wire formats that return raw bytes with a
|
||||
binary/text frame indicator.
|
||||
|
||||
Args:
|
||||
data (bytes): The encoded data to send.
|
||||
is_binary (bool): If True, send as a BINARY frame.
|
||||
If False, send as a TEXT frame.
|
||||
|
||||
"""
|
||||
try:
|
||||
return self.sendMessage(data, isBinary=is_binary)
|
||||
except Disconnected:
|
||||
self.disconnect(reason="Browser already closed.")
|
||||
|
||||
def at_login(self):
|
||||
csession = self.get_client_session()
|
||||
if csession:
|
||||
|
|
@ -256,20 +449,47 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
|||
|
||||
def send_text(self, *args, **kwargs):
|
||||
"""
|
||||
Send text data. This will pre-process the text for
|
||||
color-replacement, conversion to html etc.
|
||||
Send text data. Delegates to the active wire format's encode_text()
|
||||
method, which handles ANSI processing and framing. The exact output
|
||||
depends on the negotiated subprotocol (e.g., HTML for v1.evennia.com,
|
||||
raw ANSI for MUD Standards formats).
|
||||
|
||||
Args:
|
||||
text (str): Text to send.
|
||||
|
||||
Keyword Args:
|
||||
options (dict): Options-dict with the following keys understood:
|
||||
- raw (bool): No parsing at all (leave ansi-to-html markers unparsed).
|
||||
- raw (bool): No parsing at all (leave ansi markers unparsed).
|
||||
- nocolor (bool): Clean out all color.
|
||||
- screenreader (bool): Use Screenreader mode.
|
||||
- send_prompt (bool): Send a prompt with parsed html
|
||||
- send_prompt (bool): Send as a prompt instead of regular text.
|
||||
|
||||
"""
|
||||
if self.wire_format:
|
||||
result = self.wire_format.encode_text(
|
||||
*args, protocol_flags=self.protocol_flags, **kwargs
|
||||
)
|
||||
if result is not None:
|
||||
data, is_binary = result
|
||||
self.sendEncoded(data, is_binary=is_binary)
|
||||
else:
|
||||
# Fallback: legacy behavior
|
||||
self._send_text_legacy(*args, **kwargs)
|
||||
|
||||
def _send_text_legacy(self, *args, **kwargs):
|
||||
"""
|
||||
Legacy send_text fallback for when no wire format is set.
|
||||
|
||||
Performs the original Evennia HTML conversion (parse_html) and
|
||||
sends a JSON array ``["text", [html_string], {}]`` via sendLine.
|
||||
|
||||
"""
|
||||
import html as html_lib
|
||||
import re
|
||||
|
||||
from evennia.utils.ansi import parse_ansi
|
||||
from evennia.utils.text2html import parse_html
|
||||
|
||||
if args:
|
||||
args = list(args)
|
||||
text = args[0]
|
||||
|
|
@ -277,35 +497,54 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
|||
return
|
||||
else:
|
||||
return
|
||||
|
||||
flags = self.protocol_flags
|
||||
|
||||
options = kwargs.pop("options", {})
|
||||
raw = options.get("raw", flags.get("RAW", False))
|
||||
client_raw = options.get("client_raw", False)
|
||||
nocolor = options.get("nocolor", flags.get("NOCOLOR", False))
|
||||
screenreader = options.get("screenreader", flags.get("SCREENREADER", False))
|
||||
prompt = options.get("send_prompt", False)
|
||||
|
||||
_RE = re.compile(
|
||||
r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE
|
||||
)
|
||||
if screenreader:
|
||||
# screenreader mode cleans up output
|
||||
text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False)
|
||||
text = _RE_SCREENREADER_REGEX.sub("", text)
|
||||
text = _RE.sub("", text)
|
||||
cmd = "prompt" if prompt else "text"
|
||||
if raw:
|
||||
if client_raw:
|
||||
args[0] = text
|
||||
else:
|
||||
args[0] = html.escape(text) # escape html!
|
||||
args[0] = html_lib.escape(text)
|
||||
else:
|
||||
args[0] = parse_html(text, strip_ansi=nocolor)
|
||||
|
||||
# send to client on required form [cmdname, args, kwargs]
|
||||
self.sendLine(json.dumps([cmd, args, kwargs]))
|
||||
|
||||
def send_prompt(self, *args, **kwargs):
|
||||
kwargs["options"].update({"send_prompt": True})
|
||||
self.send_text(*args, **kwargs)
|
||||
"""
|
||||
Send a prompt to the client.
|
||||
|
||||
Prompts are handled separately from regular text because some
|
||||
wire formats (e.g. json.mudstandards.org) send prompts as a
|
||||
distinct message type that the client can render differently.
|
||||
|
||||
Args:
|
||||
*args: Prompt text as first arg.
|
||||
|
||||
Keyword Args:
|
||||
options (dict): Same options as send_text.
|
||||
|
||||
"""
|
||||
if self.wire_format:
|
||||
result = self.wire_format.encode_prompt(
|
||||
*args, protocol_flags=self.protocol_flags, **kwargs
|
||||
)
|
||||
if result is not None:
|
||||
data, is_binary = result
|
||||
self.sendEncoded(data, is_binary=is_binary)
|
||||
else:
|
||||
kwargs.setdefault("options", {}).update({"send_prompt": True})
|
||||
self.send_text(*args, **kwargs)
|
||||
|
||||
def send_default(self, cmdname, *args, **kwargs):
|
||||
"""
|
||||
|
|
@ -321,5 +560,14 @@ class WebSocketClient(WebSocketServerProtocol, _BASE_SESSION_CLASS):
|
|||
client instead.
|
||||
|
||||
"""
|
||||
if not cmdname == "options":
|
||||
self.sendLine(json.dumps([cmdname, args, kwargs]))
|
||||
if self.wire_format:
|
||||
result = self.wire_format.encode_default(
|
||||
cmdname, *args, protocol_flags=self.protocol_flags, **kwargs
|
||||
)
|
||||
if result is not None:
|
||||
data, is_binary = result
|
||||
self.sendEncoded(data, is_binary=is_binary)
|
||||
else:
|
||||
# Fallback: legacy behavior
|
||||
if not cmdname == "options":
|
||||
self.sendLine(json.dumps([cmdname, args, kwargs]))
|
||||
|
|
|
|||
46
evennia/server/portal/wire_formats/__init__.py
Normal file
46
evennia/server/portal/wire_formats/__init__.py
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
"""
|
||||
Wire format codecs for WebSocket subprotocol support.
|
||||
|
||||
This package implements the strategy pattern for WebSocket wire formats,
|
||||
allowing Evennia to support multiple WebSocket subprotocols as defined by
|
||||
the MUD Standards WebSocket proposal (https://mudstandards.org/websocket/).
|
||||
|
||||
Each wire format is a self-contained codec that handles encoding outgoing
|
||||
data and decoding incoming data for a specific WebSocket subprotocol.
|
||||
|
||||
Supported subprotocols:
|
||||
- v1.evennia.com: Evennia's legacy JSON array format
|
||||
- json.mudstandards.org: MUD Standards JSON envelope format
|
||||
- gmcp.mudstandards.org: GMCP over WebSocket
|
||||
- terminal.mudstandards.org: Raw ANSI terminal over WebSocket
|
||||
"""
|
||||
|
||||
from .base import WireFormat
|
||||
from .evennia_v1 import EvenniaV1Format
|
||||
from .gmcp_standard import GmcpStandardFormat
|
||||
from .json_standard import JsonStandardFormat
|
||||
from .terminal import TerminalFormat
|
||||
|
||||
# Registry of all available wire formats, keyed by subprotocol name.
|
||||
# Note: Dict order only affects the default negotiation priority when
|
||||
# settings.WEBSOCKET_SUBPROTOCOLS is not set (None). When the setting
|
||||
# is configured, it controls the order in which subprotocols are matched
|
||||
# against a client's offered list.
|
||||
WIRE_FORMATS = {
|
||||
fmt.name: fmt
|
||||
for fmt in [
|
||||
JsonStandardFormat(),
|
||||
GmcpStandardFormat(),
|
||||
TerminalFormat(),
|
||||
EvenniaV1Format(),
|
||||
]
|
||||
}
|
||||
|
||||
__all__ = [
|
||||
"WireFormat",
|
||||
"EvenniaV1Format",
|
||||
"JsonStandardFormat",
|
||||
"GmcpStandardFormat",
|
||||
"TerminalFormat",
|
||||
"WIRE_FORMATS",
|
||||
]
|
||||
211
evennia/server/portal/wire_formats/base.py
Normal file
211
evennia/server/portal/wire_formats/base.py
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
"""
|
||||
Base wire format interface for WebSocket subprotocol codecs.
|
||||
|
||||
All wire format implementations must subclass WireFormat and implement
|
||||
the encoding/decoding methods. Each format represents a specific
|
||||
WebSocket subprotocol as defined by RFC 6455 Sec-WebSocket-Protocol
|
||||
negotiation.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from evennia.utils.ansi import parse_ansi
|
||||
|
||||
_RE_SCREENREADER_REGEX = re.compile(
|
||||
r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE
|
||||
)
|
||||
_RE_N = re.compile(r"\|n$")
|
||||
|
||||
|
||||
class WireFormat:
|
||||
"""
|
||||
Abstract base class for WebSocket wire format codecs.
|
||||
|
||||
A wire format handles the translation between Evennia's internal
|
||||
message representation and the bytes sent over the WebSocket connection.
|
||||
|
||||
Each subclass corresponds to a specific WebSocket subprotocol name
|
||||
(e.g., "v1.evennia.com", "json.mudstandards.org").
|
||||
|
||||
Attributes:
|
||||
name (str): The subprotocol identifier string, used in
|
||||
Sec-WebSocket-Protocol negotiation.
|
||||
supports_oob (bool): Whether this format supports out-of-band
|
||||
data (structured commands beyond plain text).
|
||||
|
||||
"""
|
||||
|
||||
name = None
|
||||
supports_oob = True
|
||||
|
||||
@staticmethod
|
||||
def _extract_text_and_flags(args, kwargs, protocol_flags):
|
||||
"""
|
||||
Extract text string and display flags from encode arguments.
|
||||
|
||||
This is a shared helper for encode_text/encode_prompt in formats
|
||||
that use raw ANSI output (terminal, json, gmcp). The EvenniaV1
|
||||
format has its own logic (HTML conversion, raw mode) and does
|
||||
not use this helper.
|
||||
|
||||
Args:
|
||||
args (tuple): Positional args passed to encode_text/encode_prompt.
|
||||
args[0] should be the text string.
|
||||
kwargs (dict): Keyword args. The "options" key is popped and
|
||||
inspected for "raw", "nocolor" and "screenreader" overrides.
|
||||
protocol_flags (dict or None): Session protocol flags.
|
||||
|
||||
Returns:
|
||||
tuple or None: (text, raw, nocolor, screenreader) if text is
|
||||
valid, or None if there is no text to encode.
|
||||
|
||||
"""
|
||||
if args:
|
||||
text = args[0]
|
||||
if text is None:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
flags = protocol_flags or {}
|
||||
options = kwargs.pop("options", {})
|
||||
raw = options.get("raw", flags.get("RAW", False))
|
||||
nocolor = options.get("nocolor", flags.get("NOCOLOR", False))
|
||||
screenreader = options.get("screenreader", flags.get("SCREENREADER", False))
|
||||
return (text, raw, nocolor, screenreader)
|
||||
|
||||
@staticmethod
|
||||
def _process_ansi(text, raw, nocolor, screenreader):
|
||||
"""
|
||||
Process Evennia ANSI markup into terminal escape sequences.
|
||||
|
||||
Applies screenreader stripping, nocolor stripping, or full ANSI
|
||||
conversion depending on the flags. This is the shared logic for
|
||||
all non-HTML wire formats (terminal, json, gmcp).
|
||||
|
||||
When raw is True, text is returned unmodified (no ANSI processing).
|
||||
|
||||
For non-raw output, a trailing reset (|n) is appended to prevent
|
||||
color/attribute bleed into subsequent output, mirroring the
|
||||
TelnetProtocol behavior.
|
||||
|
||||
Args:
|
||||
text (str): Text with Evennia ANSI markup (|r, |n, etc.).
|
||||
raw (bool): If True, bypass all ANSI processing.
|
||||
nocolor (bool): If True, strip all ANSI codes.
|
||||
screenreader (bool): If True, strip ANSI and apply
|
||||
SCREENREADER_REGEX_STRIP.
|
||||
|
||||
Returns:
|
||||
str: Processed text with real ANSI escape sequences,
|
||||
stripped text, or raw text.
|
||||
|
||||
"""
|
||||
if raw:
|
||||
return text
|
||||
if screenreader:
|
||||
text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False)
|
||||
text = _RE_SCREENREADER_REGEX.sub("", text)
|
||||
elif nocolor:
|
||||
text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False)
|
||||
else:
|
||||
# Ensure ANSI state is reset at the end of the string to prevent
|
||||
# color/attribute bleed into subsequent output. This mirrors
|
||||
# TelnetProtocol/SSH behavior: strip any existing trailing |n,
|
||||
# then append ||n (preserving a literal trailing pipe via the ||
|
||||
# escape) or |n as appropriate.
|
||||
text = _RE_N.sub("", text) + ("||n" if text.endswith("|") else "|n")
|
||||
text = parse_ansi(text, xterm256=True, mxp=False)
|
||||
return text
|
||||
|
||||
def decode_incoming(self, payload, is_binary, protocol_flags=None):
|
||||
"""
|
||||
Decode an incoming WebSocket message into kwargs for data_in().
|
||||
|
||||
Args:
|
||||
payload (bytes): Raw WebSocket message payload.
|
||||
is_binary (bool): True if this was a BINARY frame (opcode 2),
|
||||
False if it was a TEXT frame (opcode 1).
|
||||
protocol_flags (dict, optional): The session's protocol flags,
|
||||
which may affect decoding behavior.
|
||||
|
||||
Returns:
|
||||
dict or None: A dict of kwargs to pass to session.data_in(),
|
||||
where each key is an inputfunc name and value is [args, kwargs].
|
||||
Returns None if the message should be silently ignored.
|
||||
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
f"{self.__class__.__name__} must implement decode_incoming()"
|
||||
)
|
||||
|
||||
def encode_text(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode text output for sending to the client.
|
||||
|
||||
This handles the "text" outputfunc — the primary game output.
|
||||
|
||||
Args:
|
||||
*args: Text arguments. args[0] is typically the text string.
|
||||
protocol_flags (dict, optional): Session protocol flags that
|
||||
may affect encoding (e.g., NOCOLOR, SCREENREADER, RAW).
|
||||
**kwargs: Additional keyword arguments. May include an
|
||||
"options" dict with keys like "raw", "nocolor",
|
||||
"screenreader", "send_prompt".
|
||||
|
||||
Returns:
|
||||
tuple or None: A (data_bytes, is_binary) tuple for sendMessage(),
|
||||
or None if nothing should be sent.
|
||||
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
f"{self.__class__.__name__} must implement encode_text()"
|
||||
)
|
||||
|
||||
def encode_prompt(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode a prompt for sending to the client.
|
||||
|
||||
Default implementation delegates to encode_text with the
|
||||
send_prompt option set.
|
||||
|
||||
Args:
|
||||
*args: Prompt arguments.
|
||||
protocol_flags (dict, optional): Session protocol flags.
|
||||
**kwargs: Additional keyword arguments. May include an "options"
|
||||
dict; if absent, one is created with "send_prompt" set to True.
|
||||
|
||||
Returns:
|
||||
tuple or None: A (data_bytes, is_binary) tuple for sendMessage(),
|
||||
or None if nothing should be sent.
|
||||
|
||||
"""
|
||||
options = kwargs.get("options", {})
|
||||
options["send_prompt"] = True
|
||||
kwargs["options"] = options
|
||||
return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs)
|
||||
|
||||
def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode a non-text OOB command for sending to the client.
|
||||
|
||||
This handles all outputfuncs that don't have a specific send_*
|
||||
method, including custom OOB commands.
|
||||
|
||||
Args:
|
||||
cmdname (str): The OOB command name.
|
||||
*args: Command arguments.
|
||||
protocol_flags (dict, optional): Session protocol flags.
|
||||
**kwargs: Additional keyword arguments.
|
||||
|
||||
Returns:
|
||||
tuple or None: A (data_bytes, is_binary) tuple for sendMessage(),
|
||||
or None if nothing should be sent (e.g., if the format
|
||||
doesn't support OOB).
|
||||
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
f"{self.__class__.__name__} must implement encode_default()"
|
||||
)
|
||||
131
evennia/server/portal/wire_formats/evennia_v1.py
Normal file
131
evennia/server/portal/wire_formats/evennia_v1.py
Normal file
|
|
@ -0,0 +1,131 @@
|
|||
"""
|
||||
Evennia V1 wire format (v1.evennia.com).
|
||||
|
||||
This is Evennia's legacy WebSocket wire format. All messages are UTF-8
|
||||
JSON text frames in the form:
|
||||
|
||||
["cmdname", [args], {kwargs}]
|
||||
|
||||
Text output is HTML-converted from ANSI before sending. This format
|
||||
is used by Evennia's built-in webclient and is the default when no
|
||||
WebSocket subprotocol is negotiated.
|
||||
"""
|
||||
|
||||
import html
|
||||
import json
|
||||
|
||||
from evennia.utils.ansi import parse_ansi
|
||||
from evennia.utils.text2html import parse_html
|
||||
|
||||
from .base import WireFormat, _RE_SCREENREADER_REGEX
|
||||
|
||||
|
||||
class EvenniaV1Format(WireFormat):
|
||||
"""
|
||||
Evennia's legacy wire format: JSON arrays over TEXT frames.
|
||||
|
||||
Wire format:
|
||||
All frames are TEXT (UTF-8 JSON).
|
||||
Structure: ["cmdname", [args], {kwargs}]
|
||||
|
||||
Text handling:
|
||||
Outgoing text is converted from ANSI to HTML via parse_html().
|
||||
|
||||
OOB:
|
||||
All commands are effectively OOB — the cmdname field can be
|
||||
any string, not just "text".
|
||||
"""
|
||||
|
||||
name = "v1.evennia.com"
|
||||
supports_oob = True
|
||||
|
||||
def decode_incoming(self, payload, is_binary, protocol_flags=None):
|
||||
"""
|
||||
Decode incoming JSON array message.
|
||||
|
||||
Args:
|
||||
payload (bytes): UTF-8 encoded JSON: ["cmdname", [args], {kwargs}]
|
||||
is_binary (bool): Should be False for this format.
|
||||
protocol_flags (dict, optional): Not used by this format.
|
||||
|
||||
Returns:
|
||||
dict or None: kwargs for data_in(), e.g. {"text": [["look"], {}]}
|
||||
|
||||
"""
|
||||
try:
|
||||
cmdarray = json.loads(str(payload, "utf-8"))
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
return None
|
||||
if isinstance(cmdarray, (list, tuple)) and len(cmdarray) >= 3:
|
||||
return {cmdarray[0]: [cmdarray[1], cmdarray[2]]}
|
||||
return None
|
||||
|
||||
def encode_text(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode text output as HTML-converted JSON.
|
||||
|
||||
Converts ANSI color codes to HTML spans, applies screenreader
|
||||
and raw text options.
|
||||
|
||||
Returns:
|
||||
tuple or None: (json_bytes, False) where False means TEXT frame.
|
||||
|
||||
"""
|
||||
if args:
|
||||
args = list(args)
|
||||
text = args[0]
|
||||
if text is None:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
flags = protocol_flags or {}
|
||||
options = kwargs.pop("options", {})
|
||||
raw = options.get("raw", flags.get("RAW", False))
|
||||
client_raw = options.get("client_raw", False)
|
||||
nocolor = options.get("nocolor", flags.get("NOCOLOR", False))
|
||||
screenreader = options.get("screenreader", flags.get("SCREENREADER", False))
|
||||
prompt = options.get("send_prompt", False)
|
||||
|
||||
if screenreader:
|
||||
text = parse_ansi(text, strip_ansi=True, xterm256=False, mxp=False)
|
||||
text = _RE_SCREENREADER_REGEX.sub("", text)
|
||||
|
||||
cmd = "prompt" if prompt else "text"
|
||||
if raw:
|
||||
if client_raw:
|
||||
args[0] = text
|
||||
else:
|
||||
args[0] = html.escape(text)
|
||||
else:
|
||||
args[0] = parse_html(text, strip_ansi=nocolor)
|
||||
|
||||
return (json.dumps([cmd, args, kwargs]).encode("utf-8"), False)
|
||||
|
||||
def encode_prompt(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode a prompt as HTML-converted JSON with send_prompt flag.
|
||||
|
||||
Returns:
|
||||
tuple or None: (json_bytes, False) for TEXT frame.
|
||||
|
||||
"""
|
||||
options = kwargs.get("options", {})
|
||||
options["send_prompt"] = True
|
||||
kwargs["options"] = options
|
||||
return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs)
|
||||
|
||||
def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode any OOB command as a JSON array.
|
||||
|
||||
Skips the "options" command (legacy behavior).
|
||||
|
||||
Returns:
|
||||
tuple or None: (json_bytes, False) for TEXT frame, or None
|
||||
if cmdname is "options".
|
||||
|
||||
"""
|
||||
if cmdname == "options":
|
||||
return None
|
||||
return (json.dumps([cmdname, args, kwargs]).encode("utf-8"), False)
|
||||
126
evennia/server/portal/wire_formats/gmcp_standard.py
Normal file
126
evennia/server/portal/wire_formats/gmcp_standard.py
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
"""
|
||||
GMCP MUD Standards wire format (gmcp.mudstandards.org).
|
||||
|
||||
This implements the GMCP subprotocol from the MUD Standards WebSocket
|
||||
proposal (https://mudstandards.org/websocket/).
|
||||
|
||||
Per the standard:
|
||||
- BINARY frames contain regular ANSI in- and output (UTF-8 encoded)
|
||||
- TEXT frames contain UTF-8 encoded GMCP commands in the standard
|
||||
format: "Package.Name json_payload"
|
||||
|
||||
This is a good match for MUD clients that natively speak GMCP, such as
|
||||
Mudlet, as it maps directly to their existing GMCP handling without
|
||||
the extra JSON envelope layer.
|
||||
"""
|
||||
|
||||
from evennia.server.portal.gmcp_utils import decode_gmcp, encode_gmcp
|
||||
|
||||
from .base import WireFormat
|
||||
|
||||
|
||||
class GmcpStandardFormat(WireFormat):
|
||||
"""
|
||||
GMCP-native wire format over WebSocket.
|
||||
|
||||
Wire format:
|
||||
BINARY frames: Raw ANSI text (UTF-8), used for game text I/O.
|
||||
TEXT frames: GMCP commands in standard format
|
||||
"Package.Name json_payload"
|
||||
|
||||
Text handling:
|
||||
Outgoing text retains ANSI escape codes (no HTML conversion).
|
||||
Text is sent as BINARY frames.
|
||||
|
||||
OOB:
|
||||
Supported via TEXT frames carrying GMCP messages. The GMCP
|
||||
format is: "Package.Name optional_json_payload"
|
||||
"""
|
||||
|
||||
name = "gmcp.mudstandards.org"
|
||||
supports_oob = True
|
||||
|
||||
def decode_incoming(self, payload, is_binary, protocol_flags=None):
|
||||
"""
|
||||
Decode incoming WebSocket message.
|
||||
|
||||
BINARY frames are raw text input.
|
||||
TEXT frames are GMCP messages.
|
||||
|
||||
Args:
|
||||
payload (bytes): The raw frame payload.
|
||||
is_binary (bool): True for BINARY frames, False for TEXT.
|
||||
protocol_flags (dict, optional): Not used.
|
||||
|
||||
Returns:
|
||||
dict or None: kwargs for data_in().
|
||||
|
||||
"""
|
||||
if is_binary:
|
||||
# BINARY frame = raw text input
|
||||
try:
|
||||
text = payload.decode("utf-8").strip()
|
||||
except UnicodeDecodeError:
|
||||
return None
|
||||
if not text:
|
||||
return None
|
||||
return {"text": [[text], {}]}
|
||||
else:
|
||||
# TEXT frame = GMCP command
|
||||
try:
|
||||
gmcp_data = payload.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
return None
|
||||
return decode_gmcp(gmcp_data)
|
||||
|
||||
def encode_text(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode text output as raw ANSI in a BINARY frame.
|
||||
|
||||
Returns:
|
||||
tuple or None: (ansi_bytes, True) where True means BINARY frame.
|
||||
|
||||
"""
|
||||
extracted = self._extract_text_and_flags(args, kwargs, protocol_flags)
|
||||
if extracted is None:
|
||||
return None
|
||||
text, raw, nocolor, screenreader = extracted
|
||||
text = self._process_ansi(text, raw, nocolor, screenreader)
|
||||
return (text.encode("utf-8"), True)
|
||||
|
||||
def encode_prompt(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode a prompt.
|
||||
|
||||
For GMCP format, prompts are sent as BINARY frames (raw ANSI)
|
||||
just like regular text — the client can detect prompts via
|
||||
GMCP if needed.
|
||||
|
||||
Returns:
|
||||
tuple or None: (ansi_bytes, True) for BINARY frame.
|
||||
|
||||
"""
|
||||
return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs)
|
||||
|
||||
def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode an OOB command as a GMCP message in a TEXT frame.
|
||||
|
||||
Args:
|
||||
cmdname (str): The OOB command name.
|
||||
*args: Command arguments.
|
||||
protocol_flags (dict, optional): Not used.
|
||||
**kwargs: Command keyword arguments.
|
||||
|
||||
Returns:
|
||||
tuple or None: (gmcp_bytes, False) for TEXT frame, or None
|
||||
if cmdname is "options".
|
||||
|
||||
"""
|
||||
if cmdname == "options":
|
||||
return None
|
||||
|
||||
kwargs.pop("options", None)
|
||||
|
||||
gmcp_string = encode_gmcp(cmdname, *args, **kwargs)
|
||||
return (gmcp_string.encode("utf-8"), False)
|
||||
224
evennia/server/portal/wire_formats/json_standard.py
Normal file
224
evennia/server/portal/wire_formats/json_standard.py
Normal file
|
|
@ -0,0 +1,224 @@
|
|||
"""
|
||||
JSON MUD Standards wire format (json.mudstandards.org).
|
||||
|
||||
This implements the JSON subprotocol from the MUD Standards WebSocket
|
||||
proposal (https://mudstandards.org/websocket/).
|
||||
|
||||
Per the standard:
|
||||
- BINARY frames contain regular ANSI in- and output (UTF-8 encoded)
|
||||
- TEXT frames contain JSON payloads with the structure:
|
||||
{"proto": "<string>", "id": "<string>", "data": "<string>"}
|
||||
|
||||
This is the most flexible standard format, supporting GMCP, custom
|
||||
protocols, and any future structured data through the JSON envelope.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from evennia.server.portal.gmcp_utils import decode_gmcp, encode_gmcp
|
||||
|
||||
from .base import WireFormat
|
||||
|
||||
|
||||
class JsonStandardFormat(WireFormat):
|
||||
"""
|
||||
MUD Standards JSON envelope wire format.
|
||||
|
||||
Wire format:
|
||||
BINARY frames: Raw ANSI text (UTF-8), used for game text I/O.
|
||||
TEXT frames: JSON envelope {"proto", "id", "data"} for
|
||||
structured/OOB data.
|
||||
|
||||
Text handling:
|
||||
Outgoing text retains ANSI escape codes (no HTML conversion).
|
||||
Text is sent as BINARY frames.
|
||||
|
||||
OOB:
|
||||
Supported via TEXT frames. The "proto" field identifies the
|
||||
protocol (e.g., "gmcp"), "id" identifies the command, and
|
||||
"data" carries the JSON payload.
|
||||
"""
|
||||
|
||||
name = "json.mudstandards.org"
|
||||
supports_oob = True
|
||||
|
||||
def decode_incoming(self, payload, is_binary, protocol_flags=None):
|
||||
"""
|
||||
Decode incoming WebSocket message.
|
||||
|
||||
BINARY frames are treated as raw text input.
|
||||
TEXT frames are parsed as JSON envelopes.
|
||||
|
||||
Args:
|
||||
payload (bytes): The raw frame payload.
|
||||
is_binary (bool): True for BINARY frames, False for TEXT.
|
||||
protocol_flags (dict, optional): Not used.
|
||||
|
||||
Returns:
|
||||
dict or None: kwargs for data_in().
|
||||
|
||||
"""
|
||||
if is_binary:
|
||||
# BINARY frame = raw text input
|
||||
try:
|
||||
text = payload.decode("utf-8").strip()
|
||||
except UnicodeDecodeError:
|
||||
return None
|
||||
if not text:
|
||||
return None
|
||||
return {"text": [[text], {}]}
|
||||
else:
|
||||
# TEXT frame = JSON envelope
|
||||
try:
|
||||
envelope = json.loads(payload.decode("utf-8"))
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
return None
|
||||
|
||||
if not isinstance(envelope, dict):
|
||||
return None
|
||||
|
||||
proto = envelope.get("proto", "")
|
||||
cmd_id = envelope.get("id", "")
|
||||
data = envelope.get("data", "")
|
||||
|
||||
# Validate envelope field types — malformed envelopes are dropped
|
||||
if not isinstance(proto, str):
|
||||
return None
|
||||
if not isinstance(cmd_id, str):
|
||||
cmd_id = str(cmd_id) if cmd_id is not None else ""
|
||||
if not isinstance(data, str):
|
||||
try:
|
||||
data = json.dumps(data)
|
||||
except (TypeError, ValueError):
|
||||
data = str(data)
|
||||
|
||||
return self._decode_envelope(proto, cmd_id, data)
|
||||
|
||||
def _decode_envelope(self, proto, cmd_id, data):
|
||||
"""
|
||||
Decode a JSON envelope into Evennia inputfunc kwargs.
|
||||
|
||||
Args:
|
||||
proto (str): The protocol identifier (e.g., "gmcp", "text").
|
||||
cmd_id (str): The command identifier (e.g., GMCP package name).
|
||||
data (str): The payload string.
|
||||
|
||||
Returns:
|
||||
dict or None: kwargs for data_in().
|
||||
|
||||
"""
|
||||
if proto == "gmcp":
|
||||
# GMCP: id is the package name, data is the JSON payload
|
||||
cmd_id = cmd_id.strip()
|
||||
if not cmd_id:
|
||||
return None
|
||||
gmcp_string = "%s %s" % (cmd_id, data) if data else cmd_id
|
||||
return decode_gmcp(gmcp_string)
|
||||
|
||||
elif proto == "text":
|
||||
# Text input sent via JSON envelope
|
||||
return {"text": [[data], {}]}
|
||||
|
||||
elif proto == "websocket_close":
|
||||
return {"websocket_close": [[], {}]}
|
||||
|
||||
else:
|
||||
# Generic protocol — pass through as-is.
|
||||
# Prefer cmd_id as the inputfunc name, fall back to proto.
|
||||
try:
|
||||
parsed_data = json.loads(data) if data else {}
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
parsed_data = data
|
||||
|
||||
args = []
|
||||
kwargs = {}
|
||||
if isinstance(parsed_data, dict):
|
||||
kwargs = parsed_data
|
||||
elif isinstance(parsed_data, list):
|
||||
args = parsed_data
|
||||
else:
|
||||
args = [parsed_data]
|
||||
|
||||
funcname = cmd_id if cmd_id else proto
|
||||
if not funcname:
|
||||
return None
|
||||
return {funcname: [args, kwargs]}
|
||||
|
||||
def encode_text(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode text output as raw ANSI in a BINARY frame.
|
||||
|
||||
Returns:
|
||||
tuple or None: (ansi_bytes, True) where True means BINARY frame.
|
||||
|
||||
"""
|
||||
extracted = self._extract_text_and_flags(args, kwargs, protocol_flags)
|
||||
if extracted is None:
|
||||
return None
|
||||
text, raw, nocolor, screenreader = extracted
|
||||
text = self._process_ansi(text, raw, nocolor, screenreader)
|
||||
return (text.encode("utf-8"), True)
|
||||
|
||||
def encode_prompt(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode a prompt.
|
||||
|
||||
For the JSON standard format, prompts are sent as a JSON envelope
|
||||
in a TEXT frame with proto="prompt", allowing the client to
|
||||
distinguish prompts from regular text.
|
||||
|
||||
Returns:
|
||||
tuple or None: (json_bytes, False) for TEXT frame.
|
||||
|
||||
"""
|
||||
extracted = self._extract_text_and_flags(args, kwargs, protocol_flags)
|
||||
if extracted is None:
|
||||
return None
|
||||
text, raw, nocolor, screenreader = extracted
|
||||
text = self._process_ansi(text, raw, nocolor, screenreader)
|
||||
|
||||
envelope = {
|
||||
"proto": "prompt",
|
||||
"id": "",
|
||||
"data": text,
|
||||
}
|
||||
return (json.dumps(envelope).encode("utf-8"), False)
|
||||
|
||||
def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode an OOB command as a GMCP-in-JSON envelope.
|
||||
|
||||
OOB commands are sent as TEXT frames with the JSON envelope format.
|
||||
The command is translated to GMCP naming conventions and wrapped
|
||||
in a {"proto": "gmcp", "id": "Package.Name", "data": "..."} envelope.
|
||||
|
||||
Args:
|
||||
cmdname (str): The OOB command name.
|
||||
*args: Command arguments.
|
||||
protocol_flags (dict, optional): Not used.
|
||||
**kwargs: Command keyword arguments.
|
||||
|
||||
Returns:
|
||||
tuple or None: (json_bytes, False) for TEXT frame, or None
|
||||
if cmdname is "options".
|
||||
|
||||
"""
|
||||
if cmdname == "options":
|
||||
return None
|
||||
|
||||
kwargs.pop("options", None)
|
||||
|
||||
# Encode as GMCP string first, then wrap in JSON envelope
|
||||
gmcp_string = encode_gmcp(cmdname, *args, **kwargs)
|
||||
|
||||
# Split the GMCP string into package name and payload
|
||||
parts = gmcp_string.split(None, 1)
|
||||
gmcp_package = parts[0]
|
||||
gmcp_data = parts[1] if len(parts) > 1 else ""
|
||||
|
||||
envelope = {
|
||||
"proto": "gmcp",
|
||||
"id": gmcp_package,
|
||||
"data": gmcp_data,
|
||||
}
|
||||
return (json.dumps(envelope).encode("utf-8"), False)
|
||||
103
evennia/server/portal/wire_formats/terminal.py
Normal file
103
evennia/server/portal/wire_formats/terminal.py
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
"""
|
||||
Terminal wire format (terminal.mudstandards.org).
|
||||
|
||||
This implements the simplest MUD Standards WebSocket subprotocol:
|
||||
raw ANSI/UTF-8 text in BINARY frames. No OOB support.
|
||||
|
||||
Per the MUD Standards proposal:
|
||||
"BINARY frames contain input/output and ANSI control codes.
|
||||
Encoded as UTF-8"
|
||||
|
||||
This format is suitable for basic terminal-style MUD clients that
|
||||
want raw ANSI output without any structured data channel.
|
||||
"""
|
||||
|
||||
from .base import WireFormat
|
||||
|
||||
|
||||
class TerminalFormat(WireFormat):
|
||||
"""
|
||||
Raw ANSI terminal wire format over BINARY WebSocket frames.
|
||||
|
||||
Wire format:
|
||||
All frames are BINARY, containing UTF-8 ANSI text.
|
||||
No TEXT frames are used.
|
||||
|
||||
Text handling:
|
||||
Outgoing text retains ANSI escape codes (no HTML conversion).
|
||||
ANSI is rendered by the client.
|
||||
|
||||
OOB:
|
||||
Not supported. This format has no structured data channel.
|
||||
"""
|
||||
|
||||
name = "terminal.mudstandards.org"
|
||||
supports_oob = False
|
||||
|
||||
def decode_incoming(self, payload, is_binary, protocol_flags=None):
|
||||
"""
|
||||
Decode incoming WebSocket frame as raw text input.
|
||||
|
||||
Both BINARY and TEXT frames are treated identically as UTF-8 text.
|
||||
|
||||
Args:
|
||||
payload (bytes): Raw UTF-8 text from the client.
|
||||
is_binary (bool): True for BINARY frames, False for TEXT.
|
||||
Both are handled identically.
|
||||
protocol_flags (dict, optional): Not used.
|
||||
|
||||
Returns:
|
||||
dict or None: {"text": [[text_string], {}]}
|
||||
|
||||
"""
|
||||
try:
|
||||
text = payload.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
return None
|
||||
|
||||
text = text.strip()
|
||||
if not text:
|
||||
return None
|
||||
|
||||
return {"text": [[text], {}]}
|
||||
|
||||
def encode_text(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode text output as raw ANSI in a BINARY frame.
|
||||
|
||||
No HTML conversion is performed. ANSI color codes are preserved
|
||||
for the client to render.
|
||||
|
||||
Returns:
|
||||
tuple or None: (ansi_bytes, True) where True means BINARY frame.
|
||||
|
||||
"""
|
||||
extracted = self._extract_text_and_flags(args, kwargs, protocol_flags)
|
||||
if extracted is None:
|
||||
return None
|
||||
text, raw, nocolor, screenreader = extracted
|
||||
text = self._process_ansi(text, raw, nocolor, screenreader)
|
||||
return (text.encode("utf-8"), True)
|
||||
|
||||
def encode_prompt(self, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
Encode a prompt as raw ANSI.
|
||||
|
||||
For terminal mode, prompts are just text — there's no way
|
||||
to distinguish them from regular output at the wire level.
|
||||
|
||||
Returns:
|
||||
tuple or None: (ansi_bytes, True) for BINARY frame.
|
||||
|
||||
"""
|
||||
return self.encode_text(*args, protocol_flags=protocol_flags, **kwargs)
|
||||
|
||||
def encode_default(self, cmdname, *args, protocol_flags=None, **kwargs):
|
||||
"""
|
||||
OOB commands are not supported in terminal mode.
|
||||
|
||||
Returns:
|
||||
None: Always returns None (OOB data is silently dropped).
|
||||
|
||||
"""
|
||||
return None
|
||||
|
|
@ -121,6 +121,27 @@ WEBSOCKET_CLIENT_INTERFACE = "0.0.0.0"
|
|||
# the client will itself figure out this url based on the server's hostname.
|
||||
# e.g. ws://external.example.com or wss://external.example.com:443
|
||||
WEBSOCKET_CLIENT_URL = None
|
||||
# Ordered list of WebSocket subprotocols the server will accept during
|
||||
# RFC 6455 Sec-WebSocket-Protocol negotiation. The first protocol in this
|
||||
# list that matches a client's offered protocols will be selected.
|
||||
# Removing a subprotocol from this list disables it for clients that
|
||||
# explicitly negotiate via Sec-WebSocket-Protocol.
|
||||
# Set to None to accept all built-in formats. Set to [] to disable all
|
||||
# subprotocol negotiation.
|
||||
# Note: Evennia's built-in webclient explicitly sends
|
||||
# Sec-WebSocket-Protocol: v1.evennia.com and therefore requires
|
||||
# "v1.evennia.com" to remain in this list in order to connect successfully.
|
||||
# External/legacy third-party clients that do not send any
|
||||
# Sec-WebSocket-Protocol header are still accepted and always receive the
|
||||
# v1.evennia.com message format, regardless of this setting.
|
||||
# See https://mudstandards.org/websocket/ for details on the standard
|
||||
# subprotocols.
|
||||
WEBSOCKET_SUBPROTOCOLS = [
|
||||
"json.mudstandards.org",
|
||||
"gmcp.mudstandards.org",
|
||||
"terminal.mudstandards.org",
|
||||
"v1.evennia.com",
|
||||
]
|
||||
# This determine's whether Evennia's custom admin page is used, or if the
|
||||
# standard Django admin is used.
|
||||
EVENNIA_ADMIN = True
|
||||
|
|
|
|||
|
|
@ -228,7 +228,9 @@ An "emitter" object must have a function
|
|||
return;
|
||||
}
|
||||
// Important - we pass csessid tacked on the url
|
||||
websocket = new WebSocket(wsurl + '?' + csessid + '&' + cuid + '&' + browser);
|
||||
var wsquery = wsurl + '?' + csessid + '&' + cuid + '&' + browser;
|
||||
var protocols = ["v1.evennia.com"];
|
||||
websocket = new WebSocket(wsquery, protocols);
|
||||
|
||||
// Handle Websocket open event
|
||||
websocket.onopen = function (event) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue