Fixing lots of tests.

This commit is contained in:
Andrew Bastien 2023-11-22 15:26:07 -05:00
parent 15653ef1f1
commit 2dbd90d415
9 changed files with 214 additions and 192 deletions

View file

@ -113,7 +113,8 @@ OPTION_CLASSES = None
PROCESS_ID = None
TWISTED_APPLICATION = None
EVENNIA_SERVICE = None
EVENNIA_PORTAL_SERVICE = None
EVENNIA_SERVER_SERVICE = None
def _create_version():
@ -150,6 +151,7 @@ _LOADED = False
PORTAL_MODE = False
def _init(portal_mode=False):
"""
This function is called automatically by the launcher only after
@ -172,7 +174,7 @@ def _init(portal_mode=False):
global settings, lockfuncs, logger, utils, gametime, ansi, spawn, managers
global contrib, TICKER_HANDLER, MONITOR_HANDLER, SESSION_HANDLER, PROCESS_ID
global TASK_HANDLER, PORTAL_SESSION_HANDLER, SERVER_SESSION_HANDLER
global GLOBAL_SCRIPTS, OPTION_CLASSES, EVENNIA_SERVICE, TWISTED_APPLICATION
global GLOBAL_SCRIPTS, OPTION_CLASSES, EVENNIA_PORTAL_SERVICE, EVENNIA_SERVER_SERVICE, TWISTED_APPLICATION
global EvMenu, EvTable, EvForm, EvMore, EvEditor
global ANSIString, FuncParser
global AttributeProperty, TagProperty, TagCategoryProperty, ServerConfig
@ -247,6 +249,7 @@ def _init(portal_mode=False):
PROCESS_ID = os.getpid()
from twisted.application.service import Application
TWISTED_APPLICATION = Application("Evennia")
_evennia_service_class = None
@ -254,13 +257,17 @@ def _init(portal_mode=False):
if portal_mode:
# Set up the PortalSessionHandler
from evennia.server.portal import portalsessionhandler
portal_sess_handler_class = class_from_module(settings.PORTAL_SESSION_HANDLER_CLASS)
portalsessionhandler.PORTAL_SESSIONS = portal_sess_handler_class()
SESSION_HANDLER = portalsessionhandler.PORTAL_SESSIONS
evennia.PORTAL_SESSION_HANDLER = evennia.SESSION_HANDLER
_evennia_service_class = class_from_module(settings.EVENNIA_PORTAL_SERVICE_CLASS)
EVENNIA_PORTAL_SERVICE = _evennia_service_class()
EVENNIA_PORTAL_SERVICE.setServiceParent(TWISTED_APPLICATION)
from django.db import connection
# we don't need a connection to the database so close it right away
try:
connection.close()
@ -277,9 +284,8 @@ def _init(portal_mode=False):
SESSION_HANDLER = sessionhandler.SESSIONS
SERVER_SESSION_HANDLER = SESSION_HANDLER
_evennia_service_class = class_from_module(settings.EVENNIA_SERVER_SERVICE_CLASS)
EVENNIA_SERVICE = _evennia_service_class()
EVENNIA_SERVICE.setServiceParent(TWISTED_APPLICATION)
EVENNIA_SERVER_SERVICE = _evennia_service_class()
EVENNIA_SERVER_SERVICE.setServiceParent(TWISTED_APPLICATION)
# API containers

View file

@ -230,23 +230,23 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol):
# force a resync of sessions from the portal side. This happens on
# first server-connect.
server_restart_mode = kwargs.get("server_restart_mode", "shutdown")
evennia.EVENNIA_SERVICE.run_init_hooks(server_restart_mode)
evennia.EVENNIA_SERVER_SERVICE.run_init_hooks(server_restart_mode)
evennia.SERVER_SESSION_HANDLER.portal_sessions_sync(kwargs.get("sessiondata"))
evennia.SERVER_SESSION_HANDLER.portal_start_time = kwargs.get("portal_start_time")
elif operation == amp.SRELOAD: # server reload
# shut down in reload mode
evennia.SERVER_SESSION_HANDLER.all_sessions_portal_sync()
evennia.EVENNIA_SERVICE.shutdown(mode="reload")
evennia.EVENNIA_SERVER_SERVICE.shutdown(mode="reload")
elif operation == amp.SRESET:
# shut down in reset mode
evennia.SERVER_SESSION_HANDLER.all_sessions_portal_sync()
evennia.EVENNIA_SERVICE.shutdown(mode="reset")
evennia.EVENNIA_SERVER_SERVICE.shutdown(mode="reset")
elif operation == amp.SSHUTD: # server shutdown
# shutdown in stop mode
evennia.EVENNIA_SERVICE.shutdown(mode="shutdown")
evennia.EVENNIA_SERVER_SERVICE.shutdown(mode="shutdown")
else:
raise Exception("operation %(op)s not recognized." % {"op": operation})

View file

@ -174,6 +174,8 @@ def reset_server():
also checks so the warm-reset mechanism works as it should.
"""
if settings._TEST_ENVIRONMENT:
return
ServerConfig.objects.conf("server_epoch", time.time())
logger.log_info("Initial setup complete. Restarting Server once.")
@ -191,12 +193,6 @@ def handle_setup(last_step=None):
the function will exit immediately.
"""
if last_step in ("done", -1):
# this means we don't need to handle setup since
# it already ran sucessfully once. -1 is the legacy
# value for existing databases.
return
# setup sequence
setup_sequence = {
"create_objects": create_objects,
@ -205,6 +201,12 @@ def handle_setup(last_step=None):
"done": reset_server,
}
if last_step in ("done", -1):
# this means we don't need to handle setup since
# it already ran sucessfully once. -1 is the legacy
# value for existing databases.
return
# determine the sequence so we can skip ahead
steps = list(setup_sequence)
steps = steps[steps.index(last_step) + 1 if last_step is not None else 0 :]

View file

@ -132,7 +132,7 @@ class PortalSessionHandler(SessionHandler):
now = time.time()
if (
now - self.connection_last < _MIN_TIME_BETWEEN_CONNECTS
) or not evennia.EVENNIA_SERVICE.amp_protocol:
) or not evennia.EVENNIA_PORTAL_SERVICE.amp_protocol:
if not session or not self.connection_task:
self.connection_task = reactor.callLater(
_MIN_TIME_BETWEEN_CONNECTS, self.connect, None
@ -156,7 +156,7 @@ class PortalSessionHandler(SessionHandler):
self[session.sessid] = session
session.server_connected = True
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminPortal2Server(
evennia.EVENNIA_PORTAL_SERVICE.amp_protocol.send_AdminPortal2Server(
session, operation=PCONN, sessiondata=sessdata
)
@ -175,7 +175,7 @@ class PortalSessionHandler(SessionHandler):
# once to the server - if so we must re-sync woth the server, otherwise
# we skip this step.
sessdata = session.get_sync_data()
if evennia.EVENNIA_SERVICE.amp_protocol:
if evennia.EVENNIA_PORTAL_SERVICE.amp_protocol:
# we only send sessdata that should not have changed
# at the server level at this point
sessdata = dict(
@ -192,7 +192,7 @@ class PortalSessionHandler(SessionHandler):
"server_data",
)
)
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminPortal2Server(
evennia.EVENNIA_PORTAL_SERVICE.amp_protocol.send_AdminPortal2Server(
session, operation=PCONNSYNC, sessiondata=sessdata
)
@ -222,13 +222,17 @@ class PortalSessionHandler(SessionHandler):
del self[session.sessid]
# Tell the Server to disconnect its version of the Session as well.
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminPortal2Server(session, operation=PDISCONN)
evennia.EVENNIA_PORTAL_SERVICE.amp_protocol.send_AdminPortal2Server(
session, operation=PDISCONN
)
def disconnect_all(self):
"""
Disconnect all sessions, informing the Server.
"""
if settings._TEST_ENVIRONMENT:
return
def _callback(result, sessionhandler):
# we set a watchdog to stop self.disconnect from deleting
@ -240,7 +244,8 @@ class PortalSessionHandler(SessionHandler):
# inform Server; wait until finished sending before we continue
# removing all the sessions.
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminPortal2Server(
evennia.EVENNIA_PORTAL_SERVICE.amp_protocol.send_AdminPortal2Server(
DUMMYSESSION, operation=PDISCONNALL
).addCallback(_callback, self)
@ -434,7 +439,7 @@ class PortalSessionHandler(SessionHandler):
self.data_out(session, text=[[_ERROR_COMMAND_OVERFLOW], {}])
return
if not evennia.EVENNIA_SERVICE.amp_protocol:
if not evennia.EVENNIA_PORTAL_SERVICE.amp_protocol:
# this can happen if someone connects before AMP connection
# was established (usually on first start)
reactor.callLater(1.0, self.data_in, session, **kwargs)
@ -445,7 +450,7 @@ class PortalSessionHandler(SessionHandler):
# relay data to Server
session.cmd_last = now
evennia.EVENNIA_SERVICE.amp_protocol.send_MsgPortal2Server(session, **kwargs)
evennia.EVENNIA_PORTAL_SERVICE.amp_protocol.send_MsgPortal2Server(session, **kwargs)
# eventual local echo (text input only)
if "text" in kwargs and session.protocol_flags.get("LOCALECHO", False):

View file

@ -24,6 +24,8 @@ from twisted.trial.unittest import TestCase as TwistedTestCase
import evennia
from evennia.server.portal import irc
from evennia.utils.test_resources import BaseEvenniaTest
from evennia.server.portal.service import EvenniaPortalService
from evennia.server.portal.portalsessionhandler import PortalSessionHandler
from .amp import (
AMP_MAXLEN,
@ -221,8 +223,13 @@ class TestIRC(TestCase):
class TestTelnet(TwistedTestCase):
def setUp(self):
super().setUp()
self.portal = EvenniaPortalService()
evennia.EVENNIA_PORTAL_SERVICE = self.portal
self.amp_server_factory = AMPServerFactory(self.portal)
self.amp_server = self.amp_server_factory.buildProtocol("127.0.0.1")
factory = TelnetServerFactory()
factory.protocol = TelnetProtocol
evennia.PORTAL_SESSION_HANDLER = PortalSessionHandler()
factory.sessionhandler = evennia.PORTAL_SESSION_HANDLER
factory.sessionhandler.portal = Mock()
self.proto = factory.buildProtocol(("localhost", 0))
@ -287,8 +294,13 @@ class TestTelnet(TwistedTestCase):
class TestWebSocket(BaseEvenniaTest):
def setUp(self):
super().setUp()
self.portal = EvenniaPortalService()
evennia.EVENNIA_PORTAL_SERVICE = self.portal
self.amp_server_factory = AMPServerFactory(self.portal)
self.amp_server = self.amp_server_factory.buildProtocol("127.0.0.1")
self.proto = WebSocketClient()
self.proto.factory = WebSocketServerFactory()
evennia.PORTAL_SESSION_HANDLER = PortalSessionHandler()
self.proto.factory.sessionhandler = evennia.PORTAL_SESSION_HANDLER
self.proto.sessionhandler = evennia.PORTAL_SESSION_HANDLER
self.proto.sessionhandler.portal = Mock()

View file

@ -26,9 +26,7 @@ _SA = object.__setattr__
class EvenniaServerService(MultiService):
def _wrap_sigint_handler(self, *args):
if hasattr(self, "web_root"):
d = self.web_root.empty_threadpool()
d.addCallback(lambda _: self.shutdown("reload", _reactor_stopping=True))
@ -41,6 +39,7 @@ class EvenniaServerService(MultiService):
super().__init__(*args, **kwargs)
self.maintenance_count = 0
self.amp_protocol = None # set by amp factory
self.amp_service = None
self.info_dict = {
"servername": settings.SERVERNAME,
"version": get_evennia_version(),
@ -72,8 +71,6 @@ class EvenniaServerService(MultiService):
if isinstance(mod, str)
]
# Server startup methods
def server_maintenance(self):
"""
This maintenance function handles repeated checks and updates that
@ -81,6 +78,7 @@ class EvenniaServerService(MultiService):
"""
if not self._flush_cache:
from evennia.utils.idmapper.models import conditional_flush as _FLUSH_CACHE
self._flush_cache = _FLUSH_CACHE
self.maintenance_count += 1
@ -89,7 +87,9 @@ class EvenniaServerService(MultiService):
if self.maintenance_count == 1:
# first call after a reload
evennia.gametime.SERVER_START_TIME = now
evennia.gametime.SERVER_RUNTIME = evennia.ServerConfig.objects.conf("runtime", default=0.0)
evennia.gametime.SERVER_RUNTIME = evennia.ServerConfig.objects.conf(
"runtime", default=0.0
)
_LAST_SERVER_TIME_SNAPSHOT = now
else:
# adjust the runtime not with 60s but with the actual elapsed time
@ -109,20 +109,7 @@ class EvenniaServerService(MultiService):
# (see https://github.com/evennia/evennia/issues/1376)
connection.close()
# handle idle timeouts
if settings.IDLE_TIMEOUT > 0:
reason = _("idle timeout exceeded")
to_disconnect = []
for session in (
sess for sess in evennia.SESSION_HANDLER.values() if (now - sess.cmd_last) > settings.IDLE_TIMEOUT
):
if not session.account or not session.account.access(
session.account, "noidletimeout", default=False
):
to_disconnect.append(session)
for session in to_disconnect:
evennia.SESSION_HANDLER.disconnect(session, reason=reason)
self.process_idle_timeouts()
# run unpuppet hooks for objects that are marked as being puppeted,
# but which lacks an account (indicates a broken unpuppet operation
@ -138,6 +125,26 @@ class EvenniaServerService(MultiService):
if unpuppet_count:
logger.log_msg(f"Ran unpuppet-hooks for {unpuppet_count} link-dead puppets.")
def process_idle_timeouts(self):
# handle idle timeouts
if settings.IDLE_TIMEOUT > 0:
now = time.time()
reason = _("idle timeout exceeded")
to_disconnect = []
for session in (
sess
for sess in evennia.SESSION_HANDLER.values()
if (now - sess.cmd_last) > settings.IDLE_TIMEOUT
):
if not session.account or not session.account.access(
session.account, "noidletimeout", default=False
):
to_disconnect.append(session)
for session in to_disconnect:
evennia.SESSION_HANDLER.disconnect(session, reason=reason)
# Server startup methods
def privilegedStartService(self):
self.start_time = time.time()
@ -209,9 +216,9 @@ class EvenniaServerService(MultiService):
from evennia.server import amp_client
factory = amp_client.AMPClientFactory(self)
amp_service = internet.TCPClient(settings.AMP_HOST, settings.AMP_PORT, factory)
amp_service.setName("ServerAMPClient")
amp_service.setServiceParent(self)
self.amp_service = internet.TCPClient(settings.AMP_HOST, settings.AMP_PORT, factory)
self.amp_service.setName("ServerAMPClient")
self.amp_service.setServiceParent(self)
def register_webserver(self):
# Start a django-compatible webserver.
@ -269,12 +276,12 @@ class EvenniaServerService(MultiService):
can't save it to the database.
"""
if (
".".join(str(i) for i in django.VERSION) < "1.2"
and settings.DATABASES.get("default", {}).get("ENGINE") == "sqlite3"
".".join(str(i) for i in django.VERSION) < "1.2"
and settings.DATABASES.get("default", {}).get("ENGINE") == "sqlite3"
) or (
hasattr(settings, "DATABASES")
and settings.DATABASES.get("default", {}).get("ENGINE", None)
== "django.db.backends.sqlite3"
hasattr(settings, "DATABASES")
and settings.DATABASES.get("default", {}).get("ENGINE", None)
== "django.db.backends.sqlite3"
):
cursor = connection.cursor()
cursor.execute("PRAGMA cache_size=10000")
@ -291,7 +298,6 @@ class EvenniaServerService(MultiService):
already existing objects.
"""
# setting names
settings_names = (
@ -316,14 +322,14 @@ class EvenniaServerService(MultiService):
i for i, tup in enumerate(settings_compare) if tup[0] and tup[1] and tup[0] != tup[1]
]
if len(
mismatches
mismatches
): # can't use any() since mismatches may be [0] which reads as False for any()
# we have a changed default. Import relevant objects and
# run the update
# from evennia.accounts.models import AccountDB
for i, prev, curr in (
(i, tup[0], tup[1]) for i, tup in enumerate(settings_compare) if i in mismatches
(i, tup[0], tup[1]) for i, tup in enumerate(settings_compare) if i in mismatches
):
# update the database
self.info_dict[
@ -380,7 +386,7 @@ class EvenniaServerService(MultiService):
Once finished the last_initial_setup_step is set to 'done'
"""
initial_setup = importlib.import_module(settings.INITIAL_SETUP_MODULE)
last_initial_setup_step = evennia.ServerConfig.objects.conf("last_initial_setup_step")
try:
@ -401,8 +407,9 @@ class EvenniaServerService(MultiService):
except Exception:
# stop server if this happens.
print(traceback.format_exc())
print("Error in initial setup. Stopping Server + Portal.")
evennia.SESSION_HANDLER.portal_shutdown()
if not settings._TEST_ENVIRONMENT or not evennia.SESSION_HANDLER:
print("Error in initial setup. Stopping Server + Portal.")
evennia.SESSION_HANDLER.portal_shutdown()
def create_default_channels(self):
"""
@ -424,7 +431,7 @@ class EvenniaServerService(MultiService):
# connectinfo
connectinfo_chan = settings.CHANNEL_CONNECTINFO
if connectinfo_chan and not ChannelDB.objects.filter(
db_key__iexact=connectinfo_chan["key"]
db_key__iexact=connectinfo_chan["key"]
):
channel = create_channel(**connectinfo_chan)
# default channels
@ -523,7 +530,10 @@ class EvenniaServerService(MultiService):
if self.amp_protocol:
yield evennia.SESSION_HANDLER.all_sessions_portal_sync()
else: # shutdown
yield [_SA(p, "is_connected", False) for p in evennia.AccountDB.get_all_cached_instances()]
yield [
_SA(p, "is_connected", False)
for p in evennia.AccountDB.get_all_cached_instances()
]
yield [o.at_server_shutdown() for o in evennia.ObjectDB.get_all_cached_instances()]
yield [
(p.unpuppet_all(), p.at_server_shutdown())
@ -575,7 +585,7 @@ class EvenniaServerService(MultiService):
"""
for mod in self.start_stop_modules:
if (hook := getattr(mod, hookname, None)):
if hook := getattr(mod, hookname, None):
hook()
def at_server_init(self):
@ -667,7 +677,7 @@ class EvenniaServerService(MultiService):
if settings.GUEST_ENABLED:
for guest in evennia.AccountDB.objects.all().filter(
db_typeclass_path=settings.BASE_GUEST_TYPECLASS
db_typeclass_path=settings.BASE_GUEST_TYPECLASS
):
for character in guest.db._playable_characters:
if character:

