Convert the telnet protocols over for Py3.

This commit is contained in:
Ryan Stein 2017-10-29 21:43:32 -04:00
parent 1da3e0caa0
commit b88c74a316
8 changed files with 63 additions and 55 deletions

View file

@ -18,7 +18,7 @@ from builtins import object
import zlib
# negotiations for v1 and v2 of the protocol
MCCP = chr(86)
MCCP = b'\x56'
FLUSH = zlib.Z_SYNC_FLUSH
@ -85,6 +85,6 @@ class Mccp(object):
"""
self.protocol.protocol_flags['MCCP'] = True
self.protocol.requestNegotiation(MCCP, '')
self.protocol.requestNegotiation(MCCP, b'')
self.protocol.zlib = zlib.compressobj(9)
self.protocol.handshake_done()

View file

@ -14,9 +14,9 @@ from builtins import object
from django.conf import settings
from evennia.utils import utils
MSSP = chr(70)
MSSP_VAR = chr(1)
MSSP_VAL = chr(2)
MSSP = b'\x46'
MSSP_VAR = b'\x01'
MSSP_VAL = b'\x02'
# try to get the customized mssp info, if it exists.

View file

@ -18,7 +18,9 @@ import re
LINKS_SUB = re.compile(r'\|lc(.*?)\|lt(.*?)\|le', re.DOTALL)
MXP = chr(91)
# MXP Telnet option
MXP = b'\x5b'
MXP_TEMPSECURE = "\x1B[4z"
MXP_SEND = MXP_TEMPSECURE + \
"<SEND HREF=\"\\1\">" + \
@ -84,5 +86,5 @@ class Mxp(object):
"""
self.protocol.protocol_flags["MXP"] = True
self.protocol.requestNegotiation(MXP, '')
self.protocol.requestNegotiation(MXP, b'')
self.protocol.handshake_done()

View file

@ -9,11 +9,12 @@ NAWS allows telnet clients to report their current window size to the
client and update it when the size changes
"""
from codecs import encode as codecs_encode
from builtins import object
from django.conf import settings
NAWS = chr(31)
IS = chr(0)
NAWS = b'\x1f'
IS = b'\x00'
# default taken from telnet specification
DEFAULT_WIDTH = settings.CLIENT_DEFAULT_WIDTH
DEFAULT_HEIGHT = settings.CLIENT_DEFAULT_HEIGHT
@ -76,6 +77,6 @@ class Naws(object):
if len(options) == 4:
# NAWS is negotiated with 16bit words
width = options[0] + options[1]
self.protocol.protocol_flags['SCREENWIDTH'][0] = int(width.encode('hex'), 16)
self.protocol.protocol_flags['SCREENWIDTH'][0] = int(codecs_encode(width, 'hex'), 16)
height = options[2] + options[3]
self.protocol.protocol_flags['SCREENHEIGHT'][0] = int(height.encode('hex'), 16)
self.protocol.protocol_flags['SCREENHEIGHT'][0] = int(codecs_encode(height, 'hex'), 16)

View file

@ -14,7 +14,7 @@ http://www.faqs.org/rfcs/rfc858.html
"""
from builtins import object
SUPPRESS_GA = chr(3)
SUPPRESS_GA = b'\x03'
# default taken from telnet specification

View file

