Merge branch 'CmdDelays' of https://github.com/davewiththenicehat/evennia into davewiththenicehat-CmdDelays

This commit is contained in:
Griatch 2021-06-19 12:42:11 +02:00
commit 11bfd96cdd
3 changed files with 397 additions and 1 deletions

View file

@ -50,6 +50,7 @@ class CharacterCmdSet(CmdSet):
self.add(system.CmdServerLoad())
# self.add(system.CmdPs())
self.add(system.CmdTickers())
self.add(system.CmdTasks())
# Admin commands
self.add(admin.CmdBoot())

View file

@ -8,7 +8,6 @@ System commands
import code
import traceback
import os
import io
import datetime
import sys
import django
@ -25,9 +24,12 @@ from evennia.utils import logger, utils, gametime, create, search
from evennia.utils.eveditor import EvEditor
from evennia.utils.evtable import EvTable
from evennia.utils.evmore import EvMore
from evennia.utils.evmenu import ask_yes_no
from evennia.utils.utils import crop, class_from_module
from evennia.scripts.taskhandler import TaskHandlerTask
COMMAND_DEFAULT_CLASS = class_from_module(settings.COMMAND_DEFAULT_CLASS)
_TASK_HANDLER = None
# delayed imports
_RESOURCE = None
@ -45,6 +47,7 @@ __all__ = (
"CmdAbout",
"CmdTime",
"CmdServerLoad",
"CmdTasks",
)
@ -1203,3 +1206,230 @@ class CmdTickers(COMMAND_DEFAULT_CLASS):
"*" if sub[5] else "-",
)
self.caller.msg("|wActive tickers|n:\n" + str(table))
class CmdTasks(COMMAND_DEFAULT_CLASS):
"""
Display or terminate active tasks (delays).
Usage:
tasks
show all current active taks.
tasks[/switch] [function name]
Process the action in the switch to all tasks that are deferring a specific function name.
This would match the name of the callback you passed to a delay or the task handler.
tasks[/switch] [task id]
Process the action in the switch to a specific task.
Switches:
pause - Pause the callback of a task.
unpause - Process all callbacks made since pause() was called.
do_task - Execute the task (call its callback).
call - Call the callback of this task.
remove - Remove a task without executing it.
cancel - Stop a task from automatically executing.
Notes:
A task is a single use method of delaying the call of a function.
Tasks can be created using utils.delay.
Reference: https://evennia.readthedocs.io/en/latest/Command-Duration.html
Only the action requested on the first switch will be processed. All other switches will
be ignored.
By default, tasks that are canceled and never called are automatically removed after
one minute.
Example:
tasks/cancel move_callback
Cancel all movement delays from the slow_exit contrib.
In this example slow exits creates it's tasks with:
utils.delay(move_delay, move_callback)
tasks/cancel 2
Cancel task id 2.
"""
key = "tasks"
aliases = ["delays"]
switch_options = ("pause", "unpause", "do_task", "call", "remove", "cancel")
locks = "perm(Developer)"
help_category = "System"
@staticmethod
def coll_date_func(task):
"""Replace regex characters in date string and collect deferred function name."""
t_comp_date = str(task[0]).replace('-', '/').replace('.', ' ms:')
t_func_name = str(task[1]).split(' ')
t_func_mem_ref = t_func_name[3] if len(t_func_name) >= 4 else None
return t_comp_date, t_func_mem_ref
def do_task_action(self, *args, **kwargs):
"""
Process the action of a tasks command.
This exists to gain support with yes or no function from EvMenu.
"""
task_id = self.task_id
# get a reference of the global task handler
global _TASK_HANDLER
if _TASK_HANDLER is None:
from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER
# verify manipulating the correct task
task_args = _TASK_HANDLER.tasks.get(task_id, False)
if not task_args: # check if the task is still active
self.msg('Task completed while waiting for input.')
return
else:
# make certain a task with matching IDs has not been created
t_comp_date, t_func_mem_ref = self.coll_date_func(task_args)
if self.t_comp_date != t_comp_date or self.t_func_mem_ref != t_func_mem_ref:
self.msg('Task completed while waiting for input.')
return
# Do the action requested by command caller
action_return = self.task_action()
self.msg(f'{self.action_request} request completed.')
self.msg(f'The task function {self.action_request} returned: {action_return}')
def func(self):
# get a reference of the global task handler
global _TASK_HANDLER
if _TASK_HANDLER is None:
from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER
# handle no tasks active.
if not _TASK_HANDLER.tasks:
self.msg('There are no active tasks.')
if self.switches or self.args:
self.msg('Likely the task has completed and been removed.')
return
# handle caller's request to manipulate a task(s)
if self.switches and self.lhs:
# find if the argument is a task id or function name
action_request = self.switches[0]
try:
arg_is_id = int(self.lhslist[0])
except ValueError:
arg_is_id = False
# if the argument is a task id, proccess the action on a single task
if arg_is_id:
err_arg_msg = 'Switch and task ID are required when manipulating a task.'
task_comp_msg = 'Task completed while processing request.'
# handle missing arguments or switches
if not self.switches and self.lhs:
self.msg(err_arg_msg)
return
# create a handle for the task
task_id = arg_is_id
task = TaskHandlerTask(task_id)
# handle task no longer existing
if not task.exists():
self.msg(f'Task {task_id} does not exist.')
return
# get a reference of the function caller requested
switch_action = getattr(task, action_request, False)
if not switch_action:
self.msg(f'{self.switches[0]}, is not an acceptable task action or ' \
f'{task_comp_msg.lower()}')
# verify manipulating the correct task
if task_id in _TASK_HANDLER.tasks:
task_args = _TASK_HANDLER.tasks.get(task_id, False)
if not task_args: # check if the task is still active
self.msg(task_comp_msg)
return
else:
t_comp_date, t_func_mem_ref = self.coll_date_func(task_args)
t_func_name = str(task_args[1]).split(' ')
t_func_name = t_func_name[1] if len(t_func_name) >= 2 else None
if task.exists(): # make certain the task has not been called yet.
prompt = f'Yes or No, {action_request} task {task_id}? With completion date ' \
f'{t_comp_date}. Deferring function {t_func_name}.'
no_msg = f'No {action_request} processed.'
# record variables for use in do_task_action method
self.task_id = task_id
self.t_comp_date = t_comp_date
self.t_func_mem_ref = t_func_mem_ref
self.task_action = switch_action
self.action_request = action_request
ask_yes_no(self.caller, prompt, self.do_task_action, no_msg, no_msg, True)
return True
else:
self.msg(task_comp_msg)
return
# the argument is not a task id, process the action on all task deferring the function
# specified as an argument
else:
name_match_found = False
arg_func_name = self.lhslist[0].lower()
# repack tasks into a new dictionary
current_tasks = {}
for task_id, task_args in _TASK_HANDLER.tasks.items():
current_tasks.update({task_id: task_args})
# call requested action on all tasks with the function name
for task_id, task_args in current_tasks.items():
t_func_name = str(task_args[1]).split(' ')
t_func_name = t_func_name[1] if len(t_func_name) >= 2 else None
# skip this task if it is not for the function desired
if arg_func_name != t_func_name:
continue
name_match_found = True
task = TaskHandlerTask(task_id)
switch_action = getattr(task, action_request, False)
if switch_action:
action_return = switch_action()
self.msg(f'Task action {action_request} completed on task ID {task_id}.')
self.msg(f'The task function {action_request} returned: {action_return}')
# provide a message if not tasks of the function name was found
if not name_match_found:
self.msg(f'No tasks deferring function name {arg_func_name} found.')
return
return True
# check if an maleformed request was created
elif self.switches or self.lhs:
self.msg('Task command misformed.')
self.msg('Proper format tasks[/switch] [function name or task id]')
return
# No task manupilation requested, build a table of tasks and display it
# get the width of screen in characters
width = self.client_width()
# create table header and list to hold tasks data and actions
tasks_header = ('Task ID', 'Completion Date', 'Function', 'Arguments', 'KWARGS',
'persistent')
# empty list of lists, the size of the header
tasks_list = [list() for i in range(len(tasks_header))]
for task_id, task in _TASK_HANDLER.tasks.items():
# collect data from the task
t_comp_date, t_func_mem_ref = self.coll_date_func(task)
t_func_name = str(task[1]).split(' ')
t_func_name = t_func_name[1] if len(t_func_name) >= 2 else None
t_args = str(task[2])
t_kwargs = str(task[3])
t_pers = str(task[4])
# add task data to the tasks list
task_data = (task_id, t_comp_date, t_func_name, t_args, t_kwargs, t_pers)
for i in range(len(tasks_header)):
tasks_list[i].append(task_data[i])
# create and display the table
tasks_table = EvTable(*tasks_header, table=tasks_list, maxwidth=width, border='cells',
align='center')
self.msg(tasks_table)

