diff --git a/evennia/server/portal/telnet.py b/evennia/server/portal/telnet.py index 955ea5e918..83e2fa03a2 100644 --- a/evennia/server/portal/telnet.py +++ b/evennia/server/portal/telnet.py @@ -84,7 +84,7 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session): from evennia.utils.utils import delay # timeout the handshakes in case the client doesn't reply at all - delay(2, callback=self.handshake_done, timeout=True) + self._handshake_delay = delay(2, callback=self.handshake_done, timeout=True) # TCP/IP keepalive watches for dead links self.transport.setTcpKeepAlive(1) diff --git a/evennia/server/portal/tests.py b/evennia/server/portal/tests.py index be400144c6..791e5172a4 100644 --- a/evennia/server/portal/tests.py +++ b/evennia/server/portal/tests.py @@ -8,9 +8,24 @@ try: except ImportError: import unittest +from mock import Mock import string from evennia.server.portal import irc +from twisted.conch.telnet import IAC, WILL, DONT, SB, SE, NAWS, DO +from twisted.test import proto_helpers +from twisted.trial.unittest import TestCase as TwistedTestCase + +from .telnet import TelnetServerFactory, TelnetProtocol +from .portal import PORTAL_SESSIONS +from .suppress_ga import SUPPRESS_GA +from .naws import DEFAULT_HEIGHT, DEFAULT_WIDTH +from .ttype import TTYPE, IS +from .mccp import MCCP +from .mssp import MSSP +from .mxp import MXP +from .telnet_oob import MSDP, MSDP_VAL, MSDP_VAR + class TestIRC(TestCase): @@ -73,3 +88,64 @@ class TestIRC(TestCase): s = r'|wthis|Xis|gis|Ma|C|complex|*string' self.assertEqual(irc.parse_irc_to_ansi(irc.parse_ansi_to_irc(s)), s) + + +class TestTelnet(TwistedTestCase): + def setUp(self): + super(TestTelnet, self).setUp() + factory = TelnetServerFactory() + factory.protocol = TelnetProtocol + factory.sessionhandler = PORTAL_SESSIONS + factory.sessionhandler.portal = Mock() + self.proto = factory.buildProtocol(("localhost", 0)) + self.transport = proto_helpers.StringTransport() + self.addCleanup(factory.sessionhandler.disconnect_all) + + def test_mudlet_ttype(self): + self.transport.client = ["localhost"] + self.transport.setTcpKeepAlive = Mock() + d = self.proto.makeConnection(self.transport) + # test suppress_ga + self.assertTrue(self.proto.protocol_flags["NOGOAHEAD"]) + self.proto.dataReceived(IAC + DONT + SUPPRESS_GA) + self.assertFalse(self.proto.protocol_flags["NOGOAHEAD"]) + self.assertEqual(self.proto.handshakes, 7) + # test naws + self.assertEqual(self.proto.protocol_flags['SCREENWIDTH'], {0: DEFAULT_WIDTH}) + self.assertEqual(self.proto.protocol_flags['SCREENHEIGHT'], {0: DEFAULT_HEIGHT}) + self.proto.dataReceived(IAC + WILL + NAWS) + self.proto.dataReceived([IAC, SB, NAWS, '', 'x', '', 'd', IAC, SE]) + self.assertEqual(self.proto.protocol_flags['SCREENWIDTH'][0], 120) + self.assertEqual(self.proto.protocol_flags['SCREENHEIGHT'][0], 100) + self.assertEqual(self.proto.handshakes, 6) + # test ttype + self.assertTrue(self.proto.protocol_flags["FORCEDENDLINE"]) + self.assertFalse(self.proto.protocol_flags["TTYPE"]) + self.assertTrue(self.proto.protocol_flags["ANSI"]) + self.proto.dataReceived(IAC + WILL + TTYPE) + self.proto.dataReceived([IAC, SB, TTYPE, IS, "MUDLET", IAC, SE]) + self.assertTrue(self.proto.protocol_flags["XTERM256"]) + self.assertEqual(self.proto.protocol_flags["CLIENTNAME"], "MUDLET") + self.proto.dataReceived([IAC, SB, TTYPE, IS, "XTERM", IAC, SE]) + self.proto.dataReceived([IAC, SB, TTYPE, IS, "MTTS 137", IAC, SE]) + self.assertEqual(self.proto.handshakes, 5) + # test mccp + self.proto.dataReceived(IAC + DONT + MCCP) + self.assertFalse(self.proto.protocol_flags['MCCP']) + self.assertEqual(self.proto.handshakes, 4) + # test mssp + self.proto.dataReceived(IAC + DONT + MSSP) + self.assertEqual(self.proto.handshakes, 3) + # test oob + self.proto.dataReceived(IAC + DO + MSDP) + self.proto.dataReceived([IAC, SB, MSDP, MSDP_VAR, "LIST", MSDP_VAL, "COMMANDS", IAC, SE]) + self.assertTrue(self.proto.protocol_flags['OOB']) + self.assertEqual(self.proto.handshakes, 2) + # test mxp + self.proto.dataReceived(IAC + DONT + MXP) + self.assertFalse(self.proto.protocol_flags['MXP']) + self.assertEqual(self.proto.handshakes, 1) + # clean up to prevent Unclean reactor + self.proto.nop_keep_alive.stop() + self.proto._handshake_delay.cancel() + return d