@ -20,10 +20,10 @@ from evennia.utils import ansi
from evennia.utils.utils import to_str
_RE_N = re.compile(r"\|n$")
_RE_LEND = re.compile(r"\n$|\r$|\r\n$|\r\x00$|", re.MULTILINE)
_RE_LINEBREAK = re.compile(r"\n\r|\r\n|\n|\r", re.DOTALL + re.MULTILINE)
_RE_LEND = re.compile(br"\n$|\r$|\r\n$|\r\x00$|", re.MULTILINE)
_RE_LINEBREAK = re.compile(br"\n\r|\r\n|\n|\r", re.DOTALL + re.MULTILINE)
_RE_SCREENREADER_REGEX = re.compile(r"%s" % settings.SCREENREADER_REGEX_STRIP, re.DOTALL + re.MULTILINE)
_IDLE_COMMAND = settings.IDLE_COMMAND + "\n"
_IDLE_COMMAND = str.encode(settings.IDLE_COMMAND + "\n")
class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
@ -43,7 +43,7 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
"""
# initialize the session
self.line_buffer = ""
self.line_buffer = b""
client_address = self.transport.client
client_address = client_address[0] if client_address else None
# this number is counted down for every handshake that completes.
@ -208,18 +208,18 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
if self.line_buffer and len(data) > 1:
# buffer exists, it is terminated by the first line feed
data[0] = self.line_buffer + data[0]
self.line_buffer = ""
self.line_buffer = b""
# if the last data split is empty, it means all splits have
# line breaks, if not, it is unterminated and must be
# buffered.
self.line_buffer += data.pop()
# send all data chunks
for dat in data:
self.data_in(text=dat + "\n")
self.data_in(text=dat + b"\n")
def _write(self, data):
"""hook overloading the one used in plain telnet"""
data = data.replace('\n', '\r\n').replace('\r\r\n', '\r\n')
data = data.replace(b'\n', b'\r\n').replace(b'\r\r\n', b'\r\n')
super(TelnetProtocol, self)._write(mccp_compress(self, data))
def sendLine(self, line):
@ -231,8 +231,9 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
"""
# escape IAC in line mode, and correctly add \r\n
line = line.encode()
line += self.delimiter
line = line.replace(IAC, IAC + IAC).replace('\n', '\r\n')
line = line.replace(IAC, IAC + IAC).replace(b'\n', b'\r\n')
if not self.protocol_flags.get("NOGOAHEAD", True):
line += IAC + GA
return self.transport.write(mccp_compress(self, line))
@ -327,7 +328,8 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
strip_ansi=nocolor, xterm256=xterm256)
if mxp:
prompt = mxp_parse(prompt)
prompt = prompt.replace(IAC, IAC + IAC).replace('\n', '\r\n')
prompt = prompt.encode()
prompt = prompt.replace(IAC, IAC + IAC).replace(b'\n', b'\r\n')
prompt += IAC + GA
self.transport.write(mccp_compress(self, prompt))
else:

View file