View file

@ -307,7 +307,7 @@ class ServerSessionHandler(SessionHandler):
"""
super().__init__(*args, **kwargs)
evennia.EVENNIA_SERVICE_data = {"servername": _SERVERNAME}
evennia.server_data = {"servername": _SERVERNAME}
# will be set on psync
self.portal_start_time = 0.0
@ -411,7 +411,7 @@ class ServerSessionHandler(SessionHandler):
mode = "reload"
# tell the server hook we synced
evennia.EVENNIA_SERVICE.at_post_portal_sync(mode)
evennia.EVENNIA_SERVER_SERVICE.at_post_portal_sync(mode)
# announce the reconnection
if _BROADCAST_SERVER_RESTART_MESSAGES:
self.announce_all(_(" ... Server restarted."))
@ -467,7 +467,7 @@ class ServerSessionHandler(SessionHandler):
the Server.
"""
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.SCONN, protocol_path=protocol_path, config=configdict
)
@ -476,14 +476,18 @@ class ServerSessionHandler(SessionHandler):
Called by server when reloading. We tell the portal to start a new server instance.
"""
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(DUMMYSESSION, operation=amp.SRELOAD)
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.SRELOAD
)
def portal_reset_server(self):
"""
Called by server when reloading. We tell the portal to start a new server instance.
"""
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(DUMMYSESSION, operation=amp.SRESET)
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.SRESET
)
def portal_shutdown(self):
"""
@ -491,7 +495,9 @@ class ServerSessionHandler(SessionHandler):
itself down)
"""
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(DUMMYSESSION, operation=amp.PSHUTD)
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.PSHUTD
)
def login(self, session, account, force=False, testmode=False):
"""
@ -537,7 +543,7 @@ class ServerSessionHandler(SessionHandler):
session.logged_in = True
# sync the portal to the session
if not testmode:
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
session, operation=amp.SLOGIN, sessiondata={"logged_in": True, "uid": session.uid}
)
account.at_post_login(session=session)
@ -582,7 +588,7 @@ class ServerSessionHandler(SessionHandler):
del self[sessid]
if sync_portal:
# inform portal that session should be closed.
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
session, operation=amp.SDISCONN, reason=reason
)
@ -593,7 +599,7 @@ class ServerSessionHandler(SessionHandler):
"""
sessdata = self.get_all_sync_data()
return evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(
return evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.SSYNC, sessiondata=sessdata
)
@ -604,7 +610,7 @@ class ServerSessionHandler(SessionHandler):
"""
sessdata = {session.sessid: session.get_sync_data()}
return evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(
return evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.SSYNC, sessiondata=sessdata, clean=False
)
@ -617,7 +623,7 @@ class ServerSessionHandler(SessionHandler):
more sessions in detail.
"""
return evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(
return evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.SSYNC, sessiondata=session_data, clean=False
)
@ -633,7 +639,7 @@ class ServerSessionHandler(SessionHandler):
for session in self:
del session
# tell portal to disconnect all sessions
evennia.EVENNIA_SERVICE.amp_protocol.send_AdminServer2Portal(
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_AdminServer2Portal(
DUMMYSESSION, operation=amp.SDISCONNALL, reason=reason
)
@ -817,7 +823,7 @@ class ServerSessionHandler(SessionHandler):
kwargs = self.clean_senddata(session, kwargs)
# send across AMP
evennia.EVENNIA_SERVICE.amp_protocol.send_MsgServer2Portal(session, **kwargs)
evennia.EVENNIA_SERVER_SERVICE.amp_protocol.send_MsgServer2Portal(session, **kwargs)
def get_inputfuncs(self):
"""

