task handler unit test revamp & bugfix

revamped task handler unit tests

found bug when a False persistent kwarg is passed to the add method. Resolved it.

All evennia unit tests pass. Default run level and run level 2.
This commit is contained in:
davewiththenicehat 2021-04-26 08:59:35 -04:00
parent 84193dd9a7
commit 99568148c6
2 changed files with 197 additions and 146 deletions

View file

@ -67,7 +67,7 @@ class TaskHandlerTask:
d.pause()
def unpause(self):
"""Process all callbacks made since pause() was called."""
"""Unpause a task, run the task if it has passed delay time."""
d = self.deferred
if d:
d.unpause()
@ -328,8 +328,9 @@ class TaskHandler(object):
# record the task to the tasks dictionary
persistent = kwargs.get("persistent", False)
if persistent:
if "persistent" in kwargs:
del kwargs["persistent"]
if persistent:
safe_args = []
safe_kwargs = {}
@ -358,10 +359,10 @@ class TaskHandler(object):
else:
safe_kwargs[key] = value
self.tasks[task_id] = (comp_time, callback, safe_args, safe_kwargs, True, None)
self.tasks[task_id] = (comp_time, callback, safe_args, safe_kwargs, persistent, None)
self.save()
else: # this is a non-persitent task
self.tasks[task_id] = (comp_time, callback, args, kwargs, True, None)
self.tasks[task_id] = (comp_time, callback, args, kwargs, persistent, None)
# defer the task
callback = self.do_task

View file