@ -28,24 +28,22 @@ header where applicable.
from builtins import object
import re
import json
from evennia.utils.utils import to_str
from evennia.utils.utils import to_str, is_iter
# MSDP-relevant telnet cmd/opt-codes
MSDP = chr(69)
MSDP_VAR = chr(1) # ^A
MSDP_VAL = chr(2) # ^B
MSDP_TABLE_OPEN = chr(3) # ^C
MSDP_TABLE_CLOSE = chr(4) # ^D
MSDP_ARRAY_OPEN = chr(5) # ^E
MSDP_ARRAY_CLOSE = chr(6) # ^F
MSDP = b'\x45'
MSDP_VAR = b'\x01' # ^A
MSDP_VAL = b'\x02' # ^B
MSDP_TABLE_OPEN = b'\x03' # ^C
MSDP_TABLE_CLOSE = b'\x04' # ^D
MSDP_ARRAY_OPEN = b'\x05' # ^E
MSDP_ARRAY_CLOSE = b'\x06' # ^F
# GMCP
GMCP = chr(201)
GMCP = b'\xc9'
# General Telnet
IAC = chr(255)
SB = chr(250)
SE = chr(240)
from twisted.conch.telnet import IAC, SB, SE
def force_str(inp):
@ -55,17 +53,17 @@ def force_str(inp):
# pre-compiled regexes
# returns 2-tuple
msdp_regex_table = re.compile(r"%s\s*(\w*?)\s*%s\s*%s(.*?)%s"
msdp_regex_table = re.compile(br"%s\s*(\w*?)\s*%s\s*%s(.*?)%s"
% (MSDP_VAR, MSDP_VAL,
MSDP_TABLE_OPEN,
MSDP_TABLE_CLOSE))
# returns 2-tuple
msdp_regex_array = re.compile(r"%s\s*(\w*?)\s*%s\s*%s(.*?)%s"
msdp_regex_array = re.compile(br"%s\s*(\w*?)\s*%s\s*%s(.*?)%s"
% (MSDP_VAR, MSDP_VAL,
MSDP_ARRAY_OPEN,
MSDP_ARRAY_CLOSE))
msdp_regex_var = re.compile(r"%s" % MSDP_VAR)
msdp_regex_val = re.compile(r"%s" % MSDP_VAL)
msdp_regex_var = re.compile(br"%s" % MSDP_VAR)
msdp_regex_val = re.compile(br"%s" % MSDP_VAL)
EVENNIA_TO_GMCP = {"client_options": "Core.Supports.Get",
"get_inputfuncs": "Core.Commands.Get",
@ -178,7 +176,7 @@ class TelnetOOB(object):
msdp_var=MSDP_VAR, msdp_cmdname=cmdname, msdp_val=MSDP_VAL)
if not (args or kwargs):
return msdp_cmdname
return msdp_cmdname.encode()
# print("encode_msdp in:", cmdname, args, kwargs) # DEBUG
@ -213,7 +211,7 @@ class TelnetOOB(object):
msdp_string = msdp_args + msdp_kwargs
# print("msdp_string:", msdp_string) # DEBUG
return msdp_string
return msdp_string.encode()
def encode_gmcp(self, cmdname, *args, **kwargs):
"""
@ -249,7 +247,7 @@ class TelnetOOB(object):
gmcp_string = "%s %s" % (cmdname, json.dumps(kwargs))
# print("gmcp string", gmcp_string) # DEBUG
return gmcp_string
return gmcp_string.encode()
def decode_msdp(self, data):
"""
@ -275,8 +273,8 @@ class TelnetOOB(object):
identified as separate cmdnames.
"""
if hasattr(data, "__iter__"):
data = "".join(data)
if isinstance(data, list):
data = b"".join(data)
# print("decode_msdp in:", data) # DEBUG
@ -286,29 +284,34 @@ class TelnetOOB(object):
# decode tables
for key, table in msdp_regex_table.findall(data):
key = key.decode()
tables[key] = {} if key not in tables else tables[key]
for varval in msdp_regex_var.split(table)[1:]:
var, val = msdp_regex_val.split(varval, 1)
var, val = var.decode(), val.decode()
if var:
tables[key][var] = val
# decode arrays from all that was not a table
data_no_tables = msdp_regex_table.sub("", data)
data_no_tables = msdp_regex_table.sub(b"", data)
for key, array in msdp_regex_array.findall(data_no_tables):
key = key.decode()
arrays[key] = [] if key not in arrays else arrays[key]
parts = msdp_regex_val.split(array)
parts = [part.decode() for part in parts]
if len(parts) == 2:
arrays[key].append(parts[1])
elif len(parts) > 1:
arrays[key].extend(parts[1:])
# decode remainders from all that were not tables or arrays
data_no_tables_or_arrays = msdp_regex_array.sub("", data_no_tables)
data_no_tables_or_arrays = msdp_regex_array.sub(b"", data_no_tables)
for varval in msdp_regex_var.split(data_no_tables_or_arrays):
# get remaining varvals after cleaning away tables/arrays. If mathcing
# an existing key in arrays, it will be added as an argument to that command,
# otherwise it will be treated as a command without argument.
parts = msdp_regex_val.split(varval)
parts = [part.decode() for part in parts]
if len(parts) == 2:
variables[parts[0]] = parts[1]
elif len(parts) > 1:
@ -356,33 +359,33 @@ class TelnetOOB(object):
Core.Name [[args], {kwargs}] -> [name, [args], {kwargs}]
"""
if hasattr(data, "__iter__"):
data = "".join(data)
if isinstance(data, list):
data = b"".join(data)
# print("decode_gmcp in:", data) # DEBUG
if data:
try:
cmdname, structure = data.split(None, 1)
except ValueError:
cmdname, structure = data, ""
cmdname = cmdname.replace(".", "_")
cmdname, structure = data, b""
cmdname = cmdname.replace(b".", b"_")
try:
structure = json.loads(structure)
except ValueError:
# maybe the structure is not json-serialized at all
pass
args, kwargs = [], {}
if hasattr(structure, "__iter__"):
if is_iter(structure):
if isinstance(structure, dict):
kwargs = {key: value for key, value in structure.items() if key}
else:
args = list(structure)
else:
args = (structure,)
if cmdname.lower().startswith("core_"):
if cmdname.lower().startswith(b"core_"):
# if Core.cmdname, then use cmdname
cmdname = cmdname[5:]
self.protocol.data_in(**{cmdname.lower(): [args, kwargs]})
self.protocol.data_in(**{cmdname.lower().decode(): [args, kwargs]})
# access methods

View file

@ -13,9 +13,9 @@ under the 'TTYPE' key.
from builtins import object
# telnet option codes
TTYPE = chr(24)
IS = chr(0)
SEND = chr(1)
TTYPE = b'\x18'
IS = b'\x00'
SEND = b'\x01'
# terminal capabilities and their codes
MTTS = [(128, 'PROXY'),
@ -89,7 +89,7 @@ class Ttype(object):
return
try:
option = "".join(option).lstrip(IS)
option = b"".join(option).lstrip(IS).decode()
except TypeError:
# option is not on a suitable form for joining
pass