View file

@ -78,7 +78,7 @@ class _TestAMP(TwistedTestCase):
return all_sent
@patch("evennia.server.server.LoopingCall", MagicMock())
@patch("evennia.server.service.LoopingCall", MagicMock())
@patch("evennia.server.portal.amp.amp.BinaryBoxProtocol.transport")
class TestAMPClientSend(_TestAMP):
"""Test amp client sending data"""
@ -90,7 +90,9 @@ class TestAMPClientSend(_TestAMP):
self._connect_server(mocktransport)
self.amp_server.dataReceived(wire_data)
self.portal.sessions.data_out.assert_called_with(self.portalsession, text={"foo": "bar"})
evennia.PORTAL_SESSION_HANDLER.data_out.assert_called_with(
self.portalsession, text={"foo": "bar"}
)
def test_adminserver2portal(self, mocktransport):
self._connect_client(mocktransport)
@ -117,7 +119,7 @@ class TestAMPClientRecv(_TestAMP):
self._connect_client(mocktransport)
self.amp_client.dataReceived(wire_data)
self.server.sessions.data_in.assert_called_with(self.session, text={"foo": "bar"})
evennia.SERVER_SESSION_HANDLER.data_in.assert_called_with(self.session, text={"foo": "bar"})
def test_adminportal2server(self, mocktransport):
self._connect_server(mocktransport)
@ -126,6 +128,6 @@ class TestAMPClientRecv(_TestAMP):
wire_data = self._catch_wire_read(mocktransport)[0]
self._connect_client(mocktransport)
self.server.sessions.portal_disconnect_all = MagicMock()
evennia.SERVER_SESSION_HANDLER.portal_disconnect_all = MagicMock()
self.amp_client.dataReceived(wire_data)
self.server.sessions.portal_disconnect_all.assert_called()
evennia.SERVER_SESSION_HANDLER.portal_disconnect_all.assert_called()

