Dev: Testing with asyncronous db saving.

This commit is contained in:
Griatch 2011-11-03 14:08:14 +01:00
parent 1995f61d46
commit 17951a05f5
2 changed files with 63 additions and 3 deletions

View file

@ -152,6 +152,7 @@ DATABASE_PASSWORD = ''
DATABASE_HOST = ''
DATABASE_PORT = ''
DATABASE_NONBLOCKING_SAVE = True
###################################################
# Evennia in-game parsers

View file

@ -1,10 +1,66 @@
"""
Idmapper base functionality. Most of this is unchanged from the idmapper distribution.
Extended for Evennia:
- made object cache a dictionary rather than a WeakValueDictionary. The latter does not work
well for long-time persistence in memory and caused very hard-to-track bugs in Evennia's
typeclass system (which depends on cached memory adresses not changing or going out of scope).
- Added optional asynchronous save operation for use with Twisted.
"""
from weakref import WeakValueDictionary, ref
from django.db.models.base import Model, ModelBase
from manager import SharedMemoryManager
TCACHE = {} # test cache, for debugging /Griatch
#
# Evennia extension: Asynchronous save functionality. This
# is used by SharedMemoryBase.save().
#
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import cooperate
from twisted.internet.threads import deferToThread, blockingCallFromThread
from django.conf import settings
from src.utils import logger
from twisted.internet import reactor
ASYNC_DB_SAVE = settings.DATABASE_NONBLOCKING_SAVE
# This special form of queue has a get() function that returns deferreds.
# Add save function tuples to this in order to add to save queue.
ASYNC_QUEUE = DeferredQueue(backlog=1)
def async_callback(funcdef):
"This callback is run with the item returned from queue - a tuple (func, args, kwargs)"
#d = deferToThread(funcdef[0], *funcdef[1], **funcdef[2])
d = reactor.callFromThread(funcdef[0], *funcdef[1], **funcdef[2])
#d = reactor.blockingCallFromThread(funcdef[0], *funcdef[1], **funcdef[2])
return d
def async_errback(failure):
"errback"
logger.log_errmsg(str(failure))
def async_worker(queue):
"""
This will eternally yield items from queue as soon as they are
available (since DeferredQueue.get() only returns when there is
actually something in the queue). The queue automatically inserts
the stored value into the callback async_exec.
"""
while True:
yield queue.get().addCallbacks(async_callback, async_errback)
# the cooperate handler will iterate over async_worker every time its
# returned deferred fires.
cooperate(async_worker(ASYNC_QUEUE))
# in models's save(*args, **kwargs):
# ASYNC_QUEUE.put((super(SharedMemoryModel, self).save, args, kwargs))
# -------------------------------------------------------
class SharedMemoryModelBase(ModelBase):
#def __new__(cls, name, bases, attrs):
@ -120,13 +176,16 @@ class SharedMemoryModel(Model):
"""
cls._flush_cached_by_key(instance._get_pk_val())
#key = "%s-%s" % (cls, instance.pk)
#del TCACHE[key]
#print "uncached: %s (%s: %s) (total cached: %s)" % (instance, cls.__name__, len(cls.__instance_cache__), len(TCACHE))
flush_cached_instance = classmethod(flush_cached_instance)
def save(self, *args, **kwargs):
super(SharedMemoryModel, self).save(*args, **kwargs)
if ASYNC_DB_SAVE:
logger.log_infomsg("Adding to queue %s" % self)
ASYNC_QUEUE.put((super(SharedMemoryModel, self).save, args, kwargs))
else:
super(SharedMemoryModel, self).save(*args, **kwargs)
self.__class__.cache_instance(self)
# TODO: This needs moved to the prepare stage (I believe?)