View file

@ -18,6 +18,7 @@ from anything import Anything
from parameterized import parameterized
from django.conf import settings
from twisted.internet import task
from unittest.mock import patch, Mock, MagicMock
from evennia import DefaultRoom, DefaultExit, ObjectDB
@ -575,6 +576,170 @@ class TestSystem(CommandTest):
def test_server_load(self):
self.call(system.CmdServerLoad(), "", "Server CPU and Memory load:")
_TASK_HANDLER = None
def func_test_cmd_tasks():
return 'success'
class TestCmdTasks(CommandTest):
def setUp(self):
super().setUp()
# get a reference of TASK_HANDLER
self.timedelay = 5
global _TASK_HANDLER
if _TASK_HANDLER is None:
from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER
_TASK_HANDLER.clock = task.Clock()
self.task_handler = _TASK_HANDLER
self.task_handler.clear()
self.task = self.task_handler.add(self.timedelay, func_test_cmd_tasks)
task_args = self.task_handler.tasks.get(self.task.get_id(), False)
def tearDown(self):
super().tearDown()
self.task_handler.clear()
def test_no_tasks(self):
self.task_handler.clear()
self.call(system.CmdTasks(), '', 'There are no active tasks.')
def test_active_task(self):
cmd_result = self.call(system.CmdTasks(), '')
for ptrn in ('Task ID', 'Completion Date', 'Function', 'KWARGS', 'persisten',
'1', r'\d+/\d+/\d+', r'\d+\:\d+\:\d+', r'ms\:\d+', 'func_test', '{}',
'False'):
self.assertRegex(cmd_result, ptrn)
def test_persistent_task(self):
self.task_handler.clear()
self.task_handler.add(self.timedelay, func_test_cmd_tasks, persistent=True)
cmd_result = self.call(system.CmdTasks(), '')
self.assertRegex(cmd_result, 'True')
def test_pause_unpause(self):
# test pause
args = f'/pause {self.task.get_id()}'
wanted_msg = 'Yes or No, pause task 1? With completion date'
cmd_result = self.call(system.CmdTasks(), args, wanted_msg)
self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.')
self.char1.execute_cmd('y')
self.assertTrue(self.task.paused)
self.task_handler.clock.advance(self.timedelay + 1)
# test unpause
args = f'/unpause {self.task.get_id()}'
self.assertTrue(self.task.exists())
wanted_msg = 'Yes or No, unpause task 1? With completion date'
cmd_result = self.call(system.CmdTasks(), args, wanted_msg)
self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.')
self.char1.execute_cmd('y')
# verify task continues after unpause
self.task_handler.clock.advance(1)
self.assertFalse(self.task.exists())
def test_do_task(self):
args = f'/do_task {self.task.get_id()}'
wanted_msg = 'Yes or No, do_task task 1? With completion date'
cmd_result = self.call(system.CmdTasks(), args, wanted_msg)
self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.')
self.char1.execute_cmd('y')
self.assertFalse(self.task.exists())
def test_remove(self):
args = f'/remove {self.task.get_id()}'
wanted_msg = 'Yes or No, remove task 1? With completion date'
cmd_result = self.call(system.CmdTasks(), args, wanted_msg)
self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.')
self.char1.execute_cmd('y')
self.assertFalse(self.task.exists())
def test_call(self):
args = f'/call {self.task.get_id()}'
wanted_msg = 'Yes or No, call task 1? With completion date'
cmd_result = self.call(system.CmdTasks(), args, wanted_msg)
self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.')
self.char1.execute_cmd('y')
# make certain the task is still active
self.assertTrue(self.task.active())
# go past delay time, the task should call do_task and remove itself after calling.
self.task_handler.clock.advance(self.timedelay + 1)
self.assertFalse(self.task.exists())
def test_cancel(self):
args = f'/cancel {self.task.get_id()}'
wanted_msg = 'Yes or No, cancel task 1? With completion date'
cmd_result = self.call(system.CmdTasks(), args, wanted_msg)
self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.')
self.char1.execute_cmd('y')
self.assertTrue(self.task.exists())
self.assertFalse(self.task.active())
def test_func_name_manipulation(self):
self.task_handler.add(self.timedelay, func_test_cmd_tasks) # add an extra task
args = f'/remove func_test_cmd_tasks'
wanted_msg = 'Task action remove completed on task ID 1.|The task function remove returned: True|' \
'Task action remove completed on task ID 2.|The task function remove returned: True'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertFalse(self.task_handler.tasks) # no tasks should exist.
def test_wrong_func_name(self):
args = f'/remove intentional_fail'
wanted_msg = 'No tasks deferring function name intentional_fail found.'
self.call(system.CmdTasks(), args, wanted_msg)
self.assertTrue(self.task.active())
def test_no_input(self):
args = f'/cancel {self.task.get_id()}'
self.call(system.CmdTasks(), args)
# task should complete since no input was received
self.task_handler.clock.advance(self.timedelay + 1)
self.assertFalse(self.task.exists())
def test_responce_of_yes(self):
self.call(system.CmdTasks(), f'/cancel {self.task.get_id()}')
self.char1.msg = Mock()
self.char1.execute_cmd('y')
text = ''
for _, _, kwargs in self.char1.msg.mock_calls:
text += kwargs.get('text', '')
self.assertEqual(text, 'cancel request completed.The task function cancel returned: True')
self.assertTrue(self.task.exists())
def test_task_complete_waiting_input(self):
"""Test for task completing while waiting for input."""
self.call(system.CmdTasks(), f'/cancel {self.task.get_id()}')
self.task_handler.clock.advance(self.timedelay + 1)
self.char1.msg = Mock()
self.char1.execute_cmd('y')
text = ''
for _, _, kwargs in self.char1.msg.mock_calls:
text += kwargs.get('text', '')
self.assertEqual(text, 'Task completed while waiting for input.')
self.assertFalse(self.task.exists())
def test_new_task_waiting_input(self):
"""
Test task completing than a new task with the same ID being made while waitinf for input.
"""
self.assertTrue(self.task.get_id(), 1)
self.call(system.CmdTasks(), f'/cancel {self.task.get_id()}')
self.task_handler.clock.advance(self.timedelay + 1)
self.assertFalse(self.task.exists())
self.task = self.task_handler.add(self.timedelay, func_test_cmd_tasks)
self.assertTrue(self.task.get_id(), 1)
self.char1.msg = Mock()
self.char1.execute_cmd('y')
text = ''
for _, _, kwargs in self.char1.msg.mock_calls:
text += kwargs.get('text', '')
self.assertEqual(text, 'Task completed while waiting for input.')
def test_misformed_command(self):
wanted_msg = 'Task command misformed.|Proper format tasks[/switch] ' \
'[function name or task id]'
self.call(system.CmdTasks(), f'/cancel', wanted_msg)
class TestAdmin(CommandTest):
def test_emit(self):