View file

@ -2,14 +2,14 @@
Test the main server component
"""
import evennia
from unittest import TestCase
from django.test import override_settings
from mock import DEFAULT, MagicMock, call, patch
@patch("evennia.server.server.LoopingCall", new=MagicMock())
@patch("evennia.server.service.LoopingCall", new=MagicMock())
class TestServer(TestCase):
"""
Test server module.
@ -17,78 +17,76 @@ class TestServer(TestCase):
"""
def setUp(self):
# Running this first line ensures that the EVENNIA_SERVICE is instantiated.
from evennia.server import server
self.server = server
self.server = evennia.EVENNIA_SERVER_SERVICE
@override_settings(IDMAPPER_CACHE_MAXSIZE=1000)
def test__server_maintenance_reset(self):
with patch.multiple(
"evennia.server.server",
with patch.object(self.server, "_flush_cache", new=MagicMock()) as mockflush, patch.object(
evennia, "ServerConfig", new=MagicMock()
) as mockconf, patch.multiple(
"evennia.server.service",
LoopingCall=DEFAULT,
Evennia=DEFAULT,
_FLUSH_CACHE=DEFAULT,
connection=DEFAULT,
_IDMAPPER_CACHE_MAXSIZE=1000,
_MAINTENANCE_COUNT=0,
ServerConfig=DEFAULT,
) as mocks:
self.server.maintenance_count = 0
mocks["connection"].close = MagicMock()
mocks["ServerConfig"].objects.conf = MagicMock(return_value=456)
mockconf.objects.conf = MagicMock(return_value=456)
# flush cache
self.server._server_maintenance()
mocks["ServerConfig"].objects.conf.assert_called_with("runtime", 456)
self.server.server_maintenance()
mockconf.objects.conf.assert_called_with("runtime", 456)
@override_settings(IDMAPPER_CACHE_MAXSIZE=1000)
def test__server_maintenance_flush(self):
with patch.multiple(
"evennia.server.server",
"evennia.server.service",
LoopingCall=DEFAULT,
Evennia=DEFAULT,
_FLUSH_CACHE=DEFAULT,
connection=DEFAULT,
_IDMAPPER_CACHE_MAXSIZE=1000,
_MAINTENANCE_COUNT=5 - 1,
ServerConfig=DEFAULT,
) as mocks:
) as mocks, patch.object(
evennia, "ServerConfig", new=MagicMock()
) as mockconf, patch.object(
self.server, "_flush_cache", new=MagicMock()
) as mockflush:
mocks["connection"].close = MagicMock()
mocks["ServerConfig"].objects.conf = MagicMock(return_value=100)
mockconf.objects.conf = MagicMock(return_value=100)
self.server.maintenance_count = 5 - 1
# flush cache
self.server._server_maintenance()
mocks["_FLUSH_CACHE"].assert_called_with(1000)
self.server.server_maintenance()
self.server._flush_cache.assert_called_with(1000)
@override_settings(IDMAPPER_CACHE_MAXSIZE=1000)
def test__server_maintenance_close_connection(self):
with patch.multiple(
"evennia.server.server",
"evennia.server.service",
LoopingCall=DEFAULT,
Evennia=DEFAULT,
_FLUSH_CACHE=DEFAULT,
connection=DEFAULT,
_IDMAPPER_CACHE_MAXSIZE=1000,
_MAINTENANCE_COUNT=(60 * 7) - 1,
_LAST_SERVER_TIME_SNAPSHOT=0,
ServerConfig=DEFAULT,
) as mocks:
) as mocks, patch.object(evennia, "ServerConfig", new=MagicMock()) as mockconf:
self.server._flush_cache = MagicMock()
self.server.maintenance_count = (60 * 7) - 1
self.server._last_server_time_snapshot = 0
mocks["connection"].close = MagicMock()
mocks["ServerConfig"].objects.conf = MagicMock(return_value=100)
self.server._server_maintenance()
mockconf.objects.conf = MagicMock(return_value=100)
self.server.server_maintenance()
mocks["connection"].close.assert_called()
@override_settings(IDLE_TIMEOUT=10)
def test__server_maintenance_idle_time(self):
with patch.multiple(
"evennia.server.server",
"evennia.server.service",
LoopingCall=DEFAULT,
Evennia=DEFAULT,
_FLUSH_CACHE=DEFAULT,
connection=DEFAULT,
_IDMAPPER_CACHE_MAXSIZE=1000,
_MAINTENANCE_COUNT=(3600 * 7) - 1,
_LAST_SERVER_TIME_SNAPSHOT=0,
SESSIONS=DEFAULT,
_IDLE_TIMEOUT=10,
time=DEFAULT,
ServerConfig=DEFAULT,
) as mocks:
) as mocks, patch.object(
evennia, "ServerConfig", new=MagicMock()
) as mockconf, patch.object(
evennia, "SESSION_HANDLER", new=MagicMock()
) as mocksess:
self.server.maintenance_count = (3600 * 7) - 1
self.server._last_server_time_snapshot = 0
sess1 = MagicMock()
sess2 = MagicMock()
sess3 = MagicMock()
@ -105,31 +103,25 @@ class TestServer(TestCase):
mocks["time"].time = MagicMock(return_value=1000)
mocks["ServerConfig"].objects.conf = MagicMock(return_value=100)
mocks["SESSIONS"].values = MagicMock(return_value=[sess1, sess2, sess3, sess4])
mocks["SESSIONS"].disconnect = MagicMock()
mockconf.objects.conf = MagicMock(return_value=100)
mocksess.values = MagicMock(return_value=[sess1, sess2, sess3, sess4])
mocksess.disconnect = MagicMock()
self.server._server_maintenance()
self.server.server_maintenance()
reason = "idle timeout exceeded"
calls = [call(sess1, reason=reason), call(sess4, reason=reason)]
mocks["SESSIONS"].disconnect.assert_has_calls(calls, any_order=True)
mocksess.disconnect.assert_has_calls(calls, any_order=True)
def test_evennia_start(self):
with patch.multiple("evennia.server.server", time=DEFAULT, service=DEFAULT) as mocks:
mocks["time"].time = MagicMock(return_value=1000)
evennia = self.server.Evennia(MagicMock())
self.assertEqual(evennia.start_time, 1000)
@patch("evennia.objects.models.ObjectDB")
@patch("evennia.server.server.AccountDB")
@patch("evennia.server.server.ScriptDB")
@patch("evennia.comms.models.ChannelDB")
def test_update_defaults(self, mockchan, mockscript, mockacct, mockobj):
with patch.multiple("evennia.server.server", ServerConfig=DEFAULT) as mocks:
mockchan.objects.filter = MagicMock()
mockscript.objects.filter = MagicMock()
mockacct.objects.filter = MagicMock()
mockobj.objects.filter = MagicMock()
def test_update_defaults(self):
with patch.object(evennia, "ObjectDB", new=MagicMock()) as mockobj, patch.object(
evennia, "AccountDB", new=MagicMock()
) as mockacc, patch.object(evennia, "ScriptDB", new=MagicMock()) as mockscr, patch.object(
evennia, "ChannelDB", new=MagicMock()
) as mockchan, patch.object(
evennia, "ServerConfig", new=MagicMock()
) as mockconf:
for m in (mockscr, mockobj, mockacc, mockchan):
m.objects.filter = MagicMock()
# fake mismatches
settings_names = (
@ -148,16 +140,14 @@ class TestServer(TestCase):
def _mock_conf(key, *args):
return fakes[key]
mocks["ServerConfig"].objects.conf = _mock_conf
mockconf.objects.conf = _mock_conf
evennia = self.server.Evennia(MagicMock())
evennia.update_defaults()
self.server.update_defaults()
mockchan.objects.filter.assert_called()
mockscript.objects.filter.assert_called()
mockacct.objects.filter.assert_called()
mockobj.objects.filter.assert_called()
for m in (mockscr, mockobj, mockacc, mockchan):
m.objects.filter.assert_called()
@override_settings(_TEST_ENVIRONMENT=True)
def test_initial_setup(self):
from evennia.utils.create import create_account
@ -167,10 +157,10 @@ class TestServer(TestCase):
"evennia.server.initial_setup", reset_server=DEFAULT, AccountDB=DEFAULT
) as mocks:
mocks["AccountDB"].objects.get = MagicMock(return_value=acct)
evennia = self.server.Evennia(MagicMock())
evennia.run_initial_setup()
self.server.run_initial_setup()
acct.delete()
@override_settings(_TEST_ENVIRONMENT=True)
def test_initial_setup_retry(self):
from evennia.utils.create import create_account
@ -185,14 +175,12 @@ class TestServer(TestCase):
mocks["AccountDB"].objects.get = MagicMock(return_value=acct)
# a last_initial_setup_step > 0
mocks["ServerConfig"].objects.conf = MagicMock(return_value=4)
evennia = self.server.Evennia(MagicMock())
evennia.run_initial_setup()
self.server.run_initial_setup()
acct.delete()
@patch("evennia.server.server.INFO_DICT", {"test": "foo"})
def test_get_info_dict(self):
evennia = self.server.Evennia(MagicMock())
self.assertEqual(evennia.get_info_dict(), {"test": "foo"})
with patch.object(self.server, "get_info_dict", return_value={"test": "foo"}) as mocks:
self.assertEqual(self.server.get_info_dict(), {"test": "foo"})
class TestInitHooks(TestCase):
@ -200,7 +188,7 @@ class TestInitHooks(TestCase):
from evennia.server import server
from evennia.utils import create
self.server = server
self.server = evennia.EVENNIA_SERVER_SERVICE
self.obj1 = create.object(key="HookTestObj1")
self.obj2 = create.object(key="HookTestObj2")
@ -211,44 +199,35 @@ class TestInitHooks(TestCase):
self.script1 = create.script(key="script1")
self.script2 = create.script(key="script2")
self.obj1.at_init = MagicMock()
self.obj2.at_init = MagicMock()
self.acct1.at_init = MagicMock()
self.acct2.at_init = MagicMock()
self.chan1.at_init = MagicMock()
self.chan2.at_init = MagicMock()
self.script1.at_init = MagicMock()
self.script2.at_init = MagicMock()
self.objects = [
self.obj1,
self.obj2,
self.acct1,
self.acct2,
self.chan1,
self.chan2,
self.script1,
self.script2,
]
for obj in self.objects:
obj.at_init = MagicMock()
def tearDown(self):
self.obj1.delete()
self.obj2.delete()
self.acct1.delete()
self.acct2.delete()
self.chan1.delete()
self.chan2.delete()
self.script1.delete()
self.script2.delete()
for obj in self.objects:
obj.delete()
@override_settings(_TEST_ENVIRONMENT=True)
def test_run_init_hooks(self):
evennia = self.server.Evennia(MagicMock())
with patch.object(
self.server, "at_server_reload_start", new=MagicMock()
) as reload, patch.object(self.server, "at_server_cold_start", new=MagicMock()) as cold:
self.server.run_init_hooks("reload")
self.server.run_init_hooks("reset")
self.server.run_init_hooks("shutdown")
evennia.at_server_reload_start = MagicMock()
evennia.at_server_cold_start = MagicMock()
for obj in self.objects:
obj.at_init.assert_called()
evennia.run_init_hooks("reload")
evennia.run_init_hooks("reset")
evennia.run_init_hooks("shutdown")
self.acct1.at_init.assert_called()
self.acct2.at_init.assert_called()
self.obj1.at_init.assert_called()
self.obj2.at_init.assert_called()
self.chan1.at_init.assert_called()
self.chan2.at_init.assert_called()
self.script1.at_init.assert_called()
self.script2.at_init.assert_called()
evennia.at_server_reload_start.assert_called()
evennia.at_server_cold_start.assert_called()
for hook in (reload, cold):
hook.assert_called()