CmdTasks Created

Created a system command for displaying and manipulating tasks.

A single unit test in evennia.commands.default.tests.TestHelp fails. Message pattern mismatch.
All other unit tests pass.
This commit is contained in:
davewiththenicehat 2021-05-28 15:46:37 -04:00
parent 407a5642c4
commit fc6eb75a3c
3 changed files with 287 additions and 1 deletions

View file

@ -50,6 +50,7 @@ class CharacterCmdSet(CmdSet):
self.add(system.CmdServerLoad())
# self.add(system.CmdPs())
self.add(system.CmdTickers())
self.add(system.CmdTasks())
# Admin commands
self.add(admin.CmdBoot())

View file

@ -8,7 +8,6 @@ System commands
import code
import traceback
import os
import io
import datetime
import sys
import django
@ -26,8 +25,10 @@ from evennia.utils.eveditor import EvEditor
from evennia.utils.evtable import EvTable
from evennia.utils.evmore import EvMore
from evennia.utils.utils import crop, class_from_module
from evennia.scripts.taskhandler import TaskHandlerTask
COMMAND_DEFAULT_CLASS = class_from_module(settings.COMMAND_DEFAULT_CLASS)
_TASK_HANDLER = None
# delayed imports
_RESOURCE = None
@ -45,6 +46,7 @@ __all__ = (
"CmdAbout",
"CmdTime",
"CmdServerLoad",
"CmdTasks",
)
@ -1203,3 +1205,183 @@ class CmdTickers(COMMAND_DEFAULT_CLASS):
"*" if sub[5] else "-",
)
self.caller.msg("|wActive tickers|n:\n" + str(table))
class CmdTasks(COMMAND_DEFAULT_CLASS):
"""
Manage active tasks (delays).
Note:
Only the action requested on the first switch will be processed. All other switches will
be ignored.
Manipulation of a single task is intended to be done via the clickable links or through
code directly. Due to generally short life of a task, the inclusion of completion date
and function's memory reference guarentees an incorrect task will not be manipulated.
By default, tasks that are canceled and never called are automatically removed after
one minute.
Usage:
tasks
show all current active taks.
tasks[/switch] [function name]
Process the action in the switch to all tasks that are deferring a specific function name.
This would match the name of the callback you passed to a delay or the task handler.
tasks[/switch] [task id], [completion date], [function memory reference]
Process the action in the switch to a specific task.
Example:
tasks/cancel move_callback
Cancel all movement delays from the slow_exit contrib.
In this example slow exits creates it's tasks with: utils.delay(move_delay, move_callback)
Switches:
pause - Pause the callback of a task.
unpause - Process all callbacks made since pause() was called.
do_task - Execute the task (call its callback).
call - Call the callback of this task.
remove - Remove a task without executing it.
cancel - Stop a task from automatically executing.
"""
key = "tasks"
aliases = ["delays"]
switch_options = ("pause", "unpause", "do_task", "call", "remove", "cancel")
locks = "perm(Developer)"
help_category = "System"
@staticmethod
def coll_date_func(task):
"""Replace regex characters in date string and collect deferred function name."""
t_comp_date = str(task[0]).replace('-', '/').replace('.', ' ms:')
t_func_name = str(task[1]).split(' ')
t_func_mem_ref = t_func_name[3] if len(t_func_name) >= 4 else None
return t_comp_date, t_func_mem_ref
def func(self):
caller = self.caller
# get a reference of the global task handler
global _TASK_HANDLER
if _TASK_HANDLER is None:
from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER
# handle no tasks active.
if not _TASK_HANDLER.tasks:
self.msg('There are no active tasks.')
if self.switches or self.args:
self.msg('Likely the task has completed and been removed.')
return
# handle caller's request to manipulate a task(s)
if self.switches or self.lhs:
action_request = self.switches[0]
# handle caller requesting an action on specific deferred function
if len(self.lhslist) == 1:
name_match_found = False
arg_func_name = self.lhslist[0].lower()
# repack tasks into a new dictionary
current_tasks = {}
for task_id, task_args in _TASK_HANDLER.tasks.items():
current_tasks.update({task_id: task_args})
# call requested action on all tasks with the function name
for task_id, task_args in current_tasks.items():
t_func_name = str(task_args[1]).split(' ')
t_func_name = t_func_name[1] if len(t_func_name) >= 2 else None
# skip this task if it is not for the function desired
if arg_func_name != t_func_name:
continue
name_match_found = True
task = TaskHandlerTask(task_id)
switch_action = getattr(task, action_request, False)
if switch_action:
action_return = switch_action()
self.msg(f'Task action {action_request} completed on task ID {task_id}.')
self.msg(f'The task function {action_request} returned: {action_return}')
# provide a message if not tasks of the function name was found
if not name_match_found:
self.msg(f'No tasks deferring function name {arg_func_name} found.')
return
return True
err_arg_msg = 'Task ID, completion date and memory reference are required when manipulating a delay.'
task_comp_msg = 'Task completed while processing request.'
# handle missing arguments or switches
if not self.switches and self.lhs:
self.msg(err_arg_msg)
return
# handle incorrect arguments
if len(self.lhslist) < 2:
self.msg(err_arg_msg)
return
# create a handle for the task
task_id = int(self.lhslist[0])
task = TaskHandlerTask(task_id)
# handle task no longer existing
if not task.exists():
self.msg(f'Task {task_id} does not exist.')
return
# get a reference of the function caller requested
switch_action = getattr(task, action_request, False)
if not switch_action:
self.msg(f'{self.switches[0]}, is not an acceptable task action or ' \
f'{task_comp_msg.lower()}')
# verify manipulating the correct task
if task_id in _TASK_HANDLER.tasks:
task_args = _TASK_HANDLER.tasks.get(task_id, False)
if task_args: # check if the task is still active
sw_comp_date = self.lhslist[1]
sw_func_mem_ref = self.lhslist[2]
t_comp_date, t_func_mem_ref = self.coll_date_func(task_args)
# handle completion date mismatch
if not t_comp_date == sw_comp_date:
self.msg('Task completion time does not match time passed.')
self.msg(task_comp_msg)
self.msg('Likely a new task with the same ID was created')
return
# handle function memory reference mismatch
if not t_func_mem_ref == sw_func_mem_ref:
self.msg("Memory reference for the task's function does not match argument")
self.msg(task_comp_msg)
self.msg('Likely a new task with the same ID was created')
return
else: # task no longer exists
self.msg(task_comp_msg)
return
if task.exists(): # make certain the task has not been called yet.
# call the task's method
action_return = switch_action()
self.msg(f'Task action {action_request} completed.')
self.msg(f'The task function {action_request} returned: {action_return}')
return True
else:
self.msg(task_comp_msg)
return
# No task manupilation requested, build a table of tasks and display it
# get the width of screen in characters
def_screen_width = settings.CLIENT_DEFAULT_WIDTH if settings.CLIENT_DEFAULT_WIDTH else 80
caller_session = caller.sessions.get()
width = caller_session[0].protocol_flags.get("SCREENWIDTH", def_screen_width)
# create table header and list to hold tasks data and actions
tasks_header = ('Task ID', 'Completion Date', 'Function', 'Arguments', 'KWARGS', 'persistent')
tasks_list = [list() for i in range(len(tasks_header))] # empty list of lists, the size of the header
for task_id, task in _TASK_HANDLER.tasks.items():
# collect data from the task
t_comp_date, t_func_mem_ref = self.coll_date_func(task)
t_func_name = str(task[1]).split(' ')
t_func_name = t_func_name[1] if len(t_func_name) >= 2 else None
t_args = str(task[2])
t_kwargs = str(task[3])
t_pers = str(task[4])
# add task data to the tasks list
task_data = (task_id, t_comp_date, t_func_name, t_args, t_kwargs, t_pers)
for i in range(len(tasks_header)):
tasks_list[i].append(task_data[i])
# add task actions to the tasks list
actions = ('pause', 'unpause', 'do_task', 'remove', 'call', 'cancel')
for i in range(len(tasks_header)):
tasks_list[i].append(f"|lc{self.key}/{actions[i]} {task_id}, {t_comp_date}, " \
f"{t_func_mem_ref}|lt{actions[i]}|le")
# if the screen width is large enough, add directional arrows
if width >= 75:
tasks_list[i][-1] = f"^{tasks_list[i][-1]}^"
# create and display the table
tasks_table = EvTable(*tasks_header, table=tasks_list, maxwidth=width, border='cells', align='center')
self.msg(tasks_table)

