From 93b68e3a312ba6f53d2ff9fa52fa91e74bfade16 Mon Sep 17 00:00:00 2001 From: davewiththenicehat <54369722+davewiththenicehat@users.noreply.github.com> Date: Mon, 14 Jun 2021 18:43:18 -0400 Subject: [PATCH] CmdTasks removed text tags added yes no --- evennia/commands/default/system.py | 183 +++++++++++++++++------------ evennia/commands/default/tests.py | 100 +++++++++++++--- 2 files changed, 192 insertions(+), 91 deletions(-) diff --git a/evennia/commands/default/system.py b/evennia/commands/default/system.py index 9bb1f05b40..88e8421152 100644 --- a/evennia/commands/default/system.py +++ b/evennia/commands/default/system.py @@ -24,6 +24,7 @@ from evennia.utils import logger, utils, gametime, create, search from evennia.utils.eveditor import EvEditor from evennia.utils.evtable import EvTable from evennia.utils.evmore import EvMore +from evennia.utils.evmenu import ask_yes_no from evennia.utils.utils import crop, class_from_module from evennia.scripts.taskhandler import TaskHandlerTask @@ -1217,7 +1218,7 @@ class CmdTasks(COMMAND_DEFAULT_CLASS): tasks[/switch] [function name] Process the action in the switch to all tasks that are deferring a specific function name. This would match the name of the callback you passed to a delay or the task handler. - tasks[/switch] [task id], [completion date], [function memory reference] + tasks[/switch] [task id] Process the action in the switch to a specific task. Switches: @@ -1237,17 +1238,16 @@ class CmdTasks(COMMAND_DEFAULT_CLASS): Only the action requested on the first switch will be processed. All other switches will be ignored. - Manipulation of a single task is intended to be done via the clickable links or through - code directly. Due to generally short life of a task, the inclusion of completion date - and function's memory reference guarentees an incorrect task will not be manipulated. - By default, tasks that are canceled and never called are automatically removed after one minute. Example: tasks/cancel move_callback - Cancel all movement delays from the slow_exit contrib. - In this example slow exits creates it's tasks with: utils.delay(move_delay, move_callback) + Cancel all movement delays from the slow_exit contrib. + In this example slow exits creates it's tasks with: + utils.delay(move_delay, move_callback) + tasks/cancel 2 + Cancel task id 2. """ @@ -1265,6 +1265,36 @@ class CmdTasks(COMMAND_DEFAULT_CLASS): t_func_mem_ref = t_func_name[3] if len(t_func_name) >= 4 else None return t_comp_date, t_func_mem_ref + def do_task_action(self, *args, **kwargs): + """ + Process the action of a tasks command. + + This exists to gain support with yes or no function from EvMenu. + """ + task_id = self.task_id + + # get a reference of the global task handler + global _TASK_HANDLER + if _TASK_HANDLER is None: + from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER + + # verify manipulating the correct task + task_args = _TASK_HANDLER.tasks.get(task_id, False) + if not task_args: # check if the task is still active + self.msg('Task completed while waiting for input.') + return + else: + # make certain a task with matching IDs has not been created + t_comp_date, t_func_mem_ref = self.coll_date_func(task_args) + if self.t_comp_date != t_comp_date or self.t_func_mem_ref != t_func_mem_ref: + self.msg('Task completed while waiting for input.') + return + + # Do the action requested by command caller + action_return = self.task_action() + self.msg(f'{self.action_request} request completed.') + self.msg(f'The task function {self.action_request} returned: {action_return}') + def func(self): # get a reference of the global task handler global _TASK_HANDLER @@ -1278,16 +1308,80 @@ class CmdTasks(COMMAND_DEFAULT_CLASS): return # handle caller's request to manipulate a task(s) - if self.switches or self.lhs: + if self.switches and self.lhs: + + # find if the argument is a task id or function name action_request = self.switches[0] - # handle caller requesting an action on specific deferred function - if len(self.lhslist) == 1: + try: + arg_is_id = int(self.lhslist[0]) + except ValueError: + arg_is_id = False + + # if the argument is a task id, proccess the action on a single task + if arg_is_id: + + err_arg_msg = 'Switch and task ID are required when manipulating a task.' + task_comp_msg = 'Task completed while processing request.' + + # handle missing arguments or switches + if not self.switches and self.lhs: + self.msg(err_arg_msg) + return + + # create a handle for the task + task_id = arg_is_id + task = TaskHandlerTask(task_id) + + # handle task no longer existing + if not task.exists(): + self.msg(f'Task {task_id} does not exist.') + return + + # get a reference of the function caller requested + switch_action = getattr(task, action_request, False) + if not switch_action: + self.msg(f'{self.switches[0]}, is not an acceptable task action or ' \ + f'{task_comp_msg.lower()}') + + # verify manipulating the correct task + if task_id in _TASK_HANDLER.tasks: + task_args = _TASK_HANDLER.tasks.get(task_id, False) + if not task_args: # check if the task is still active + self.msg(task_comp_msg) + return + else: + t_comp_date, t_func_mem_ref = self.coll_date_func(task_args) + t_func_name = str(task_args[1]).split(' ') + t_func_name = t_func_name[1] if len(t_func_name) >= 2 else None + + if task.exists(): # make certain the task has not been called yet. + prompt = f'Yes or No, {action_request} task {task_id}? With completion date ' \ + f'{t_comp_date}. Deferring function {t_func_name}.' + no_msg = f'No {action_request} processed.' + # record variables for use in do_task_action method + self.task_id = task_id + self.t_comp_date = t_comp_date + self.t_func_mem_ref = t_func_mem_ref + self.task_action = switch_action + self.action_request = action_request + ask_yes_no(self.caller, prompt, self.do_task_action, no_msg, no_msg, True) + return True + else: + self.msg(task_comp_msg) + return + + # the argument is not a task id, process the action on all task deferring the function + # specified as an argument + else: + name_match_found = False arg_func_name = self.lhslist[0].lower() + # repack tasks into a new dictionary current_tasks = {} for task_id, task_args in _TASK_HANDLER.tasks.items(): current_tasks.update({task_id: task_args}) + # call requested action on all tasks with the function name for task_id, task_args in current_tasks.items(): t_func_name = str(task_args[1]).split(' ') @@ -1302,65 +1396,18 @@ class CmdTasks(COMMAND_DEFAULT_CLASS): action_return = switch_action() self.msg(f'Task action {action_request} completed on task ID {task_id}.') self.msg(f'The task function {action_request} returned: {action_return}') + # provide a message if not tasks of the function name was found if not name_match_found: self.msg(f'No tasks deferring function name {arg_func_name} found.') return return True - err_arg_msg = 'Task ID, completion date and memory reference are required when ' \ - 'manipulating a delay.' - task_comp_msg = 'Task completed while processing request.' - # handle missing arguments or switches - if not self.switches and self.lhs: - self.msg(err_arg_msg) - return - # handle incorrect arguments - if len(self.lhslist) < 2: - self.msg(err_arg_msg) - return - # create a handle for the task - task_id = int(self.lhslist[0]) - task = TaskHandlerTask(task_id) - # handle task no longer existing - if not task.exists(): - self.msg(f'Task {task_id} does not exist.') - return - # get a reference of the function caller requested - switch_action = getattr(task, action_request, False) - if not switch_action: - self.msg(f'{self.switches[0]}, is not an acceptable task action or ' \ - f'{task_comp_msg.lower()}') - # verify manipulating the correct task - if task_id in _TASK_HANDLER.tasks: - task_args = _TASK_HANDLER.tasks.get(task_id, False) - if task_args: # check if the task is still active - sw_comp_date = self.lhslist[1] - sw_func_mem_ref = self.lhslist[2] - t_comp_date, t_func_mem_ref = self.coll_date_func(task_args) - # handle completion date mismatch - if not t_comp_date == sw_comp_date: - self.msg('Task completion time does not match time passed.') - self.msg(task_comp_msg) - self.msg('Likely a new task with the same ID was created') - return - # handle function memory reference mismatch - if not t_func_mem_ref == sw_func_mem_ref: - self.msg("Memory reference for the task's function does not match argument") - self.msg(task_comp_msg) - self.msg('Likely a new task with the same ID was created') - return - else: # task no longer exists - self.msg(task_comp_msg) - return - if task.exists(): # make certain the task has not been called yet. - # call the task's method - action_return = switch_action() - self.msg(f'Task action {action_request} completed.') - self.msg(f'The task function {action_request} returned: {action_return}') - return True - else: - self.msg(task_comp_msg) - return + + # check if an maleformed request was created + elif self.switches or self.lhs: + self.msg('Task command misformed.') + self.msg('Proper format tasks[/switch] [function name or task id]') + return # No task manupilation requested, build a table of tasks and display it # get the width of screen in characters @@ -1382,14 +1429,6 @@ class CmdTasks(COMMAND_DEFAULT_CLASS): task_data = (task_id, t_comp_date, t_func_name, t_args, t_kwargs, t_pers) for i in range(len(tasks_header)): tasks_list[i].append(task_data[i]) - # add task actions to the tasks list - actions = ('pause', 'unpause', 'do_task', 'remove', 'call', 'cancel') - for i in range(len(tasks_header)): - tasks_list[i].append(f"|lc{self.key}/{actions[i]} {task_id}, {t_comp_date}, " \ - f"{t_func_mem_ref}|lt{actions[i]}|le") - # if the screen width is large enough, add directional arrows - if width >= 75: - tasks_list[i][-1] = f"^{tasks_list[i][-1]}^" # create and display the table tasks_table = EvTable(*tasks_header, table=tasks_list, maxwidth=width, border='cells', align='center') diff --git a/evennia/commands/default/tests.py b/evennia/commands/default/tests.py index 815ac7ee26..f7d70d059f 100644 --- a/evennia/commands/default/tests.py +++ b/evennia/commands/default/tests.py @@ -594,7 +594,6 @@ class TestCmdTasks(CommandTest): self.task_handler.clear() self.task = self.task_handler.add(self.timedelay, func_test_cmd_tasks) task_args = self.task_handler.tasks.get(self.task.get_id(), False) - self.t_comp_date, self.t_func_mem_ref = system.CmdTasks.coll_date_func(task_args) def tearDown(self): @@ -620,36 +619,46 @@ class TestCmdTasks(CommandTest): def test_pause_unpause(self): # test pause - args = f'/pause {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}' - wanted_msg = 'Task action pause completed.|The task function pause returned:' - self.call(system.CmdTasks(), args, wanted_msg) + args = f'/pause {self.task.get_id()}' + wanted_msg = 'Yes or No, pause task 1? With completion date' + cmd_result = self.call(system.CmdTasks(), args, wanted_msg) + self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.') + self.char1.execute_cmd('y') self.assertTrue(self.task.paused) self.task_handler.clock.advance(self.timedelay + 1) # test unpause - args = f'/unpause {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}' + args = f'/unpause {self.task.get_id()}' self.assertTrue(self.task.exists()) - wanted_msg = 'Task action unpause completed.|The task function unpause returned: None' - self.call(system.CmdTasks(), args, wanted_msg) + wanted_msg = 'Yes or No, unpause task 1? With completion date' + cmd_result = self.call(system.CmdTasks(), args, wanted_msg) + self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.') + self.char1.execute_cmd('y') # verify task continues after unpause self.task_handler.clock.advance(1) self.assertFalse(self.task.exists()) def test_do_task(self): - args = f'/do_task {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}' - wanted_msg = 'Task action do_task completed.|The task function do_task returned: success' - self.call(system.CmdTasks(), args, wanted_msg) + args = f'/do_task {self.task.get_id()}' + wanted_msg = 'Yes or No, do_task task 1? With completion date' + cmd_result = self.call(system.CmdTasks(), args, wanted_msg) + self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.') + self.char1.execute_cmd('y') self.assertFalse(self.task.exists()) def test_remove(self): - args = f'/remove {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}' - wanted_msg = 'Task action remove completed.|The task function remove returned: True' - self.call(system.CmdTasks(), args, wanted_msg) + args = f'/remove {self.task.get_id()}' + wanted_msg = 'Yes or No, remove task 1? With completion date' + cmd_result = self.call(system.CmdTasks(), args, wanted_msg) + self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.') + self.char1.execute_cmd('y') self.assertFalse(self.task.exists()) def test_call(self): - args = f'/call {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}' - wanted_msg = 'Task action call completed.|The task function call returned: success' - self.call(system.CmdTasks(), args, wanted_msg) + args = f'/call {self.task.get_id()}' + wanted_msg = 'Yes or No, call task 1? With completion date' + cmd_result = self.call(system.CmdTasks(), args, wanted_msg) + self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.') + self.char1.execute_cmd('y') # make certain the task is still active self.assertTrue(self.task.active()) # go past delay time, the task should call do_task and remove itself after calling. @@ -657,9 +666,11 @@ class TestCmdTasks(CommandTest): self.assertFalse(self.task.exists()) def test_cancel(self): - args = f'/cancel {self.task.get_id()}, {self.t_comp_date}, {self.t_func_mem_ref}' - wanted_msg = 'Task action cancel completed.|The task function cancel returned: True' - self.call(system.CmdTasks(), args, wanted_msg) + args = f'/cancel {self.task.get_id()}' + wanted_msg = 'Yes or No, cancel task 1? With completion date' + cmd_result = self.call(system.CmdTasks(), args, wanted_msg) + self.assertRegex(cmd_result, '\. Deferring function func_test_cmd_tasks\.') + self.char1.execute_cmd('y') self.assertTrue(self.task.exists()) self.assertFalse(self.task.active()) @@ -677,6 +688,57 @@ class TestCmdTasks(CommandTest): self.call(system.CmdTasks(), args, wanted_msg) self.assertTrue(self.task.active()) + def test_no_input(self): + args = f'/cancel {self.task.get_id()}' + self.call(system.CmdTasks(), args) + # task should complete since no input was received + self.task_handler.clock.advance(self.timedelay + 1) + self.assertFalse(self.task.exists()) + + def test_responce_of_yes(self): + self.call(system.CmdTasks(), f'/cancel {self.task.get_id()}') + self.char1.msg = Mock() + self.char1.execute_cmd('y') + text = '' + for _, _, kwargs in self.char1.msg.mock_calls: + text += kwargs.get('text', '') + self.assertEqual(text, 'cancel request completed.The task function cancel returned: True') + self.assertTrue(self.task.exists()) + + def test_task_complete_waiting_input(self): + """Test for task completing while waiting for input.""" + self.call(system.CmdTasks(), f'/cancel {self.task.get_id()}') + self.task_handler.clock.advance(self.timedelay + 1) + self.char1.msg = Mock() + self.char1.execute_cmd('y') + text = '' + for _, _, kwargs in self.char1.msg.mock_calls: + text += kwargs.get('text', '') + self.assertEqual(text, 'Task completed while waiting for input.') + self.assertFalse(self.task.exists()) + + def test_new_task_waiting_input(self): + """ + Test task completing than a new task with the same ID being made while waitinf for input. + """ + self.assertTrue(self.task.get_id(), 1) + self.call(system.CmdTasks(), f'/cancel {self.task.get_id()}') + self.task_handler.clock.advance(self.timedelay + 1) + self.assertFalse(self.task.exists()) + self.task = self.task_handler.add(self.timedelay, func_test_cmd_tasks) + self.assertTrue(self.task.get_id(), 1) + self.char1.msg = Mock() + self.char1.execute_cmd('y') + text = '' + for _, _, kwargs in self.char1.msg.mock_calls: + text += kwargs.get('text', '') + self.assertEqual(text, 'Task completed while waiting for input.') + + def test_misformed_command(self): + wanted_msg = 'Task command misformed.|Proper format tasks[/switch] ' \ + '[function name or task id]' + self.call(system.CmdTasks(), f'/cancel', wanted_msg) + class TestAdmin(CommandTest): def test_emit(self):