Refactored AMP protocol and made it more efficient for use with compression and batch-sending.

This commit is contained in:
Griatch 2015-09-21 16:47:24 +02:00
parent 4109c86545
commit a5ef023769

View file

@ -53,12 +53,6 @@ _SENDBATCH = defaultdict(list)
_MSGBUFFER = defaultdict(list)
import zlib
#_ZLIB_FLUSH = zlib.Z_SYNC_FLUSH
#_ZLIB_COMP = zlib.compressobj(9)
#_ZLIB_DECOMP = zlib.decompressobj()
#_ZLIB_COMPRESS = lambda data: _ZLIB_COMP.compress(data) + _ZLIB_COMP.flush(_ZLIB_FLUSH)
#_ZLIB_DECOMPRESS = lambda data: _ZLIB_DECOMP.decompress(data)
def get_restart_mode(restart_file):
"""
@ -201,15 +195,17 @@ class AmpClientFactory(protocol.ReconnectingClientFactory):
class Compressed(amp.String):
"""
This is an Argument that both handles too-long sends as well as
uses zlib for compression across the wire. Much of this is
borrowed from ~glyph/+junk/amphacks/mediumbox.
This is a customn AMP command Argument that both handles too-long
sends as well as uses zlib for compression across the wire. The
batch-grouping of too-long sends is borrowed from the "mediumbox"
recipy at twisted-hacks's ~glyph/+junk/amphacks/mediumbox.
"""
def fromBox(self, name, strings, objects, proto):
"""
Converts from box representation to python.
Converts from box representation to python. We
group very long data into batches.
"""
value = StringIO()
value.write(strings.get(name))
@ -223,7 +219,8 @@ class Compressed(amp.String):
def toBox(self, name, strings, objects, proto):
"""
Convert from data to box.
Convert from data to box. We handled too-long
batched data and put it together here.
"""
value = StringIO(objects[name])
strings[name] = value.read(AMP_MAXLEN)
@ -235,13 +232,13 @@ class Compressed(amp.String):
def toString(self, inObject):
"""
Convert to send on the wire.
Convert to send on the wire, with compression.
"""
return zlib.compress(inObject, 9)
def fromString(self, inString):
"""
Convert from the wire to Python.
Convert (decompress) from the wire to Python.
"""
return zlib.decompress(inString)
@ -254,7 +251,7 @@ class MsgPortal2Server(amp.Command):
key = "MsgPortal2Server"
arguments = [('data', Compressed())]
errors = [(Exception, 'EXCEPTION')]
response = [('timing', amp.String())]
response = []
class MsgServer2Portal(amp.Command):
@ -265,11 +262,10 @@ class MsgServer2Portal(amp.Command):
key = "MsgServer2Portal"
arguments = [('data', Compressed())]
errors = [(Exception, 'EXCEPTION')]
#response = []
response = [('timing', amp.String())]
response = []
class ServerAdmin(amp.Command):
class AdminPortal2Server(amp.Command):
"""
Administration Portal -> Server
@ -277,14 +273,13 @@ class ServerAdmin(amp.Command):
server, such as when a new session connects or resyncs
"""
key = "ServerAdmin"
key = "AdminPortal2Server"
arguments = [('data', Compressed())]
errors = [(Exception, 'EXCEPTION')]
#response = []
response = [('timing', amp.String())]
response = []
class PortalAdmin(amp.Command):
class AdminServer2Portal(amp.Command):
"""
Administration Server -> Portal
@ -292,11 +287,10 @@ class PortalAdmin(amp.Command):
portal.
"""
key = "PortalAdmin"
key = "AdminServer2Portal"
arguments = [('data', Compressed())]
errors = [(Exception, 'EXCEPTION')]
#response = [()]
response = [('timing', amp.String())]
response = []
class FunctionCall(amp.Command):
@ -353,18 +347,19 @@ class AMPProtocol(amp.AMP):
def connectionMade(self):
"""
This is called when a connection is established between server
and portal. AMP calls it on both sides, so we need to make
sure to only trigger resync from the portal side.
This is called when an AMP connection is (re-)established
between server and portal. AMP calls it on both sides, so we
need to make sure to only trigger resync from the portal side.
"""
# this makes for a factor x10 faster sends!
# this makes for a factor x10 faster sends across the wire
self.transport.setTcpNoDelay(True)
if hasattr(self.factory, "portal"):
# only the portal has the 'portal' property, so we know we are
# on the portal side and can initialize the connection.
sessdata = self.factory.portal.sessions.get_all_sync_data()
self.call_remote_ServerAdmin(0,
self.send_AdminPortal2Server(0,
PSYNC,
data=sessdata)
self.factory.portal.sessions.at_server_connection()
@ -386,10 +381,8 @@ class AMPProtocol(amp.AMP):
e.trap(Exception)
print "AMP Error for %(info)s: %(e)s" % {'info': info,
'e': e.getErrorMessage()}
def callback(self, ret, info):
print "AMP return timing (%s): %f" % (info, time() - float(ret["timing"]))
def batch_send(self, command, sessid, **kwargs):
def send_data(self, command, sessid, **kwargs):
"""
This will batch data together to send fewer, large batches.
@ -397,107 +390,40 @@ class AMPProtocol(amp.AMP):
command (AMP Command): A protocol send command.
sessid (int): A unique Session id.
Kwargs:
force_direct (bool): Send direct, without batching data.
Returns:
deferreds (list or None): A list of deferreds firing with
as batch parts get sent (or fails).
deferred (deferred or None): A deferred with an errback.
Notes:
Data will be sent across the wire pickled as a tuple
(sessid, kwargs).
"""
global _SENDBATCH
if command:
# always put AMP command in cache
_SENDBATCH[command].append((sessid, kwargs))
self.send_batch_counter += 1
force_direct = kwargs.pop("force_direct", False)
now = time()
if force_direct:
# check the current command rate to determine if we
# can return send mode or not. We add 1 to counter
# to avoid cases when it happens to be 0.
self.send_mode = (((self.send_batch_counter + 1) /
(now - self.send_reset_time)) <= (BATCH_RATE*BATCH_TIMEOUT))
self.send_batch_counter = 0
self.send_reset_time = now
if not (self.send_mode and self.send_task):
self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True)
else:
self.send_task = None
elif self.send_mode and self.send_batch_counter > BATCH_RATE:
# we have reached the batch count. How long this took
# defines if we should halt sending or not.
self.send_mode = now - self.send_reset_time >= 1.0
#print "BATCH_RATE:", BATCH_RATE / (now - self.send_reset_time)
self.send_batch_counter = 0
self.send_reset_time = now
if not (self.send_mode and self.send_task):
force_direct = True # make sure to empty cache
self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True)
if self.send_mode or force_direct:
sendsize = 0
sendcount = 0
for command, cmdlist in _SENDBATCH.items():
batch = dumps(cmdlist) # batch is a list of (sessid,kwargs) tuples.
# We pack the data in a string-form pickle.
sendsize += sys.getsizeof(batch)
sendcount += 1
del _SENDBATCH[command]
deferred = self.callRemote(command,
data=batch).addErrback(self.errback, command.key + " " + repr(cmdlist))
print " sent %i batches: %fkB." % (sendcount, sendsize / 1000.0)
return deferred
def batch_recv(self, data):
"""
This will receive and unpack data sent as a batch. This both
handles too-long data as well as batch-sending very fast-
arriving commands.
Args:
data (str): Data coming over the wire.
Returns:
data (str or list): The received data.
"""
# most common case
return loads(data)
batch = dumps((sessid, kwargs))
return self.callRemote(command,
data=batch).addErrback(self.errback, command.key)
# Message definition + helper methods to call/create each message type
# Portal -> Server Msg
def amp_msg_portal2server(self, data):
@MsgPortal2Server.responder
def server_receive_msgportal2server(self, data):
"""
Relays message to server. This method is executed on the
Server.
Since AMP has a limit of 65355 bytes per message, it's
possible the data comes in multiple chunks; if so (nparts>1)
we buffer the data and wait for the remaining parts to arrive
before continuing.
Receives message arriving to server. This method is executed
on the Server.
Args:
data (str): Data to send (often a part of a batch)
data (str): Data to receive (a pickled tuple (sessid,kwargs))
"""
batch = self.batch_recv(data)
for (sessid, kwargs) in batch:
#print "msg portal -> server (server side):", sessid, msg, loads(ret["data"])
from evennia.server.profiling.timetrace import timetrace
kwargs["msg"] = timetrace(kwargs["msg"], "AMP.amp_msg_portal2server")
self.factory.server.sessions.data_in(sessid,
sessid, kwargs = loads(data)
#print "msg portal -> server (server side):", sessid, msg, loads(ret["data"])
self.factory.server.sessions.data_in(sessid,
text=kwargs["msg"],
data=kwargs["data"])
return {"timing":"%f" % time()}
MsgPortal2Server.responder(amp_msg_portal2server)
return {}
def call_remote_MsgPortal2Server(self, sessid, msg, data=""):
def send_MsgPortal2Server(self, sessid, msg="", data=""):
"""
Access method called by the Portal and executed on the Portal.
@ -511,17 +437,17 @@ class AMPProtocol(amp.AMP):
"""
#print "msg portal->server (portal side):", sessid, msg, data
from evennia.server.profiling.timetrace import timetrace
msg = timetrace(msg, "AMP.call_remote_MsgPortal2Server")
return self.batch_send(MsgPortal2Server, sessid,
msg=msg if msg is not None else "",
data=data)
return self.send_data(MsgPortal2Server, sessid,
msg=msg,
data=data)
# Server -> Portal message
def amp_msg_server2portal(self, data):
@MsgServer2Portal.responder
def portal_receive_server2portal(self, data):
"""
Relays message to Portal. This method is executed on the Portal.
Receives message arriving to Portal from Server.
This method is executed on the Portal.
Since AMP has a limit of 65355 bytes per message, it's
possible the data comes in multiple chunks; if so (nparts>1)
@ -529,48 +455,20 @@ class AMPProtocol(amp.AMP):
before continuing.
Args:
data (str): Data to send (often a part of a batch)
data (str): Pickled data (sessid, kwargs) coming over the wire.
"""
batch = self.batch_recv(data)
for (sessid, kwargs) in batch:
#print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"])
from evennia.server.profiling.timetrace import timetrace
kwargs["msg"] = timetrace(kwargs["msg"], "AMP.amp_msg_server2portal")
self.factory.portal.sessions.data_out(sessid,
text=kwargs["msg"],
data=kwargs["data"])
#return {}
return {"timing":"%f" % time()}
MsgServer2Portal.responder(amp_msg_server2portal)
sessid, kwargs = loads(data)
#print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"])
self.factory.portal.sessions.data_out(sessid,
text=kwargs["msg"],
data=kwargs["data"])
return {}
def amp_batch_server2portal(self, data):
def send_MsgServer2Portal(self, sessid, msg="", data=""):
"""
Relays batch data to Portal. This method is executed on the Portal.
Since AMP has a limit of 65355 bytes per message, it's
possible the data comes in multiple chunks; if so (nparts>1)
we buffer the data and wait for the remaining parts to arrive
before continuing.
Args:
data (str): Data to send (often a part of a batch)
"""
batch = self.batch_recv(data)
if batch is not None:
for (sessid, kwargs) in batch:
from evennia.server.profiling.timetrace import timetrace
kwargs["msg"] = timetrace(kwargs["msg"], "AMP.amp_batch_server2portal")
self.factory.portal.sessions.data_out(sessid,
text=kwargs["msg"],
**kwargs["data"])
#return {}
return {"timing":"%f" % time()}
MsgServer2Portal.responder(amp_batch_server2portal)
def call_remote_MsgServer2Portal(self, sessid, msg="", data=""):
"""
Send Message - access method called by the Server and executed on the Server.
Access method - executed on the Server for sending data
to Portal.
Args:
sessid (int): Unique Session id.
@ -578,63 +476,56 @@ class AMPProtocol(amp.AMP):
data (str, optional): Extra data.
"""
from evennia.server.profiling.timetrace import timetrace
msg = timetrace(msg, "AMP.call_remote_MsgServer2Portal")
#print "msg server->portal (server side):", sessid, msg, data
return self.batch_send(MsgServer2Portal, sessid, msg=msg, data=data)
return self.send_data(MsgServer2Portal, sessid, msg=msg, data=data)
# Server administration from the Portal side
def amp_server_admin(self, data):
@AdminPortal2Server.responder
def server_receive_adminportal2server(self, data):
"""
This allows the portal to perform admin
operations on the server. This is executed on the Server.
Since AMP has a limit of 65355 bytes per message, it's
possible the data comes in multiple chunks; if so (nparts>1)
we buffer the data and wait for the remaining parts to arrive
before continuing.
Receives admin data from the Portal (allows the portal to
perform admin operations on the server). This is executed on
the Server.
Args:
data (str): Data to send (often a part of a batch)
"""
#print "serveradmin (server side):", hashid, ipart, nparts
batch = self.batch_recv(data)
sessid, kwargs = loads(data)
for (sessid, kwargs) in batch:
operation = kwargs["operation"]
data = kwargs["data"]
server_sessionhandler = self.factory.server.sessions
operation = kwargs["operation"]
data = kwargs["data"]
server_sessionhandler = self.factory.server.sessions
#print "serveradmin (server side):", sessid, ord(operation), data
#print "serveradmin (server side):", sessid, ord(operation), data
if operation == PCONN: # portal_session_connect
# create a new session and sync it
server_sessionhandler.portal_connect(data)
if operation == PCONN: # portal_session_connect
# create a new session and sync it
server_sessionhandler.portal_connect(data)
elif operation == PCONNSYNC: #portal_session_sync
server_sessionhandler.portal_session_sync(data)
elif operation == PCONNSYNC: #portal_session_sync
server_sessionhandler.portal_session_sync(data)
elif operation == PDISCONN: # portal_session_disconnect
# session closed from portal side
self.factory.server.sessions.portal_disconnect(sessid)
elif operation == PDISCONN: # portal_session_disconnect
# session closed from portal side
self.factory.server.sessions.portal_disconnect(sessid)
elif operation == PSYNC: # portal_session_sync
# force a resync of sessions when portal reconnects to
# server (e.g. after a server reboot) the data kwarg
# contains a dict {sessid: {arg1:val1,...}}
# representing the attributes to sync for each
# session.
server_sessionhandler.portal_sessions_sync(data)
else:
raise Exception("operation %(op)s not recognized." % {'op': operation})
#return {}
return {"timing":"%f" % time()}
ServerAdmin.responder(amp_server_admin)
elif operation == PSYNC: # portal_session_sync
# force a resync of sessions when portal reconnects to
# server (e.g. after a server reboot) the data kwarg
# contains a dict {sessid: {arg1:val1,...}}
# representing the attributes to sync for each
# session.
server_sessionhandler.portal_sessions_sync(data)
else:
raise Exception("operation %(op)s not recognized." % {'op': operation})
return {}
def call_remote_ServerAdmin(self, sessid, operation="", data=""):
def send_AdminPortal2Server(self, sessid, operation="", data=""):
"""
Administrative access method called by the Portal and Executed
Send Admin instructions from the Portal to the Server.
Executed
on the Portal.
Args:
@ -645,69 +536,61 @@ class AMPProtocol(amp.AMP):
"""
#print "serveradmin (portal side):", sessid, ord(operation), data
if hasattr(self.factory, "server_restart_mode"):
return self.batch_send(ServerAdmin, sessid, force_direct=True, operation=operation, data=data)
return self.batch_send(ServerAdmin, sessid, operation=operation, data=data)
return self.send_data(AdminPortal2Server, sessid, operation=operation, data=data)
# Portal administraton from the Server side
def amp_portal_admin(self, data):
@AdminServer2Portal.responder
def portal_receive_adminserver2portal(self, data):
"""
This allows the server to perform admin
operations on the portal. This is executed on the Portal.
Since AMP has a limit of 65355 bytes per message, it's
possible the data comes in multiple chunks; if so (nparts>1)
we buffer the data and wait for the remaining parts to arrive
before continuing.
Receives and handles admin operations sent to the Portal
This is executed on the Portal.
Args:
data (str): Data to send (often a part of a batch)
data (str): Data received, a pickled tuple (sessid, kwargs).
"""
#print "portaladmin (portal side):", sessid, ord(operation), data
batch = self.batch_recv(data)
for (sessid, kwargs) in batch:
operation = kwargs["operation"]
data = kwargs["data"]
portal_sessionhandler = self.factory.portal.sessions
sessid, kwargs = loads(data)
operation = kwargs["operation"]
data = kwargs["data"]
portal_sessionhandler = self.factory.portal.sessions
if operation == SLOGIN: # server_session_login
# a session has authenticated; sync it.
portal_sessionhandler.server_logged_in(sessid, data)
if operation == SLOGIN: # server_session_login
# a session has authenticated; sync it.
portal_sessionhandler.server_logged_in(sessid, data)
elif operation == SDISCONN: # server_session_disconnect
# the server is ordering to disconnect the session
portal_sessionhandler.server_disconnect(sessid, reason=data)
elif operation == SDISCONN: # server_session_disconnect
# the server is ordering to disconnect the session
portal_sessionhandler.server_disconnect(sessid, reason=data)
elif operation == SDISCONNALL: # server_session_disconnect_all
# server orders all sessions to disconnect
portal_sessionhandler.server_disconnect_all(reason=data)
elif operation == SDISCONNALL: # server_session_disconnect_all
# server orders all sessions to disconnect
portal_sessionhandler.server_disconnect_all(reason=data)
elif operation == SSHUTD: # server_shutdown
# the server orders the portal to shut down
self.factory.portal.shutdown(restart=False)
elif operation == SSHUTD: # server_shutdown
# the server orders the portal to shut down
self.factory.portal.shutdown(restart=False)
elif operation == SSYNC: # server_session_sync
# server wants to save session data to the portal,
# maybe because it's about to shut down.
portal_sessionhandler.server_session_sync(data)
# set a flag in case we are about to shut down soon
self.factory.server_restart_mode = True
elif operation == SSYNC: # server_session_sync
# server wants to save session data to the portal,
# maybe because it's about to shut down.
portal_sessionhandler.server_session_sync(data)
# set a flag in case we are about to shut down soon
self.factory.server_restart_mode = True
elif operation == SCONN: # server_force_connection (for irc/imc2 etc)
portal_sessionhandler.server_connect(**data)
elif operation == SCONN: # server_force_connection (for irc/imc2 etc)
portal_sessionhandler.server_connect(**data)
else:
raise Exception("operation %(op)s not recognized." % {'op': operation})
#return {}
return {"timing":"%f" % time()}
PortalAdmin.responder(amp_portal_admin)
else:
raise Exception("operation %(op)s not recognized." % {'op': operation})
return {}
def call_remote_PortalAdmin(self, sessid, operation="", data=""):
def send_AdminServer2Portal(self, sessid, operation="", data=""):
"""
Administrative access method called by the Server side and executed
onthe Portal.
Administrative access method called by the Server to send an
instruction to the Portal.
Args:
sessid (int): Session id.
@ -718,13 +601,12 @@ class AMPProtocol(amp.AMP):
operation.
"""
if operation == SSYNC:
return self.batch_send(PortalAdmin, sessid, force_direct=True, operation=operation, data=data)
return self.batch_send(PortalAdmin, sessid, operation=operation, data=data)
return self.send_data(AdminServer2Portal, sessid, operation=operation, data=data)
# Extra functions
def amp_function_call(self, module, function, args, **kwargs):
@FunctionCall.responder
def receive_functioncall(self, module, function, args, **kwargs):
"""
This allows Portal- and Server-process to call an arbitrary
function in the other process. It is intended for use by
@ -752,9 +634,8 @@ class AMPProtocol(amp.AMP):
return result
else:
return {'result': dumps(result)}
FunctionCall.responder(amp_function_call)
def call_remote_FunctionCall(self, modulepath, functionname, *args, **kwargs):
def send_FunctionCall(self, modulepath, functionname, *args, **kwargs):
"""
Access method called by either process. This will call an arbitrary
function on the other process (On Portal if calling from Server and