Starting to rework the AMP handler to use batch-sending. This seems like a very promising optimization technique.

This commit is contained in:
Griatch 2015-04-02 00:14:28 +02:00
parent 938b7553e2
commit e63922ff19

View file

@ -25,7 +25,7 @@ try:
except ImportError:
import pickle
from twisted.protocols import amp
from twisted.internet import protocol, reactor
from twisted.internet import protocol, task
from twisted.internet.defer import Deferred
from evennia.utils.utils import to_str, variable_from_module
@ -41,8 +41,13 @@ SSHUTD = chr(7) # server shutdown
SSYNC = chr(8) # server session sync
SCONN = chr(9) # server creating new connection (for irc/imc2 bots etc)
PCONNSYNC = chr(10) # portal post-syncing a session
AMP_MAXLEN = 65535 # max allowed data length in AMP protocol (cannot be changed)
MAXLEN = 65535 # max allowed data length in AMP protocol
BATCH_RATE = 500 # max commands/sec before switching to batch-sending
BATCH_TIMEOUT = 1.0 # how often to poll to empty batch queue, in seconds
# buffers
_SENDBATCH = defaultdict(list)
_MSGBUFFER = defaultdict(list)
def get_restart_mode(restart_file):
@ -140,11 +145,10 @@ class MsgPortal2Server(amp.Command):
Message portal -> server
"""
key = "MsgPortal2Server"
arguments = [('sessid', amp.Integer()),
arguments = [('hashid', amp.Integer()),
('data', amp.String()),
('ipart', amp.Integer()),
('nparts', amp.Integer()),
('msg', amp.String()),
('data', amp.String())]
('nparts', amp.Integer())]
errors = [(Exception, 'EXCEPTION')]
response = []
@ -154,23 +158,6 @@ class MsgServer2Portal(amp.Command):
Message server -> portal
"""
key = "MsgServer2Portal"
arguments = [('sessid', amp.Integer()),
('ipart', amp.Integer()),
('nparts', amp.Integer()),
('msg', amp.String()),
('data', amp.String())]
errors = [(Exception, 'EXCEPTION')]
response = []
class BatchServer2Portal(amp.Command):
"""
Batch data server -> portal
This is used when the amount of outgoing data
is very high, to minimize the number of separate sends.
"""
key = "BatchServer2Portal"
arguments = [('hashid', amp.Integer()),
('data', amp.String()),
('ipart', amp.Integer()),
@ -178,6 +165,7 @@ class BatchServer2Portal(amp.Command):
errors = [(Exception, 'EXCEPTION')]
response = []
class ServerAdmin(amp.Command):
"""
Portal -> Server
@ -187,11 +175,10 @@ class ServerAdmin(amp.Command):
session connects or resyncs
"""
key = "ServerAdmin"
arguments = [('sessid', amp.Integer()),
arguments = [('hashid', amp.Integer()),
('data', amp.String()),
('ipart', amp.Integer()),
('nparts', amp.Integer()),
('operation', amp.String()),
('data', amp.String())]
('nparts', amp.Integer())]
errors = [(Exception, 'EXCEPTION')]
response = []
@ -204,11 +191,10 @@ class PortalAdmin(amp.Command):
operations on the portal.
"""
key = "PortalAdmin"
arguments = [('sessid', amp.Integer()),
arguments = [('hashid', amp.Integer()),
('data', amp.String()),
('ipart', amp.Integer()),
('nparts', amp.Integer()),
('operation', amp.String()),
('data', amp.String())]
('nparts', amp.Integer())]
errors = [(Exception, 'EXCEPTION')]
response = []
@ -218,7 +204,8 @@ class FunctionCall(amp.Command):
Bidirectional
Sent when either process needs to call an
arbitrary function in the other.
arbitrary function in the other. This does
not use the batch-send functionality.
"""
key = "FunctionCall"
arguments = [('module', amp.String()),
@ -234,9 +221,6 @@ class FunctionCall(amp.Command):
dumps = lambda data: to_str(pickle.dumps(to_str(data), pickle.HIGHEST_PROTOCOL))
loads = lambda data: pickle.loads(to_str(data))
# multipart message store
#------------------------------------------------------------
# Core AMP protocol for communication Server <-> Portal
@ -252,15 +236,6 @@ class AMPProtocol(amp.AMP):
subclasses that specify the datatypes of the input/output of these methods.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the protocol
"""
super(AMPProtocol, self).__init__(*args, **kwargs)
self.outbatch = []
self.inbatch = []
self.lastsend = time()
# helper methods
def connectionMade(self):
@ -280,12 +255,17 @@ class AMPProtocol(amp.AMP):
self.factory.portal.sessions.at_server_connection()
if hasattr(self.factory, "server_restart_mode"):
del self.factory.server_restart_mode
# should be set both on portal and server
self.min_batch_step = 1.0 / BATCH_RATE
self.lastsend = time()
self.task = task.LoopingCall(self.batch_send, MsgPortal2Server, None)
self.task.start(BATCH_TIMEOUT)
# Error handling
def errback(self, e, info):
    """
    Error handler, to avoid dropping connections on server tracebacks.

    Args:
        e: the failure object handed to the errback; it must provide
            `trap()` and `getErrorMessage()` (presumably a Twisted
            Failure, since this is attached via `addErrback`).
        info (str): label describing what was being sent when the
            error happened; it is included in the printed message.

    """
    # Trap once only; the returned value was never used, so it is not
    # stored in a local.
    e.trap(Exception)
    # Parenthesized single-argument form prints the same text whether
    # `print` is a statement (Python 2) or a function (Python 3).
    print("AMP Error for %(info)s: %(e)s" % {'info': info,
                                             'e': e.getErrorMessage()})
@ -293,22 +273,26 @@ class AMPProtocol(amp.AMP):
"""
This will batch data together to send fewer, large batches.
"""
global _SENDBATCH
if sessid is not None:
self.outbatch.append((sessid, kwargs))
_SENDBATCH.append((sessid, kwargs))
if not _SENDBATCH:
return
now = time()
timeout = 0.0025
if time() - self.lastsend > timeout:
batch = dumps(self.outbatch)
self.outbatch = []
to_send = [batch[i:i+MAXLEN] for i in range(0, len(batch), MAXLEN)]
if now - self.lastsend > self.min_batch_step:
batch = dumps(_SENDBATCH)
_SENDBATCH = []
to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)]
nparts = len(to_send)
hashid=id(batch)
# tag this batch
hashid = "%s-%s" % (id(batch), now)
if nparts == 1:
deferreds = self.callRemote(command,
hashid=hashid,
data=batch,
ipart=0,
nparts=1).addErrback(self.errback, "BatchServer2Portal")
nparts=1).addErrback(self.errback, command.key)
else:
deferreds = []
for ipart, part in enumerate(to_send):
@ -317,111 +301,37 @@ class AMPProtocol(amp.AMP):
data=part,
ipart=ipart,
nparts=nparts)
deferred.addErrback(self.errback, "BatchServer2Portal-part")
deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, part))
deferreds.append(deferred)
self.lastsend = time()
self.lastsend = time() # don't use now here, keep it as up-to-date as possible
return deferreds
else:
# make sure to tick
reactor.callLater(0.01, self.batch_send, command, None)
def batch_recv(self, hashid, data, ipart, nparts):
    """
    Receive and unpack data sent as a batch.

    This handles both data that was too long for a single send (and
    was therefore split into several parts) and batches of very
    fast-arriving commands.

    Args:
        hashid: tag shared by all parts of the same batch; used as the
            key under which earlier parts are buffered.
        data (str): this part of the serialized batch.
        ipart (int): index of this part, counted from 0.
        nparts (int): total number of parts making up the batch.

    Returns:
        The deserialized batch once the last part has arrived,
        otherwise an empty list (so callers can always iterate over
        the result).

    """
    if nparts == 1:
        # most common case - nothing to reassemble
        return loads(data)
    if ipart < nparts - 1:
        # not yet complete - keep this part until the rest arrives
        _MSGBUFFER[hashid].append(data)
        return []
    # All parts in place. The buffer holds a *list* of string parts, so
    # it has to be joined before the final part is appended; adding the
    # list directly to the string raises TypeError.
    return loads("".join(_MSGBUFFER.pop(hashid)) + data)
def safe_send(self, command, sessid, **kwargs):
    """
    Send a command, splitting it into several sends if any keyword
    value is longer than MAXLEN.

    Every keyword value is checked against MAXLEN and all of them are
    passed on as arguments to the command. The command type must accept
    the keywords `ipart` and `nparts`, which are used to track the parts
    so they can be put back together on the other side.

    Args:
        command: the AMP command class to call remotely; its `key` is
            used to label errors.
        sessid: session id passed through to the command.
        **kwargs: string arguments for the command.

    Returns:
        A single deferred if everything fits in one send, otherwise a
        list of deferreds, one per part.

    """
    # split every value into chunks of at most MAXLEN characters
    to_send = [(key, [string[i:i + MAXLEN]
                      for i in range(0, len(string), MAXLEN)])
               for key, string in kwargs.items()]
    # An empty string produces no chunks at all, so the maximum can be
    # 0 (and there may be no kwargs whatsoever); both cases must still
    # result in one direct send rather than nothing being sent.
    nparts_max = max([len(chunks) for _, chunks in to_send] or [0])
    if nparts_max <= 1:
        # everything fits - send directly
        return self.callRemote(command,
                               sessid=sessid,
                               ipart=0,
                               nparts=1,
                               **kwargs).addErrback(self.errback, command.key)
    # one or more values were too long for MAXLEN
    deferreds = []
    for ipart in range(nparts_max):
        # a value that needed fewer splits contributes "" to later parts
        part_kwargs = dict((key, chunks[ipart] if ipart < len(chunks) else "")
                           for key, chunks in to_send)
        deferreds.append(self.callRemote(
            command,
            sessid=sessid,
            ipart=ipart,
            nparts=nparts_max,
            **part_kwargs).addErrback(self.errback, command.key))
    return deferreds
def safe_recv(self, command, sessid, ipart, nparts, **kwargs):
    """
    Merge data that safe_send() may have split over several sends.
    Nothing is decoded or parsed here.

    Returns None while parts are still outstanding. Once the stream
    is complete, returns a dictionary of the (possibly merged)
    keyword properties.
    """
    if nparts == 1:
        # single-part message, nothing to merge
        return kwargs

    # parts of one message are grouped by command type and session
    bufkey = "%s_%s" % (command.key, sessid)
    if ipart < nparts - 1:
        # more parts to come - store this one and wait
        _MSGBUFFER[bufkey].append(kwargs)
        return None

    # last part arrived - stitch every property back together in order
    parts = _MSGBUFFER.pop(bufkey)
    parts.append(kwargs)
    merged = {}
    for key in kwargs:
        merged[key] = "".join(part[key] for part in parts)
    return merged
# Message definition + helper methods to call/create each message type
# Portal -> Server Msg
def amp_msg_portal2server(self, sessid, ipart, nparts, msg, data):
def amp_msg_portal2server(self, hashid, data, ipart, nparts):
"""
Relays message to server. This method is executed on the Server.
@ -429,13 +339,12 @@ class AMPProtocol(amp.AMP):
data comes in multiple chunks; if so (nparts>1) we buffer the data
and wait for the remaining parts to arrive before continuing.
"""
ret = self.safe_recv(MsgPortal2Server, sessid, ipart, nparts,
text=msg, data=data)
if ret is not None:
batch = self.batch_recv(MsgPortal2Server, hashid, data, ipart, nparts)
for (sessid, kwargs) in batch:
#print "msg portal -> server (server side):", sessid, msg, loads(ret["data"])
self.factory.server.sessions.data_in(sessid,
text=ret["text"],
**loads(ret["data"]))
text=kwargs["msg"],
data=kwargs["data"])
return {}
MsgPortal2Server.responder(amp_msg_portal2server)
@ -444,23 +353,22 @@ class AMPProtocol(amp.AMP):
Access method called by the Portal and executed on the Portal.
"""
#print "msg portal->server (portal side):", sessid, msg, data
return self.safe_send(MsgPortal2Server, sessid,
msg=msg if msg is not None else "",
data=dumps(data))
return self.batch_send(MsgPortal2Server, sessid,
msg=msg if msg is not None else "",
data=data)
# Server -> Portal message
def amp_msg_server2portal(self, sessid, ipart, nparts, msg, data):
def amp_msg_server2portal(self, hashid, data, ipart, nparts):
"""
Relays message to Portal. This method is executed on the Portal.
"""
ret = self.safe_recv(MsgServer2Portal, sessid,
ipart, nparts, text=msg, data=data)
if ret is not None:
batch = self.batch_recv(hashid, data, ipart, nparts)
for (sessid, kwargs) in batch:
#print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"])
self.factory.portal.sessions.data_out(sessid,
text=ret["text"],
**loads(ret["data"]))
text=kwargs["msg"],
data=kwargs["data"])
return {}
MsgServer2Portal.responder(amp_msg_server2portal)
@ -475,31 +383,27 @@ class AMPProtocol(amp.AMP):
text=kwargs["msg"],
**kwargs["data"])
return {}
BatchServer2Portal.responder(amp_batch_server2portal)
MsgServer2Portal.responder(amp_batch_server2portal)
def call_remote_MsgServer2Portal(self, sessid, msg, data=""):
"""
Access method called by the Server and executed on the Server.
"""
#print "msg server->portal (server side):", sessid, msg, data
return self.batch_send(BatchServer2Portal, sessid, msg=msg, data=data)
#return self.safe_send(MsgServer2Portal, sessid,
# msg=msg if msg is not None else "",
# data=dumps(data))
return self.batch_send(MsgServer2Portal, sessid, msg=msg, data=data)
# Server administration from the Portal side
def amp_server_admin(self, sessid, ipart, nparts, operation, data):
def amp_server_admin(self, hashid, data, ipart, nparts):
"""
This allows the portal to perform admin
operations on the server. This is executed on the Server.
"""
ret = self.safe_recv(ServerAdmin, sessid, ipart, nparts,
operation=operation, data=data)
batch = self.batch_recv(hashid, data, ipart, nparts)
if ret is not None:
data = loads(ret["data"])
operation = ret["operation"]
for (sessid, kwargs) in batch:
operation = kwargs["operation"]
data = kwargs["data"]
server_sessionhandler = self.factory.server.sessions
#print "serveradmin (server side):", sessid, ord(operation), data
@ -532,21 +436,20 @@ class AMPProtocol(amp.AMP):
Access method called by the Portal and Executed on the Portal.
"""
#print "serveradmin (portal side):", sessid, ord(operation), data
data = dumps(data)
return self.safe_send(ServerAdmin, sessid, operation=operation, data=data)
return self.batch_send(ServerAdmin, sessid, operation=operation, data=data)
# Portal administration from the Server side
def amp_portal_admin(self, sessid, ipart, nparts, operation, data):
def amp_portal_admin(self, hashid, data, ipart, nparts):
"""
This allows the server to perform admin
operations on the portal. This is executed on the Portal.
"""
#print "portaladmin (portal side):", sessid, ord(operation), data
ret = self.safe_recv(PortalAdmin, sessid, ipart, nparts,
operation=operation, data=data)
if ret is not None:
data = loads(data)
batch = self.batch_recv(hashid, data, ipart, nparts)
for (sessid, kwargs) in batch:
operation = kwargs["operation"]
data = kwargs["data"]
portal_sessionhandler = self.factory.portal.sessions
if operation == SLOGIN: # server_session_login
@ -584,7 +487,7 @@ class AMPProtocol(amp.AMP):
"""
Access method called by the server side.
"""
return self.safe_send(PortalAdmin, sessid, operation=operation, data=dumps(data))
return self.batch_send(PortalAdmin, sessid, operation=operation, data=dumps(data))
# Extra functions