View file

@ -18,6 +18,7 @@ from anything import Anything
from parameterized import parameterized
from django.conf import settings
from twisted.internet import task
from unittest.mock import patch, Mock, MagicMock
from evennia import DefaultRoom, DefaultExit, ObjectDB
@ -574,6 +575,108 @@ class TestSystem(CommandTest):
def test_server_load(self):
self.call(system.CmdServerLoad(), "", "Server CPU and Memory load:")
_TASK_HANDLER = None
def func_test_cmd_tasks():
return 'success'
class TestCmdTasks(CommandTest):
def setUp(self):
super().setUp()
# get a reference of TASK_HANDLER
self.timedelay = 5
global _TASK_HANDLER
if _TASK_HANDLER is None:
from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER
_TASK_HANDLER.clock = task.Clock()
self.task_handler = _TASK_HANDLER
self.task_handler.clear()
self.task = self.task_handler.add(self.timedelay, func_test_cmd_tasks)
task_args = self.task_handler.tasks.get(self.task.get_id(), False)
self.t_comp_date, self.t_func_mem_ref = system.CmdTasks.coll_date_func(task_args)
def tearDown(self):
super().tearDown()
self.task_handler.clear()
def test_no_tasks(self):
self.task_handler.clear()
self.call(system.CmdTasks(), '', 'There are no active tasks.')
def test_active_task(self):
cmd_result = self.call(system.CmdTasks(), '')
for ptrn in ('Task ID', 'Completion Date', 'Function', 'KWARGS', 'persisten',
'1', r'\d+/\d+/\d+', r'\d+\:\d+\:\d+', r'ms\:\d+', 'func_test', '{}',
'False'):
self.assertRegex(cmd_result, ptrn)
def test_persistent_task(self):
self.task_handler.clear()
self.task_handler.add(self.timedelay, func_test_cmd_tasks, persistent=True)
cmd_result = self.call(system.CmdTasks(), '')
self.assertRegex(cmd_result, 'True')
def test_pause_unpause(self):
# test pause
args = f'/pause {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}'
wanted_msg = 'Task action pause completed.|The task function pause returned:'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertTrue(self.task.paused)
self.task_handler.clock.advance(self.timedelay + 1)
# test unpause
args = f'/unpause {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}'
self.assertTrue(self.task.exists())
wanted_msg = 'Task action unpause completed.|The task function unpause returned: None'
self.call(system.CmdTasks(), args, wanted_msg)
# verify task continues after unpause
self.task_handler.clock.advance(1)
self.assertFalse(self.task.exists())
def test_do_task(self):
args = f'/do_task {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}'
wanted_msg = 'Task action do_task completed.|The task function do_task returned: success'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertFalse(self.task.exists())
def test_remove(self):
args = f'/remove {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}'
wanted_msg = 'Task action remove completed.|The task function remove returned: True'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertFalse(self.task.exists())
def test_call(self):
args = f'/call {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}'
wanted_msg = 'Task action call completed.|The task function call returned: success'
self.call(system.CmdTasks(), args, wanted_msg)
# make certain the task is still active
self.assertTrue(self.task.active())
# go past delay time, the task should call do_task and remove itself after calling.
self.task_handler.clock.advance(self.timedelay + 1)
self.assertFalse(self.task.exists())
def test_cancel(self):
args = f'/cancel {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}'
wanted_msg = 'Task action cancel completed.|The task function cancel returned: True'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertTrue(self.task.exists())
self.assertFalse(self.task.active())
def test_func_name_manipulation(self):
self.task_handler.add(self.timedelay, func_test_cmd_tasks) # add an extra task
args = f'/remove func_test_cmd_tasks'
wanted_msg = 'Task action remove completed on task ID 1.|The task function remove returned: True|' \
'Task action remove completed on task ID 2.|The task function remove returned: True'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertFalse(self.task_handler.tasks) # no tasks should exist.
def test_wrong_func_name(self):
args = f'/remove intentional_fail'
wanted_msg = 'No tasks deferring function name intentional_fail found.'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertTrue(self.task.active())
class TestAdmin(CommandTest):
def test_emit(self):