@ -323,163 +323,213 @@ class TestDelay(EvenniaTest):
Test utils.delay.
"""
def test_delay(self):
def setUp(self):
super().setUp()
# get a reference of TASK_HANDLER
timedelay = 5
self.timedelay = 5
global _TASK_HANDLER
if _TASK_HANDLER is None:
from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER
_TASK_HANDLER.clock = task.Clock()
self.char1.ndb.dummy_var = False
# test a persistent deferral, that completes after delay time
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
# call the task early to test Task.call and TaskHandler.call_task
result = t.call()
self.assertTrue(result)
del result
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.assertTrue(_TASK_HANDLER.active(t.get_id())) # test Task.get_id
self.assertTrue(t.active())
self.char1.ndb.dummy_var = False # Set variable to continue completion after delay time test.
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertTrue(t.called) # test Task.called property
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.char1.ndb.dummy_var = False
# test a persistent deferral, that completes on a manual call
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
self.assertTrue(t.active())
result = t.do_task()
self.assertTrue(result)
self.assertFalse(t.exists())
_TASK_HANDLER.clock.advance(timedelay) # make time pass, important keep
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.char1.ndb.dummy_var = False
# test a non persisten deferral, that completes after delay time.
def tearDown(self):
super().tearDown()
_TASK_HANDLER.clear()
def test_call_early(self):
# call a task early with call
for pers in (True, False):
t = utils.delay(self.timedelay, dummy_func, self.char1.dbref, persistent=pers)
result = t.call()
self.assertTrue(result)
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.assertTrue(t.exists())
self.assertTrue(t.active())
self.char1.ndb.dummy_var = False
def test_do_task(self):
# call the task early with do_task
for pers in (True, False):
t = utils.delay(self.timedelay, dummy_func, self.char1.dbref, persistent=pers)
# call the task early to test Task.call and TaskHandler.call_task
result = t.do_task()
self.assertTrue(result)
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.assertFalse(t.exists())
self.char1.ndb.dummy_var = False
def test_deferred_call(self):
# wait for deferred to call
timedelay = self.timedelay
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
self.assertTrue(t.active())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.assertFalse(t.exists())
self.char1.ndb.dummy_var = False
def test_short_deferred_call(self):
# wait for deferred to call with a very short time
timedelay = .1
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
self.assertTrue(t.active())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.assertFalse(t.exists())
self.char1.ndb.dummy_var = False
def test_active(self):
timedelay = self.timedelay
t = utils.delay(timedelay, dummy_func, self.char1.dbref)
self.assertTrue(_TASK_HANDLER.active(t.get_id()))
self.assertTrue(t.active())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.char1.ndb.dummy_var = False
# test a non-persistent deferral, that completes on a manual call
t = utils.delay(timedelay, dummy_func, self.char1.dbref)
self.assertTrue(t.active())
result = t.do_task()
self.assertTrue(result)
self.assertFalse(t.exists())
_TASK_HANDLER.clock.advance(timedelay) # make time pass, important keep
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.char1.ndb.dummy_var = False
# test a non persisten deferral, with a short timedelay
t = utils.delay(.1, dummy_func, self.char1.dbref)
self.assertTrue(t.active())
_TASK_HANDLER.clock.advance(.1) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.char1.ndb.dummy_var = False
# test canceling a deferral.
# after this the task_id 1 remains used by this canceled but unused task
t = utils.delay(timedelay, dummy_func, self.char1.dbref)
self.assertTrue(t.active())
success = t.cancel()
self.assertFalse(_TASK_HANDLER.active(t.get_id()))
self.assertFalse(t.active())
self.assertTrue(success)
self.assertTrue(t.exists())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
self.char1.ndb.dummy_var = False
# test removing an active task
def test_called(self):
timedelay = self.timedelay
t = utils.delay(timedelay, dummy_func, self.char1.dbref)
self.assertTrue(t.active())
success = t.remove()
self.assertFalse(t.active())
self.assertFalse(t.called)
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
self.assertFalse(t.exists())
self.char1.ndb.dummy_var = False
# test removing a canceled task
t = utils.delay(timedelay, dummy_func, self.char1.dbref)
self.assertTrue(t.active())
deferal_inst = t.get_deferred()
deferal_inst.cancel()
self.assertFalse(t.active())
success = t.remove()
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
self.assertFalse(t.exists())
self.char1.ndb.dummy_var = False
# test pause, paused and unpause
t = utils.delay(timedelay, dummy_func, self.char1.dbref)
self.assertTrue(t.active())
t.pause()
self.assertTrue(t.paused)
t.unpause()
self.assertFalse(t.paused)
self.assertEqual(self.char1.ndb.dummy_var, False)
t.pause()
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
t.unpause()
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.char1.ndb.dummy_var = False
# test automated removal of stale tasks.
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
t.cancel()
self.assertFalse(t.active())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertTrue(t.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t.get_id() in _TASK_HANDLER.tasks)
self.assertTrue(t.called)
def test_cancel(self):
timedelay = self.timedelay
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
self.assertTrue(t.active())
success = t.cancel()
self.assertFalse(t.active())
self.assertTrue(success)
self.assertTrue(t.exists())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
def test_remove(self):
timedelay = self.timedelay
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
self.assertTrue(t.active())
success = t.remove()
self.assertTrue(success)
self.assertFalse(t.active())
self.assertFalse(t.exists())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
def test_remove_canceled(self):
# remove a canceled task
timedelay = self.timedelay
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
self.assertTrue(t.active())
success = t.cancel()
self.assertTrue(success)
self.assertTrue(t.exists())
self.assertFalse(t.active())
success = t.remove()
self.assertTrue(success)
self.assertFalse(t.exists())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
def test_pause_unpause(self):
# remove a canceled task
timedelay = self.timedelay
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
self.assertTrue(t.active())
t.pause()
self.assertTrue(t.paused)
t.unpause()
self.assertFalse(t.paused)
self.assertEqual(self.char1.ndb.dummy_var, False)
t.pause()
_TASK_HANDLER.clock.advance(timedelay) # make time pass
self.assertEqual(self.char1.ndb.dummy_var, False)
t.unpause()
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
self.char1.ndb.dummy_var = False
def test_auto_stale_task_removal(self):
# automated removal of stale tasks.
timedelay = self.timedelay
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
t.cancel()
self.assertFalse(t.active())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
if pers:
self.assertTrue(t.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t.get_id() in _TASK_HANDLER.tasks)
# Make task handler's now time, after the stale timeout
_TASK_HANDLER._now = datetime.now() + timedelta(seconds=_TASK_HANDLER.stale_timeout + timedelay + 1)
# add a task to test automatic removal
_TASK_HANDLER._now = datetime.now() + timedelta(seconds=_TASK_HANDLER.stale_timeout + 6) # task handler time to 6 seconds after stale timeout
t2 = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
self.assertFalse(t.get_id() in _TASK_HANDLER.to_save)
self.assertFalse(t.get_id() in _TASK_HANDLER.tasks)
self.assertEqual(self.char1.ndb.dummy_var, False)
# test manual cleanup
t2.cancel()
_TASK_HANDLER.clock.advance(timedelay) # advance twisted reactor time past callback time
_TASK_HANDLER._now = datetime.now() + timedelta(seconds=30) # set TaskHandler's time to 30 seconnds from now
# test before stale_timeout time
_TASK_HANDLER.clean_stale_tasks() # cleanup of stale tasks in in the save method
# still in the task handler because stale timeout has not been reached
self.assertTrue(t2.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t2.get_id() in _TASK_HANDLER.tasks)
# advance past stale timeout
_TASK_HANDLER._now = datetime.now() + timedelta(seconds=_TASK_HANDLER.stale_timeout + 6) # task handler time to 6 seconds after stale timeout
_TASK_HANDLER.clean_stale_tasks() # cleanup of stale tasks in in the save method
self.assertFalse(t2.get_id() in _TASK_HANDLER.to_save)
self.assertFalse(t2.get_id() in _TASK_HANDLER.tasks)
self.char1.ndb.dummy_var = False
_TASK_HANDLER._now = False
# if _TASK_HANDLER.stale_timeout is 0 or less, automatic cleanup should not run
t2 = utils.delay(timedelay, dummy_func, self.char1.dbref)
if pers:
self.assertFalse(t.get_id() in _TASK_HANDLER.to_save)
self.assertFalse(t.get_id() in _TASK_HANDLER.tasks)
self.assertEqual(self.char1.ndb.dummy_var, False)
_TASK_HANDLER.clear()
def test_manual_stale_task_removal(self):
# manual removal of stale tasks.
timedelay = self.timedelay
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
t.cancel()
self.assertFalse(t.active())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
if pers:
self.assertTrue(t.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t.get_id() in _TASK_HANDLER.tasks)
# Make task handler's now time, after the stale timeout
_TASK_HANDLER._now = datetime.now() + timedelta(seconds=_TASK_HANDLER.stale_timeout + timedelay + 1)
_TASK_HANDLER.clean_stale_tasks() # cleanup of stale tasks in in the save method
if pers:
self.assertFalse(t.get_id() in _TASK_HANDLER.to_save)
self.assertFalse(t.get_id() in _TASK_HANDLER.tasks)
self.assertEqual(self.char1.ndb.dummy_var, False)
_TASK_HANDLER.clear()
def test_disable_stale_removal(self):
# manual removal of stale tasks.
timedelay = self.timedelay
_TASK_HANDLER.stale_timeout = 0
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
t.cancel()
self.assertFalse(t.active())
_TASK_HANDLER.clock.advance(timedelay) # advance twisted's reactor past callback time
self.assertTrue(t.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t.get_id() in _TASK_HANDLER.tasks)
# add a task to test automatic removal
_TASK_HANDLER._now = datetime.now() + timedelta(seconds=_TASK_HANDLER.stale_timeout + 6) # task handler time to 6 seconds after stale timeout
t2 = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
self.assertTrue(t.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t.get_id() in _TASK_HANDLER.tasks)
self.assertEqual(self.char1.ndb.dummy_var, False)
_TASK_HANDLER.clear()
self.char1.ndb.dummy_var = False
_TASK_HANDLER._now = False
# replicate a restart
_TASK_HANDLER.clear()
_TASK_HANDLER.save()
self.assertFalse(_TASK_HANDLER.tasks)
self.assertFalse(_TASK_HANDLER.to_save)
# create a persistent task.
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
_TASK_HANDLER.save()
_TASK_HANDLER.clear(False) # remove all tasks, do not save this change.
for pers in (False, True):
t = utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=pers)
t.cancel()
self.assertFalse(t.active())
_TASK_HANDLER.clock.advance(timedelay) # make time pass
if pers:
self.assertTrue(t.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t.get_id() in _TASK_HANDLER.tasks)
# Make task handler's now time, after the stale timeout
_TASK_HANDLER._now = datetime.now() + timedelta(seconds=_TASK_HANDLER.stale_timeout + timedelay + 1)
t2 = utils.delay(timedelay, dummy_func, self.char1.dbref)
if pers:
self.assertTrue(t.get_id() in _TASK_HANDLER.to_save)
self.assertTrue(t.get_id() in _TASK_HANDLER.tasks)
self.assertEqual(self.char1.ndb.dummy_var, False)
# manual removal should still work
_TASK_HANDLER.clean_stale_tasks() # cleanup of stale tasks in in the save method
if pers:
self.assertFalse(t.get_id() in _TASK_HANDLER.to_save)
self.assertFalse(t.get_id() in _TASK_HANDLER.tasks)
_TASK_HANDLER.clear()
def test_server_restart(self):
# emulate a server restart
timedelay = self.timedelay
utils.delay(timedelay, dummy_func, self.char1.dbref, persistent=True)
_TASK_HANDLER.clear(False) # remove all tasks from task handler, do not save this change.
_TASK_HANDLER.clock.advance(timedelay) # advance twisted reactor time past callback time
self.assertEqual(self.char1.ndb.dummy_var, False) # task has not run
_TASK_HANDLER.load()
_TASK_HANDLER.create_delays()
_TASK_HANDLER.load() # load persistent tasks from database.
_TASK_HANDLER.create_delays() # create new deffered instances from persistent tasks
_TASK_HANDLER.clock.advance(timedelay) # Clock must advance to trigger, even if past timedelay
self.assertEqual(self.char1.ndb.dummy_var, 'dummy_func ran')
_TASK_HANDLER.clear()
self.char1.ndb.dummy_var = False