[feat] Script refactor; decouple timer component from script lifetime. Resolve #1715

This commit is contained in:
Griatch 2021-03-07 10:34:01 +01:00
parent cd579fb649
commit b5195a6e96
29 changed files with 1136 additions and 1266 deletions

View file

@ -31,6 +31,13 @@
- Fix typo in UnixCommand contrib, where `help` was given as `--hel`.
- Latin (la) i18n translation (jamalainm)
- Made the `evennia` dir possible to use without gamedir for purpose of doc generation.
- Make Scripts' timer component independent from script object deletion; can now start/stop
timer without deleting Script. The `.persistent` flag now only controls if timer survives
reload - Script has to be removed with `.delete()` like other typeclassed entities.
- Add `utils.repeat` and `utils.unrepeat` as shortcuts to TickerHandler add/remove, similar
to how `utils.delay` is a shortcut for TaskHandler add.
- Refactor the classic `red_button` example to use `utils.delay/repeat` and modern recommended
code style and paradigms instead of relying on `Scripts` for everything.
### Evennia 0.9.5 (2019-2020)

View file

@ -318,8 +318,6 @@ class DefaultAccount(AccountDB, metaclass=TypeclassBase):
obj.account = self
session.puid = obj.id
session.puppet = obj
# validate/start persistent scripts on object
obj.scripts.validate()
# re-cache locks to make sure superuser bypass is updated
obj.locks.cache_lock_bypass(obj)

View file

@ -608,11 +608,6 @@ def cmdhandler(
# since this may be different for every command when
# merging multuple cmdsets
if hasattr(cmd, "obj") and hasattr(cmd.obj, "scripts"):
# cmd.obj is automatically made available by the cmdhandler.
# we make sure to validate its scripts.
yield cmd.obj.scripts.validate()
if _testing:
# only return the command instance
returnValue(cmd)

View file

@ -421,7 +421,7 @@ class CmdSetHandler(object):
self.mergetype_stack.append(new_current.actual_mergetype)
self.current = new_current
def add(self, cmdset, emit_to_obj=None, permanent=False, default_cmdset=False):
def add(self, cmdset, emit_to_obj=None, persistent=True, permanent=True, default_cmdset=False, **kwargs):
"""
Add a cmdset to the handler, on top of the old ones, unless it
is set as the default one (it will then end up at the bottom of the stack)
@ -430,7 +430,9 @@ class CmdSetHandler(object):
cmdset (CmdSet or str): Can be a cmdset object or the python path
to such an object.
emit_to_obj (Object, optional): An object to receive error messages.
permanent (bool, optional): This cmdset will remain across a server reboot.
persistent (bool, optional): Let cmdset remain across server reload.
permanent (bool, optional): DEPRECATED. This has the same use as
`persistent`.
default_cmdset (Cmdset, optional): Insert this to replace the
default cmdset position (there is only one such position,
always at the bottom of the stack).
@ -447,6 +449,12 @@ class CmdSetHandler(object):
it's a 'quirk' that has to be documented.
"""
if "permanent" in kwargs:
logger.log_dep("obj.cmdset.add() kwarg 'permanent' has changed to "
"'persistent' and now defaults to True.")
permanent = persistent or permanent
if not (isinstance(cmdset, str) or utils.inherits_from(cmdset, CmdSet)):
string = _("Only CmdSets can be added to the cmdsethandler!")
raise Exception(string)

View file

@ -183,7 +183,7 @@ def purge_processor(caller):
# something went wrong. Purge cmdset except default
caller.cmdset.clear()
caller.scripts.validate() # this will purge interactive mode
# caller.scripts.validate() # this will purge interactive mode
# -------------------------------------------------------------

View file

@ -3120,7 +3120,6 @@ class CmdScript(COMMAND_DEFAULT_CLASS):
% (script.get_display_name(caller), obj.get_display_name(caller))
)
script.stop()
obj.scripts.validate()
else: # rhs exists
if not self.switches:
# adding a new script, and starting it

View file

@ -436,7 +436,6 @@ class ScriptEvMore(EvMore):
"|wintval|n",
"|wnext|n",
"|wrept|n",
"|wdb",
"|wtypeclass|n",
"|wdesc|n",
align="r",
@ -448,9 +447,10 @@ class ScriptEvMore(EvMore):
nextrep = script.time_until_next_repeat()
if nextrep is None:
nextrep = "PAUSED" if script.db._paused_time else "--"
nextrep = script.db._paused_time
nextrep = f"PAUSED {int(nextrep)}s" if nextrep else "--"
else:
nextrep = "%ss" % nextrep
nextrep = f"{nextrep}s"
maxrepeat = script.repeats
remaining = script.remaining_repeats() or 0
@ -468,7 +468,6 @@ class ScriptEvMore(EvMore):
script.interval if script.interval > 0 else "--",
nextrep,
rept,
"*" if script.persistent else "-",
script.typeclass_path.rsplit(".", 1)[-1],
crop(script.desc, width=20),
)
@ -487,7 +486,6 @@ class CmdScripts(COMMAND_DEFAULT_CLASS):
start - start a script (must supply a script path)
stop - stops an existing script
kill - kills a script - without running its cleanup hooks
validate - run a validation on the script(s)
If no switches are given, this command just views all active
scripts. The argument can be either an object, at which point it
@ -500,7 +498,7 @@ class CmdScripts(COMMAND_DEFAULT_CLASS):
key = "scripts"
aliases = ["globalscript", "listscripts"]
switch_options = ("start", "stop", "kill", "validate")
switch_options = ("start", "stop", "kill")
locks = "cmd:perm(listscripts) or perm(Admin)"
help_category = "System"
@ -558,18 +556,11 @@ class CmdScripts(COMMAND_DEFAULT_CLASS):
scripts[0].stop()
# import pdb # DEBUG
# pdb.set_trace() # DEBUG
ScriptDB.objects.validate() # just to be sure all is synced
caller.msg(string)
else:
# multiple matches.
ScriptEvMore(caller, scripts, session=self.session)
caller.msg("Multiple script matches. Please refine your search")
elif self.switches and self.switches[0] in ("validate", "valid", "val"):
# run validation on all found scripts
nr_started, nr_stopped = ScriptDB.objects.validate(scripts=scripts)
string = "Validated %s scripts. " % ScriptDB.objects.all().count()
string += "Started %s and stopped %s scripts." % (nr_started, nr_stopped)
caller.msg(string)
else:
# No stopping or validation. We just want to view things.
ScriptEvMore(caller, scripts.order_by("id"), session=self.session)

View file

@ -1163,8 +1163,8 @@ class TestBuilding(CommandTest):
self.call(building.CmdScript(), "Obj ", "dbref ")
self.call(
building.CmdScript(), "/start Obj", "0 scripts started on Obj"
) # because it's already started
building.CmdScript(), "/start Obj", "1 scripts started on Obj"
) # we allow running start again; this should still happen
self.call(building.CmdScript(), "/stop Obj", "Stopping script")
self.call(
@ -1520,7 +1520,10 @@ class TestComms(CommandTest):
class TestBatchProcess(CommandTest):
def test_batch_commands(self):
@patch("evennia.contrib.tutorial_examples.red_button.repeat")
@patch("evennia.contrib.tutorial_examples.red_button.delay")
def test_batch_commands(self, mock_delay, mock_repeat):
# cannot test batchcode here, it must run inside the server process
self.call(
batchprocess.CmdBatchCommands(),
@ -1532,6 +1535,7 @@ class TestBatchProcess(CommandTest):
building.CmdDestroy.confirm = False
self.call(building.CmdDestroy(), "button", "button was destroyed.")
building.CmdDestroy.confirm = confirm
mock_repeat.assert_called()
class CmdInterrupt(Command):

View file

@ -47,8 +47,9 @@ class EventHandler(DefaultScript):
# Tasks
self.db.tasks = {}
self.at_server_start()
def at_start(self):
def at_server_start(self):
"""Set up the event system when starting.
Note that this hook is called every time the server restarts

View file

@ -48,7 +48,7 @@ class TestEventHandler(EvenniaTest):
"""Stop the event handler."""
OLD_EVENTS.clear()
OLD_EVENTS.update(self.handler.ndb.events)
self.handler.stop()
self.handler.delete()
CallbackHandler.script = None
super().tearDown()
@ -270,11 +270,11 @@ class TestCmdCallback(CommandTest):
"""Stop the callback handler."""
OLD_EVENTS.clear()
OLD_EVENTS.update(self.handler.ndb.events)
self.handler.stop()
self.handler.delete()
for script in ScriptDB.objects.filter(
db_typeclass_path="evennia.contrib.ingame_python.scripts.TimeEventScript"
):
script.stop()
script.delete()
CallbackHandler.script = None
super().tearDown()
@ -449,7 +449,7 @@ class TestDefaultCallbacks(CommandTest):
"""Stop the callback handler."""
OLD_EVENTS.clear()
OLD_EVENTS.update(self.handler.ndb.events)
self.handler.stop()
self.handler.delete()
CallbackHandler.script = None
super().tearDown()

View file

@ -58,7 +58,6 @@ def register_events(path_or_typeclass):
typeclass_name = typeclass.__module__ + "." + typeclass.__name__
try:
storage = ScriptDB.objects.get(db_key="event_handler")
assert storage.is_active
assert storage.ndb.events is not None
except (ScriptDB.DoesNotExist, AssertionError):
storage = EVENTS

View file

@ -1477,11 +1477,11 @@ class TestTurnBattleBasicFunc(EvenniaTest):
def tearDown(self):
super(TestTurnBattleBasicFunc, self).tearDown()
self.turnhandler.stop()
self.testroom.delete()
self.attacker.delete()
self.defender.delete()
self.joiner.delete()
self.testroom.delete()
self.turnhandler.stop()
# Test combat functions
def test_tbbasicfunc(self):
@ -1570,11 +1570,11 @@ class TestTurnBattleEquipFunc(EvenniaTest):
def tearDown(self):
super(TestTurnBattleEquipFunc, self).tearDown()
self.turnhandler.stop()
self.testroom.delete()
self.attacker.delete()
self.defender.delete()
self.joiner.delete()
self.testroom.delete()
self.turnhandler.stop()
# Test the combat functions in tb_equip too. They work mostly the same.
def test_tbequipfunc(self):
@ -1662,11 +1662,11 @@ class TestTurnBattleRangeFunc(EvenniaTest):
def tearDown(self):
super(TestTurnBattleRangeFunc, self).tearDown()
self.turnhandler.stop()
self.testroom.delete()
self.attacker.delete()
self.defender.delete()
self.joiner.delete()
self.testroom.delete()
self.turnhandler.stop()
# Test combat functions in tb_range too.
def test_tbrangefunc(self):
@ -1776,12 +1776,12 @@ class TestTurnBattleItemsFunc(EvenniaTest):
def tearDown(self):
super(TestTurnBattleItemsFunc, self).tearDown()
self.turnhandler.stop()
self.testroom.delete()
self.attacker.delete()
self.defender.delete()
self.joiner.delete()
self.user.delete()
self.testroom.delete()
self.turnhandler.stop()
# Test functions in tb_items.
def test_tbitemsfunc(self):
@ -1895,11 +1895,11 @@ class TestTurnBattleMagicFunc(EvenniaTest):
def tearDown(self):
super(TestTurnBattleMagicFunc, self).tearDown()
self.turnhandler.stop()
self.testroom.delete()
self.attacker.delete()
self.defender.delete()
self.joiner.delete()
self.testroom.delete()
self.turnhandler.stop()
# Test combat functions in tb_magic.
def test_tbbasicfunc(self):

View file

@ -180,10 +180,10 @@ def apply_damage(defender, damage):
def at_defeat(defeated):
"""
Announces the defeat of a fighter in combat.
Args:
defeated (obj): Fighter that's been defeated.
Notes:
All this does is announce a defeat message by default, but if you
want anything else to happen to defeated fighters (like putting them
@ -482,6 +482,7 @@ class TBBasicTurnHandler(DefaultScript):
if disengage_check: # All characters have disengaged
self.obj.msg_contents("All fighters have disengaged! Combat is over!")
self.stop() # Stop this script and end combat.
self.delete()
return
# Check to see if only one character is left standing. If so, end combat.
@ -497,6 +498,7 @@ class TBBasicTurnHandler(DefaultScript):
LastStanding = fighter # Pick the one fighter left with HP remaining
self.obj.msg_contents("Only %s remains! Combat is over!" % LastStanding)
self.stop() # Stop this script and end combat.
self.delete()
return
# Cycle to the next turn.

View file

@ -1,335 +0,0 @@
"""
This defines the cmdset for the red_button. Here we have defined
the commands and the cmdset in the same module, but if you
have many different commands to merge it is often better
to define the cmdset separately, picking and choosing from
among the available commands as to what should be included in the
cmdset - this way you can often re-use the commands too.
"""
import random
from evennia import Command, CmdSet
# Some simple commands for the red button
# ------------------------------------------------------------
# Commands defined on the red button
# ------------------------------------------------------------
class CmdNudge(Command):
"""
Try to nudge the button's lid
Usage:
nudge lid
This command will have you try to
push the lid of the button away.
"""
key = "nudge lid" # two-word command name!
aliases = ["nudge"]
locks = "cmd:all()"
def func(self):
"""
nudge the lid. Random chance of success to open it.
"""
rand = random.random()
if rand < 0.5:
self.caller.msg("You nudge at the lid. It seems stuck.")
elif rand < 0.7:
self.caller.msg("You move the lid back and forth. It won't budge.")
else:
self.caller.msg("You manage to get a nail under the lid.")
self.caller.execute_cmd("open lid")
class CmdPush(Command):
"""
Push the red button
Usage:
push button
"""
key = "push button"
aliases = ["push", "press button", "press"]
locks = "cmd:all()"
def func(self):
"""
Note that we choose to implement this with checking for
if the lid is open/closed. This is because this command
is likely to be tried regardless of the state of the lid.
An alternative would be to make two versions of this command
and tuck them into the cmdset linked to the Open and Closed
lid-state respectively.
"""
if self.obj.db.lid_open:
string = "You reach out to press the big red button ..."
string += "\n\nA BOOM! A bright light blinds you!"
string += "\nThe world goes dark ..."
self.caller.msg(string)
self.caller.location.msg_contents(
"%s presses the button. BOOM! %s is blinded by a flash!"
% (self.caller.name, self.caller.name),
exclude=self.caller,
)
# the button's method will handle all setup of scripts etc.
self.obj.press_button(self.caller)
else:
string = "You cannot push the button - there is a glass lid covering it."
self.caller.msg(string)
class CmdSmashGlass(Command):
"""
smash glass
Usage:
smash glass
Try to smash the glass of the button.
"""
key = "smash glass"
aliases = ["smash lid", "break lid", "smash"]
locks = "cmd:all()"
def func(self):
"""
The lid won't open, but there is a small chance
of causing the lamp to break.
"""
rand = random.random()
if rand < 0.2:
string = "You smash your hand against the glass"
string += " with all your might. The lid won't budge"
string += " but you cause quite the tremor through the button's mount."
string += "\nIt looks like the button's lamp stopped working for the time being."
self.obj.lamp_works = False
elif rand < 0.6:
string = "You hit the lid hard. It doesn't move an inch."
else:
string = "You place a well-aimed fist against the glass of the lid."
string += " Unfortunately all you get is a pain in your hand. Maybe"
string += " you should just try to open the lid instead?"
self.caller.msg(string)
self.caller.location.msg_contents(
"%s tries to smash the glass of the button." % (self.caller.name), exclude=self.caller
)
class CmdOpenLid(Command):
"""
open lid
Usage:
open lid
"""
key = "open lid"
aliases = ["open button", "open"]
locks = "cmd:all()"
def func(self):
"simply call the right function."
if self.obj.db.lid_locked:
self.caller.msg("This lid seems locked in place for the moment.")
return
string = "\nA ticking sound is heard, like a winding mechanism. Seems "
string += "the lid will soon close again."
self.caller.msg(string)
self.caller.location.msg_contents(
"%s opens the lid of the button." % (self.caller.name), exclude=self.caller
)
# add the relevant cmdsets to button
self.obj.cmdset.add(LidClosedCmdSet)
# call object method
self.obj.open_lid()
class CmdCloseLid(Command):
"""
close the lid
Usage:
close lid
Closes the lid of the red button.
"""
key = "close lid"
aliases = ["close"]
locks = "cmd:all()"
def func(self):
"Close the lid"
self.obj.close_lid()
# this will clean out scripts dependent on lid being open.
self.caller.msg("You close the button's lid. It clicks back into place.")
self.caller.location.msg_contents(
"%s closes the button's lid." % (self.caller.name), exclude=self.caller
)
class CmdBlindLook(Command):
"""
Looking around in darkness
Usage:
look <obj>
... not that there's much to see in the dark.
"""
key = "look"
aliases = ["l", "get", "examine", "ex", "feel", "listen"]
locks = "cmd:all()"
def func(self):
"This replaces all the senses when blinded."
# we decide what to reply based on which command was
# actually tried
if self.cmdstring == "get":
string = "You fumble around blindly without finding anything."
elif self.cmdstring == "examine":
string = "You try to examine your surroundings, but can't see a thing."
elif self.cmdstring == "listen":
string = "You are deafened by the boom."
elif self.cmdstring == "feel":
string = "You fumble around, hands outstretched. You bump your knee."
else:
# trying to look
string = "You are temporarily blinded by the flash. "
string += "Until it wears off, all you can do is feel around blindly."
self.caller.msg(string)
self.caller.location.msg_contents(
"%s stumbles around, blinded." % (self.caller.name), exclude=self.caller
)
class CmdBlindHelp(Command):
"""
Help function while in the blinded state
Usage:
help
"""
key = "help"
aliases = "h"
locks = "cmd:all()"
def func(self):
"Give a message."
self.caller.msg("You are beyond help ... until you can see again.")
# ---------------------------------------------------------------
# Command sets for the red button
# ---------------------------------------------------------------
# We next tuck these commands into their respective command sets.
# (note that we are overdoing the cdmset separation a bit here
# to show how it works).
class DefaultCmdSet(CmdSet):
"""
The default cmdset always sits
on the button object and whereas other
command sets may be added/merge onto it
and hide it, removing them will always
bring it back. It's added to the object
using obj.cmdset.add_default().
"""
key = "RedButtonDefault"
mergetype = "Union" # this is default, we don't really need to put it here.
def at_cmdset_creation(self):
"Init the cmdset"
self.add(CmdPush())
class LidClosedCmdSet(CmdSet):
"""
A simple cmdset tied to the redbutton object.
It contains the commands that launches the other
command sets, making the red button a self-contained
item (i.e. you don't have to manually add any
scripts etc to it when creating it).
"""
key = "LidClosedCmdSet"
# default Union is used *except* if we are adding to a
# cmdset named LidOpenCmdSet - this one we replace
# completely.
key_mergetype = {"LidOpenCmdSet": "Replace"}
def at_cmdset_creation(self):
"Populates the cmdset when it is instantiated."
self.add(CmdNudge())
self.add(CmdSmashGlass())
self.add(CmdOpenLid())
class LidOpenCmdSet(CmdSet):
"""
This is the opposite of the Closed cmdset.
"""
key = "LidOpenCmdSet"
# default Union is used *except* if we are adding to a
# cmdset named LidClosedCmdSet - this one we replace
# completely.
key_mergetype = {"LidClosedCmdSet": "Replace"}
def at_cmdset_creation(self):
"setup the cmdset (just one command)"
self.add(CmdCloseLid())
class BlindCmdSet(CmdSet):
"""
This is the cmdset added to the *account* when
the button is pushed.
"""
key = "BlindCmdSet"
# we want it to completely replace all normal commands
# until the timed script removes it again.
mergetype = "Replace"
# we want to stop the account from walking around
# in this blinded state, so we hide all exits too.
# (channel commands will still work).
no_exits = True # keep account in the same room
no_objs = True # don't allow object commands
def at_cmdset_creation(self):
"Setup the blind cmdset"
from evennia.commands.default.general import CmdSay
from evennia.commands.default.general import CmdPose
self.add(CmdSay())
self.add(CmdPose())
self.add(CmdBlindLook())
self.add(CmdBlindHelp())

View file

@ -9,11 +9,382 @@ Create this button with
@create/drop examples.red_button.RedButton
Note that you must drop the button before you can see its messages!
## Technical
The button's functionality is controlled by CmdSets that gets added and removed
depending on the 'state' the button is in. Since this is an example we have
gone a little overboard separating the states for clarity.
- The default state is the fallback state and is represented by a default cmdset with
the 'push' command. This is always available but acts differently depending on
what other state the button has:
- Lid-closed state: In this state the button is covered by a glass cover and trying
to 'push' it will fail. You can 'nudge', 'smash' or 'open' the lid.
- Lid-open state: In this state the lid is open but will close again after a certain
time. Using 'push' now will press the button and trigger the Blind-state.
- Blind-state: In this mode you are blinded by a bright flash. This will affect your
normal commands like 'look' and help until the blindness wears off after a certain
time.
Timers are handled by persistent delays on the button. These are examples of
`evennia.utils.utils.delay` calls that wait a certain time before calling a method -
such as when closing the lid and un-blinding a character.
"""
import random
from evennia import DefaultObject
from evennia.contrib.tutorial_examples import red_button_scripts as scriptexamples
from evennia.contrib.tutorial_examples import cmdset_red_button as cmdsetexamples
from evennia import Command, CmdSet
from evennia.utils.utils import delay, repeat, interactive
# Commands on the button (not all awailable at the same time)
# Commands for the state when the lid covering the button is closed.
class CmdPushLidClosed(Command):
"""
Push the red button (lid closed)
Usage:
push button
"""
key = "push button"
aliases = ["push", "press button", "press"]
locks = "cmd:all()"
def func(self):
"""
This is the version of push used when the lid is closed.
An alternative would be to make a 'push' command in a default cmdset
that is always available on the button and then use if-statements to
check if the lid is open or closed.
"""
self.caller.msg("You cannot push the button = there is a glass lid covering it.")
class CmdNudge(Command):
"""
Try to nudge the button's lid.
Usage:
nudge lid
This command will have you try to push the lid of the button away.
"""
key = "nudge lid" # two-word command name!
aliases = ["nudge"]
locks = "cmd:all()"
def func(self):
"""
Nudge the lid. Random chance of success to open it.
"""
rand = random.random()
if rand < 0.5:
self.caller.msg("You nudge at the lid. It seems stuck.")
elif rand < 0.7:
self.caller.msg("You move the lid back and forth. It won't budge.")
else:
self.caller.msg("You manage to get a nail under the lid.")
# self.obj is the button object
self.obj.to_open_state()
class CmdSmashGlass(Command):
"""
Smash the protective glass.
Usage:
smash glass
Try to smash the glass of the button.
"""
key = "smash glass"
aliases = ["smash lid", "break lid", "smash"]
locks = "cmd:all()"
def func(self):
"""
The lid won't open, but there is a small chance of causing the lamp to
break.
"""
rand = random.random()
self.caller.location.msg_contents(
f"{self.caller.name} tries to smash the glass of the button.",
exclude=self.caller)
if rand < 0.2:
string = ("You smash your hand against the glass"
" with all your might. The lid won't budge"
" but you cause quite the tremor through the button's mount."
"\nIt looks like the button's lamp stopped working for the time being, "
"but the lid is still as closed as ever.")
# self.obj is the button itself
self.obj.break_lamp()
elif rand < 0.6:
string = "You hit the lid hard. It doesn't move an inch."
else:
string = ("You place a well-aimed fist against the glass of the lid."
" Unfortunately all you get is a pain in your hand. Maybe"
" you should just try to just ... open the lid instead?")
self.caller.msg(string)
class CmdOpenLid(Command):
"""
open lid
Usage:
open lid
"""
key = "open lid"
aliases = ["open button"]
locks = "cmd:all()"
def func(self):
"simply call the right function."
if self.obj.db.lid_locked:
self.caller.msg("This lid seems locked in place for the moment.")
return
string = "\nA ticking sound is heard, like a winding mechanism. Seems "
string += "the lid will soon close again."
self.caller.msg(string)
self.caller.location.msg_contents(
f"{self.caller.name} opens the lid of the button.",
exclude=self.caller)
self.obj.to_open_state()
class LidClosedCmdSet(CmdSet):
"""
A simple cmdset tied to the redbutton object.
It contains the commands that launches the other
command sets, making the red button a self-contained
item (i.e. you don't have to manually add any
scripts etc to it when creating it).
Note that this is given with a `key_mergetype` set. This
is set up so that the cmdset with merge with Union merge type
*except* if the other cmdset to merge with is LidOpenCmdSet,
in which case it will Replace that. So these two cmdsets will
be mutually exclusive.
"""
key = "LidClosedCmdSet"
def at_cmdset_creation(self):
"Populates the cmdset when it is instantiated."
self.add(CmdPushLidClosed())
self.add(CmdNudge())
self.add(CmdSmashGlass())
self.add(CmdOpenLid())
# Commands for the state when the button's protective cover is open - now the
# push command will work. You can also close the lid again.
class CmdPushLidOpen(Command):
"""
Push the red button
Usage:
push button
"""
key = "push button"
aliases = ["push", "press button", "press"]
locks = "cmd:all()"
@interactive
def func(self):
"""
This version of push will immediately trigger the next button state.
The use of the @interactive decorator allows for using `yield` to add
simple pauses in how quickly a message is returned to the user. This
kind of pause will not survive a server reload.
"""
# pause a little between each message.
self.caller.msg("You reach out to press the big red button ...")
yield(2) # pause 2s before next message
self.caller.msg("\n\n|wBOOOOM! A bright light blinds you!|n")
yield(1) # pause 1s before next message
self.caller.msg("\n\n|xThe world goes dark ...|n")
name = self.caller.name
self.caller.location.msg_contents(
f"{name} presses the button. BOOM! {name} is blinded by a flash!",
exclude=self.caller)
self.obj.blind_target(self.caller)
class CmdCloseLid(Command):
"""
Close the lid
Usage:
close lid
Closes the lid of the red button.
"""
key = "close lid"
aliases = ["close"]
locks = "cmd:all()"
def func(self):
"Close the lid"
self.obj.to_closed_state()
# this will clean out scripts dependent on lid being open.
self.caller.msg("You close the button's lid. It clicks back into place.")
self.caller.location.msg_contents(
f"{self.caller.name} closes the button's lid.",
exclude=self.caller)
class LidOpenCmdSet(CmdSet):
"""
This is the opposite of the Closed cmdset.
Note that this is given with a `key_mergetype` set. This
is set up so that the cmdset with merge with Union merge type
*except* if the other cmdset to merge with is LidClosedCmdSet,
in which case it will Replace that. So these two cmdsets will
be mutually exclusive.
"""
key = "LidOpenCmdSet"
def at_cmdset_creation(self):
"""Setup the cmdset"""
self.add(CmdPushLidOpen())
self.add(CmdCloseLid())
# Commands for when the button has been pushed and the player is blinded. This
# replaces commands on the player making them 'blind' for a while.
class CmdBlindLook(Command):
"""
Looking around in darkness
Usage:
look <obj>
... not that there's much to see in the dark.
"""
key = "look"
aliases = ["l", "get", "examine", "ex", "feel", "listen"]
locks = "cmd:all()"
def func(self):
"This replaces all the senses when blinded."
# we decide what to reply based on which command was
# actually tried
if self.cmdstring == "get":
string = "You fumble around blindly without finding anything."
elif self.cmdstring == "examine":
string = "You try to examine your surroundings, but can't see a thing."
elif self.cmdstring == "listen":
string = "You are deafened by the boom."
elif self.cmdstring == "feel":
string = "You fumble around, hands outstretched. You bump your knee."
else:
# trying to look
string = ("You are temporarily blinded by the flash. "
"Until it wears off, all you can do is feel around blindly.")
self.caller.msg(string)
self.caller.location.msg_contents(
f"{self.caller.name} stumbles around, blinded.",
exclude=self.caller)
class CmdBlindHelp(Command):
"""
Help function while in the blinded state
Usage:
help
"""
key = "help"
aliases = "h"
locks = "cmd:all()"
def func(self):
"""
Just give a message while blinded. We could have added this to the
CmdBlindLook command too if we wanted to keep things more compact.
"""
self.caller.msg("You are beyond help ... until you can see again.")
class BlindCmdSet(CmdSet):
"""
This is the cmdset added to the *account* when
the button is pushed.
Since this has mergetype Replace it will completely remove the commands of
all other cmdsets while active. To allow some limited interaction
(pose/say) we import those default commands and add them too.
We also disable all exit-commands generated by exits and
object-interactions while blinded by setting `no_exits` and `no_objs` flags
on the cmdset. This is to avoid the player walking off or interfering with
other objects while blinded. Account-level commands however (channel messaging
etc) will not be affected by the blinding.
"""
key = "BlindCmdSet"
# we want it to completely replace all normal commands
# until the timed script removes it again.
mergetype = "Replace"
# we want to stop the player from walking around
# in this blinded state, so we hide all exits too.
# (channel commands will still work).
no_exits = True # keep player in the same room
no_objs = True # don't allow object commands
def at_cmdset_creation(self):
"Setup the blind cmdset"
from evennia.commands.default.general import CmdSay
from evennia.commands.default.general import CmdPose
self.add(CmdSay())
self.add(CmdPose())
self.add(CmdBlindLook())
self.add(CmdBlindHelp())
#
# Definition of the object itself
@ -22,146 +393,189 @@ from evennia.contrib.tutorial_examples import cmdset_red_button as cmdsetexample
class RedButton(DefaultObject):
"""
This class describes an evil red button. It will use the script
definition in contrib/examples/red_button_scripts to blink at regular
intervals. It also uses a series of script and commands to handle
pushing the button and causing effects when doing so.
This class describes an evil red button. It will blink invitingly and
temporarily blind whomever presses it.
The following attributes can be set on the button:
desc_lid_open - description when lid is open
desc_lid_closed - description when lid is closed
desc_lamp_broken - description when lamp is broken
The button can take a few optional attributes controlling how things will
be displayed in its various states. This is a useful way to give builders
the option to customize a complex object from in-game. Actual return messages
to event-actions are (in this example) left with each command, but one could
also imagine having those handled via Attributes as well, if one wanted a
completely in-game customizable button without needing to tweak command
classes.
Attributes:
- `desc_closed_lid`: This is the description to show of the button
when the lid is closed.
- `desc_open_lid`": Shown when the lid is open
- `auto_close_msg`: Message to show when lid auto-closes
- `desc_add_lamp_broken`: Extra desc-line added after normal desc when lamp
is broken.
- blink_msg: A list of strings to randomly choose from when the lamp
blinks.
Notes:
The button starts with lid closed. To set the initial description,
you can either set desc after creating it or pass a `desc` attribute
when creating it, such as
`button = create_object(RedButton, ..., attributes=[('desc', 'my desc')])`.
"""
# these are the pre-set descriptions. Setting attributes will override
# these on the fly.
desc_closed_lid = ("This is a large red button, inviting yet evil-looking. "
"A closed glass lid protects it.")
desc_open_lid = ("This is a large red button, inviting yet evil-looking. "
"Its glass cover is open and the button exposed.")
auto_close_msg = "The button's glass lid silently slides back in place."
lamp_breaks_msg = "The lamp flickers, the button going dark."
desc_add_lamp_broken = "\nThe big red button has stopped blinking for the time being."
# note that this is a list. A random message will display each time
blink_msgs = ["The red button flashes briefly.",
"The red button blinks invitingly.",
"The red button flashes. You know you wanna push it!"]
def at_object_creation(self):
"""
This function is called when object is created. Use this
instead of e.g. __init__.
"""
# store desc (default, you can change this at creation time)
desc = "This is a large red button, inviting yet evil-looking. "
desc += "A closed glass lid protects it."
self.db.desc = desc
This function is called (once) when object is created.
# We have to define all the variables the scripts
# are checking/using *before* adding the scripts or
# they might be deactivated before even starting!
self.db.lid_open = False
"""
self.db.lamp_works = True
self.db.lid_locked = False
self.cmdset.add_default(cmdsetexamples.DefaultCmdSet, permanent=True)
# start closed
self.to_closed_state()
# since the cmdsets relevant to the button are added 'on the fly',
# we need to setup custom scripts to do this for us (also, these scripts
# check so they are valid (i.e. the lid is actually still closed)).
# The AddClosedCmdSet script makes sure to add the Closed-cmdset.
self.scripts.add(scriptexamples.ClosedLidState)
# the script EventBlinkButton makes the button blink regularly.
self.scripts.add(scriptexamples.BlinkButtonEvent)
# start blinking every 35s.
repeat(35, self._do_blink, persistent=True)
# state-changing methods
def open_lid(self):
def _do_blink(self):
"""
Opens the glass lid and start the timer so it will soon close
again.
Have the button blink invitingly unless it's broken.
"""
if self.location and self.db.lamp_works:
possible_messages = self.db.blink_msgs or self.blink_msgs
self.location.msg_contents(random.choice(possible_messages))
if self.db.lid_open:
return
desc = self.db.desc_lid_open
if not desc:
desc = "This is a large red button, inviting yet evil-looking. "
desc += "Its glass cover is open and the button exposed."
self.db.desc = desc
self.db.lid_open = True
# with the lid open, we validate scripts; this will clean out
# scripts that depend on the lid to be closed.
self.scripts.validate()
# now add new scripts that define the open-lid state
self.scripts.add(scriptexamples.OpenLidState)
# we also add a scripted event that will close the lid after a while.
# (this one cleans itself after being called once)
self.scripts.add(scriptexamples.CloseLidEvent)
def close_lid(self):
def _set_desc(self, attrname=None):
"""
Close the glass lid. This validates all scripts on the button,
which means that scripts only being valid when the lid is open
will go away automatically.
"""
if not self.db.lid_open:
return
desc = self.db.desc_lid_closed
if not desc:
desc = "This is a large red button, inviting yet evil-looking. "
desc += "Its glass cover is closed, protecting it."
self.db.desc = desc
self.db.lid_open = False
# clean out scripts depending on lid to be open
self.scripts.validate()
# add scripts related to the closed state
self.scripts.add(scriptexamples.ClosedLidState)
def break_lamp(self, feedback=True):
"""
Breaks the lamp in the button, stopping it from blinking.
Set a description, based on the attrname given, taking the lamp-status
into account.
Args:
feedback (bool): Show a message about breaking the lamp.
attrname (str, optional): This will first check for an Attribute with this name,
secondly for a property on the class. So if `attrname="auto_close_msg"`,
we will first look for an attribute `.db.auto_close_msg` and if that's
not found we'll use `.auto_close_msg` instead. If unset (`None`), the
currently set desc will not be changed (only lamp will be checked).
Notes:
If `self.db.lamp_works` is `False`, we'll append
`desc_add_lamp_broken` text.
"""
if attrname:
# change desc
desc = self.attributes.get(attrname) or getattr(self, attrname)
else:
# use existing desc
desc = self.db.desc
if not self.db.lamp_works:
# lamp not working. Add extra to button's desc
desc += self.db.desc_add_lamp_broken or self.desc_add_lamp_broken
self.db.desc = desc
# state-changing methods and actions
def to_closed_state(self, msg=None):
"""
Switches the button to having its lid closed.
Args:
msg (str, optional): If given, display a message to the room
when lid closes.
This will first try to get the Attribute (self.db.desc_closed_lid) in
case it was set by a builder and if that was None, it will fall back to
self.desc_closed_lid, the default description (note that lack of .db).
"""
self._set_desc("desc_closed_lid")
# remove lidopen-state, if it exists
self.cmdset.remove(LidOpenCmdSet)
# add lid-closed cmdset
self.cmdset.add(LidClosedCmdSet)
if msg and self.location:
self.location.msg_contents(msg)
def to_open_state(self):
"""
Switches the button to having its lid open. This also starts a timer
that will eventually close it again.
"""
self._set_desc("desc_open_lid")
# remove lidopen-state, if it exists
self.cmdset.remove(LidClosedCmdSet)
# add lid-open cmdset
self.cmdset.add(LidOpenCmdSet)
# wait 20s then call self.to_closed_state with a message as argument
delay(35, self.to_closed_state,
self.db.auto_close_msg or self.auto_close_msg,
persistent=True)
def _unblind_target(self, caller):
"""
This is called to un-blind after a certain time.
"""
caller.cmdset.remove(BlindCmdSet)
caller.msg("You blink feverishly as your eyesight slowly returns.")
self.location.msg_contents(
f"{caller.name} seems to be recovering their eyesight, blinking feverishly.",
exclude=caller)
def blind_target(self, caller):
"""
Someone was foolish enough to press the button! Blind them
temporarily.
Args:
caller (Object): The one to be blinded.
"""
# we don't need to remove other cmdsets, this will replace all,
# then restore whatever was there when it goes away.
caller.cmdset.add(BlindCmdSet)
# wait 20s then call self._unblind to remove blindness effect. The
# persistent=True means the delay should survive a server reload.
delay(20, self._unblind_target, caller,
persistent=True)
def _unbreak_lamp(self):
"""
This is called to un-break the lamp after a certain time.
"""
# we do this quietly, the user will just notice it starting blinking again
self.db.lamp_works = True
self._set_desc()
def break_lamp(self):
"""
Breaks the lamp in the button, stopping it from blinking for a while
"""
self.db.lamp_works = False
desc = self.db.desc_lamp_broken
if not desc:
self.db.desc += "\nThe big red button has stopped blinking for the time being."
else:
self.db.desc = desc
# this will update the desc with the info about the broken lamp
self._set_desc()
self.location.msg_contents(self.db.lamp_breaks_msg or self.lamp_breaks_msg)
if feedback and self.location:
self.location.msg_contents("The lamp flickers, the button going dark.")
self.scripts.validate()
def press_button(self, pobject):
"""
Someone was foolish enough to press the button!
Args:
pobject (Object): The person pressing the button
"""
# deactivate the button so it won't flash/close lid etc.
self.scripts.add(scriptexamples.DeactivateButtonEvent)
# blind the person pressing the button. Note that this
# script is set on the *character* pressing the button!
pobject.scripts.add(scriptexamples.BlindedState)
# script-related methods
def blink(self):
"""
The script system will regularly call this
function to make the button blink. Now and then
it won't blink at all though, to add some randomness
to how often the message is echoed.
"""
loc = self.location
if loc:
rand = random.random()
if rand < 0.2:
string = "The red button flashes briefly."
elif rand < 0.4:
string = "The red button blinks invitingly."
elif rand < 0.6:
string = "The red button flashes. You know you wanna push it!"
else:
# no blink
return
loc.msg_contents(string)
# wait 21s before unbreaking the lamp again
delay(21, self._unbreak_lamp)

View file

@ -1,285 +0,0 @@
"""
Example of scripts.
These are scripts intended for a particular object - the
red_button object type in contrib/examples. A few variations
on uses of scripts are included.
"""
from evennia import DefaultScript
from evennia.contrib.tutorial_examples import cmdset_red_button as cmdsetexamples
#
# Scripts as state-managers
#
# Scripts have many uses, one of which is to statically
# make changes when a particular state of an object changes.
# There is no "timer" involved in this case (although there could be),
# whenever the script determines it is "invalid", it simply shuts down
# along with all the things it controls.
#
# To show as many features as possible of the script and cmdset systems,
# we will use three scripts controlling one state each of the red_button,
# each with its own set of commands, handled by cmdsets - one for when
# the button has its lid open, and one for when it is closed and a
# last one for when the player pushed the button and gets blinded by
# a bright light. The last one also has a timer component that allows it
# to remove itself after a while (and the player recovers their eyesight).
class ClosedLidState(DefaultScript):
"""
This manages the cmdset for the "closed" button state. What this
means is that while this script is valid, we add the RedButtonClosed
cmdset to it (with commands like open, nudge lid etc)
"""
def at_script_creation(self):
"Called when script first created."
self.key = "closed_lid_script"
self.desc = "Script that manages the closed-state cmdsets for red button."
self.persistent = True
def at_start(self):
"""
This is called once every server restart, so we want to add the
(memory-resident) cmdset to the object here. is_valid is automatically
checked so we don't need to worry about adding the script to an
open lid.
"""
# All we do is add the cmdset for the closed state.
self.obj.cmdset.add(cmdsetexamples.LidClosedCmdSet)
def is_valid(self):
"""
The script is only valid while the lid is closed.
self.obj is the red_button on which this script is defined.
"""
return not self.obj.db.lid_open
def at_stop(self):
"""
When the script stops we must make sure to clean up after us.
"""
self.obj.cmdset.delete(cmdsetexamples.LidClosedCmdSet)
class OpenLidState(DefaultScript):
"""
This manages the cmdset for the "open" button state. This will add
the RedButtonOpen
"""
def at_script_creation(self):
"Called when script first created."
self.key = "open_lid_script"
self.desc = "Script that manages the opened-state cmdsets for red button."
self.persistent = True
def at_start(self):
"""
This is called once every server restart, so we want to add the
(memory-resident) cmdset to the object here. is_valid is
automatically checked, so we don't need to worry about
adding the cmdset to a closed lid-button.
"""
self.obj.cmdset.add(cmdsetexamples.LidOpenCmdSet)
def is_valid(self):
"""
The script is only valid while the lid is open.
self.obj is the red_button on which this script is defined.
"""
return self.obj.db.lid_open
def at_stop(self):
"""
When the script stops (like if the lid is closed again)
we must make sure to clean up after us.
"""
self.obj.cmdset.delete(cmdsetexamples.LidOpenCmdSet)
class BlindedState(DefaultScript):
"""
This is a timed state.
This adds a (very limited) cmdset TO THE ACCOUNT, during a certain time,
after which the script will close and all functions are
restored. It's up to the function starting the script to actually
set it on the right account object.
"""
def at_script_creation(self):
"""
We set up the script here.
"""
self.key = "temporary_blinder"
self.desc = "Temporarily blinds the account for a little while."
self.interval = 20 # seconds
self.start_delay = True # we don't want it to stop until after 20s.
self.repeats = 1 # this will go away after interval seconds.
self.persistent = False # we will ditch this if server goes down
def at_start(self):
"""
We want to add the cmdset to the linked object.
Note that the RedButtonBlind cmdset is defined to completly
replace the other cmdsets on the stack while it is active
(this means that while blinded, only operations in this cmdset
will be possible for the account to perform). It is however
not persistent, so should there be a bug in it, we just need
to restart the server to clear out of it during development.
"""
self.obj.cmdset.add(cmdsetexamples.BlindCmdSet)
def at_stop(self):
"""
It's important that we clear out that blinded cmdset
when we are done!
"""
self.obj.msg("You blink feverishly as your eyesight slowly returns.")
self.obj.location.msg_contents(
"%s seems to be recovering their eyesight." % self.obj.name, exclude=self.obj
)
self.obj.cmdset.delete() # this will clear the latest added cmdset,
# (which is the blinded one).
#
# Timer/Event-like Scripts
#
# Scripts can also work like timers, or "events". Below we
# define three such timed events that makes the button a little
# more "alive" - one that makes the button blink menacingly, another
# that makes the lid covering the button slide back after a while.
#
class CloseLidEvent(DefaultScript):
"""
This event closes the glass lid over the button
some time after it was opened. It's a one-off
script that should be started/created when the
lid is opened.
"""
def at_script_creation(self):
"""
Called when script object is first created. Sets things up.
We want to have a lid on the button that the user can pull
aside in order to make the button 'pressable'. But after a set
time that lid should auto-close again, making the button safe
from pressing (and deleting this command).
"""
self.key = "lid_closer"
self.desc = "Closes lid on a red buttons"
self.interval = 20 # seconds
self.start_delay = True # we want to pospone the launch.
self.repeats = 1 # we only close the lid once
self.persistent = True # even if the server crashes in those 20 seconds,
# the lid will still close once the game restarts.
def is_valid(self):
"""
This script can only operate if the lid is open; if it
is already closed, the script is clearly invalid.
Note that we are here relying on an self.obj being
defined (and being a RedButton object) - this we should be able to
expect since this type of script is always tied to one individual
red button object and not having it would be an error.
"""
return self.obj.db.lid_open
def at_repeat(self):
"""
Called after self.interval seconds. It closes the lid. Before this method is
called, self.is_valid() is automatically checked, so there is no need to
check this manually.
"""
self.obj.close_lid()
class BlinkButtonEvent(DefaultScript):
"""
This timed script lets the button flash at regular intervals.
"""
def at_script_creation(self):
"""
Sets things up. We want the button's lamp to blink at
regular intervals, unless it's broken (can happen
if you try to smash the glass, say).
"""
self.key = "blink_button"
self.desc = "Blinks red buttons"
self.interval = 35 # seconds
self.start_delay = False # blink right away
self.persistent = True # keep blinking also after server reboot
def is_valid(self):
"""
Button will keep blinking unless it is broken.
"""
return self.obj.db.lamp_works
def at_repeat(self):
"""
Called every self.interval seconds. Makes the lamp in
the button blink.
"""
self.obj.blink()
class DeactivateButtonEvent(DefaultScript):
"""
This deactivates the button for a short while (it won't blink, won't
close its lid etc). It is meant to be called when the button is pushed
and run as long as the blinded effect lasts. We cannot put these methods
in the AddBlindedCmdSet script since that script is defined on the *account*
whereas this one must be defined on the *button*.
"""
def at_script_creation(self):
"""
Sets things up.
"""
self.key = "deactivate_button"
self.desc = "Deactivate red button temporarily"
self.interval = 21 # seconds
self.start_delay = True # wait with the first repeat for self.interval seconds.
self.persistent = True
self.repeats = 1 # only do this once
def at_start(self):
"""
Deactivate the button. Observe that this method is always
called directly, regardless of the value of self.start_delay
(that just controls when at_repeat() is called)
"""
# closing the lid will also add the ClosedState script
self.obj.close_lid()
# lock the lid so other accounts can't access it until the
# first one's effect has worn off.
self.obj.db.lid_locked = True
# breaking the lamp also sets a correct desc
self.obj.break_lamp(feedback=False)
def at_repeat(self):
"""
When this is called, reset the functionality of the button.
"""
# restore button's desc.
self.obj.db.lamp_works = True
desc = "This is a large red button, inviting yet evil-looking. "
desc += "Its glass cover is closed, protecting it."
self.db.desc = desc
# re-activate the blink button event.
self.obj.scripts.add(BlinkButtonEvent)
# unlock the lid
self.obj.db.lid_locked = False
self.obj.scripts.validate()

View file

@ -1117,7 +1117,7 @@ class DefaultObject(ObjectDB, metaclass=TypeclassBase):
self.account = None
for script in _ScriptDB.objects.get_all_scripts_on_obj(self):
script.stop()
script.delete()
# Destroy any exits to and from this room, if any
self.clear_exits()

View file

@ -104,112 +104,22 @@ class ScriptDBManager(TypedObjectManager):
scripts = self.get_id(dbref)
for script in make_iter(scripts):
script.stop()
def remove_non_persistent(self, obj=None):
"""
This cleans up the script database of all non-persistent
scripts. It is called every time the server restarts.
Args:
obj (Object, optional): Only remove non-persistent scripts
assigned to this object.
"""
if obj:
to_stop = self.filter(db_obj=obj, db_persistent=False, db_is_active=True)
to_delete = self.filter(db_obj=obj, db_persistent=False, db_is_active=False)
else:
to_stop = self.filter(db_persistent=False, db_is_active=True)
to_delete = self.filter(db_persistent=False, db_is_active=False)
nr_deleted = to_stop.count() + to_delete.count()
for script in to_stop:
script.stop()
for script in to_delete:
script.delete()
return nr_deleted
def validate(self, scripts=None, obj=None, key=None, dbref=None, init_mode=None):
def update_scripts_after_server_start(self):
"""
This will step through the script database and make sure
all objects run scripts that are still valid in the context
they are in. This is called by the game engine at regular
intervals but can also be initiated by player scripts.
Only one of the arguments are supposed to be supplied
at a time, since they are exclusive to each other.
Args:
scripts (list, optional): A list of script objects to
validate.
obj (Object, optional): Validate only scripts defined on
this object.
key (str): Validate only scripts with this key.
dbref (int): Validate only the single script with this
particular id.
init_mode (str, optional): This is used during server
upstart and can have three values:
- `None` (no init mode). Called during run.
- `"reset"` - server reboot. Kill non-persistent scripts
- `"reload"` - server reload. Keep non-persistent scripts.
Returns:
nr_started, nr_stopped (tuple): Statistics on how many objects
where started and stopped.
Notes:
This method also makes sure start any scripts it validates
which should be harmless, since already-active scripts have
the property 'is_running' set and will be skipped.
Update/sync/restart/delete scripts after server shutdown/restart.
"""
for script in self.filter(db_is_active=True, db_persistent=False):
script._stop_task()
# we store a variable that tracks if we are calling a
# validation from within another validation (avoids
# loops).
for script in self.filter(db_is_active=True):
script._unpause_task(auto_unpause=True)
script.at_server_start()
global VALIDATE_ITERATION
if VALIDATE_ITERATION > 0:
# we are in a nested validation. Exit.
VALIDATE_ITERATION -= 1
return None, None
VALIDATE_ITERATION += 1
# not in a validation - loop. Validate as normal.
nr_started = 0
nr_stopped = 0
if init_mode:
if init_mode == "reset":
# special mode when server starts or object logs in.
# This deletes all non-persistent scripts from database
nr_stopped += self.remove_non_persistent(obj=obj)
# turn off the activity flag for all remaining scripts
scripts = self.get_all_scripts()
for script in scripts:
script.is_active = False
elif not scripts:
# normal operation
if dbref and self.dbref(dbref, reqhash=False):
scripts = self.get_id(dbref)
elif obj:
scripts = self.get_all_scripts_on_obj(obj, key=key)
else:
scripts = self.get_all_scripts(key=key)
if not scripts:
# no scripts available to validate
VALIDATE_ITERATION -= 1
return None, None
for script in scripts:
if script.is_valid():
nr_started += script.start(force_restart=init_mode)
else:
script.stop()
nr_stopped += 1
VALIDATE_ITERATION -= 1
return nr_started, nr_stopped
for script in self.filter(db_is_active=False):
script.at_server_start()
def search_script(self, ostring, obj=None, only_timed=False, typeclass=None):
"""

View file

@ -101,7 +101,7 @@ class ScriptDB(TypedObject):
# how often to run Script (secs). -1 means there is no timer
db_interval = models.IntegerField(
"interval", default=-1, help_text="how often to repeat script, in seconds. -1 means off."
"interval", default=-1, help_text="how often to repeat script, in seconds. <= 0 means off."
)
# start script right away or wait interval seconds first
db_start_delay = models.BooleanField(
@ -110,7 +110,7 @@ class ScriptDB(TypedObject):
# how many times this script is to be repeated, if interval!=0.
db_repeats = models.IntegerField("number of repeats", default=0, help_text="0 means off.")
# defines if this script should survive a reboot or not
db_persistent = models.BooleanField("survive server reboot", default=False)
db_persistent = models.BooleanField("survive server reboot", default=True)
# defines if this script has already been started in this session
db_is_active = models.BooleanField("script active", default=False)

View file

@ -108,7 +108,8 @@ class ScriptHandler(object):
scripts = ScriptDB.objects.get_all_scripts_on_obj(self.obj, key=key)
num = 0
for script in scripts:
num += script.start()
script.start()
num += 1
return num
def get(self, key):
@ -143,7 +144,8 @@ class ScriptHandler(object):
]
num = 0
for script in delscripts:
num += script.stop()
script.delete()
num += 1
return num
# alias to delete
@ -155,18 +157,3 @@ class ScriptHandler(object):
"""
return ScriptDB.objects.get_all_scripts_on_obj(self.obj)
def validate(self, init_mode=False):
"""
Runs a validation on this object's scripts only. This should
be called regularly to crank the wheels.
Args:
init_mode (str, optional): - This is used during server
upstart and can have three values:
- `False` (no init mode). Called during run.
- `"reset"` - server reboot. Kill non-persistent scripts
- `"reload"` - server reload. Keep non-persistent scripts.
"""
ScriptDB.objects.validate(obj=self.obj, init_mode=init_mode)

View file

@ -7,7 +7,6 @@ ability to run timers.
from twisted.internet.defer import Deferred, maybeDeferred
from twisted.internet.task import LoopingCall
from django.core.exceptions import ObjectDoesNotExist
from django.utils.translation import gettext as _
from evennia.typeclasses.models import TypeclassBase
from evennia.scripts.models import ScriptDB
@ -17,21 +16,11 @@ from evennia.utils import create, logger
__all__ = ["DefaultScript", "DoNothing", "Store"]
FLUSHING_INSTANCES = False # whether we're in the process of flushing scripts from the cache
SCRIPT_FLUSH_TIMERS = {} # stores timers for scripts that are currently being flushed
def restart_scripts_after_flush():
"""After instances are flushed, validate scripts so they're not dead for a long period of time"""
global FLUSHING_INSTANCES
ScriptDB.objects.validate()
FLUSHING_INSTANCES = False
class ExtendedLoopingCall(LoopingCall):
"""
LoopingCall that can start at a delay different
than `self.interval`.
Custom child of LoopingCall that can start at a delay different than
`self.interval` and self.count=0. This allows it to support pausing
by resuming at a later period.
"""
@ -49,10 +38,10 @@ class ExtendedLoopingCall(LoopingCall):
interval (int): Repeat interval in seconds.
now (bool, optional): Whether to start immediately or after
`start_delay` seconds.
start_delay (int): The number of seconds before starting.
If None, wait interval seconds. Only valid if `now` is `False`.
It is used as a way to start with a variable start time
after a pause.
start_delay (int, optional): This only applies is `now=False`. It gives
number of seconds to wait before starting. If `None`, use
`interval` as this value instead. Internally, this is used as a
way to start with a variable start time after a pause.
count_start (int): Number of repeats to start at. The count
goes up every time the system repeats. This is used to
implement something repeating `N` number of times etc.
@ -131,7 +120,7 @@ class ExtendedLoopingCall(LoopingCall):
of start_delay into account.
Returns:
next (int or None): The time in seconds until the next call. This
int or None: The time in seconds until the next call. This
takes `start_delay` into account. Returns `None` if
the task is not running.
@ -139,7 +128,7 @@ class ExtendedLoopingCall(LoopingCall):
if self.running and self.interval > 0:
total_runtime = self.clock.seconds() - self.starttime
interval = self.start_delay or self.interval
return interval - (total_runtime % self.interval)
return max(0, interval - (total_runtime % self.interval))
class ScriptBase(ScriptDB, metaclass=TypeclassBase):
@ -147,6 +136,8 @@ class ScriptBase(ScriptDB, metaclass=TypeclassBase):
Base class for scripts. Don't inherit from this, inherit from the
class `DefaultScript` below instead.
This handles the timer-component of the Script.
"""
objects = ScriptManager()
@ -157,36 +148,176 @@ class ScriptBase(ScriptDB, metaclass=TypeclassBase):
def __repr__(self):
return str(self)
def _start_task(self):
def at_idmapper_flush(self):
"""
Start task runner.
If we're flushing this object, make sure the LoopingCall is gone too.
"""
ret = super().at_idmapper_flush()
if ret and self.ndb._task:
self.ndb._pause_task(auto_pause=True)
# TODO - restart anew ?
return ret
def _start_task(self, interval=None, start_delay=None, repeats=None, force_restart=False,
auto_unpause=False, **kwargs):
"""
Start/Unpause task runner, optionally with new values. If given, this will
update the Script's fields.
Keyword Args:
interval (int): How often to tick the task, in seconds. If this is <= 0,
no task will start and properties will not be updated on the Script.
start_delay (int): If the start should be delayed.
repeats (int): How many repeats. 0 for infinite repeats.
force_restart (bool): If set, always create a new task running even if an
old one already was running. Otherwise this will only happen if
new script properties were passed.
auto_unpause (bool): This is an automatic unpaused (used e.g by Evennia after
a reload) and should not un-pause manually paused Script timers.
Note:
If setting the `start-delay` of a *paused* Script, the Script will
restart exactly after that new start-delay, ignoring the time it
was paused at. If only changing the `interval`, the Script will
come out of pause comparing the time it spent in the *old* interval
with the *new* interval in order to determine when next to fire.
Examples:
- Script previously had an interval of 10s and was paused 5s into that interval.
Script is now restarted with a 20s interval. It will next fire after 15s.
- Same Script is restarted with a 3s interval. It will fire immediately.
"""
if self.pk is None:
# script object already deleted from db - don't start a new timer
raise ScriptDB.DoesNotExist
# handle setting/updating fields
update_fields = []
old_interval = self.db_interval
if interval is not None:
self.db_interval = interval
update_fields.append("db_interval")
if start_delay is not None:
self.db_start_delay = start_delay
update_fields.append("db_start_delay")
if repeats is not None:
self.db_repeats = repeats
update_fields.append("db_repeats")
# validate interval
if self.db_interval and self.db_interval > 0:
if not self.is_active:
self.db_is_active = True
update_fields.append("db_is_active")
else:
# no point in starting a task with no interval.
return
restart = bool(update_fields) or force_restart
self.save(update_fields=update_fields)
if self.ndb._task and self.ndb._task.running:
if restart:
# a change needed/forced; stop/remove old task
self._stop_task()
else:
# task alreaady running and no changes needed
return
if not self.ndb._task:
# we should have a fresh task after this point
self.ndb._task = ExtendedLoopingCall(self._step_task)
if self.db._paused_time:
# the script was paused; restarting
callcount = self.db._paused_callcount or 0
self.ndb._task.start(
self.db_interval, now=False, start_delay=self.db._paused_time, count_start=callcount
)
del self.db._paused_time
del self.db._paused_repeats
self._unpause_task(interval=interval, start_delay=start_delay,
auto_unpause=auto_unpause,
old_interval=old_interval)
elif not self.ndb._task.running:
# starting script anew
if not self.ndb._task.running:
# if not unpausing started it, start script anew with the new values
self.ndb._task.start(self.db_interval, now=not self.db_start_delay)
def _stop_task(self):
self.at_start(**kwargs)
def _pause_task(self, auto_pause=False, **kwargs):
"""
Stop task runner
Pause task where it is, saving the current status.
Args:
auto_pause (str):
"""
if not self.db._paused_time:
# only allow pause if not already paused
task = self.ndb._task
if task:
self.db._paused_time = task.next_call_time()
self.db._paused_callcount = task.callcount
self.db._manually_paused = not auto_pause
if task.running:
task.stop()
self.ndb._task = None
self.at_pause(auto_pause=auto_pause, **kwargs)
def _unpause_task(self, interval=None, start_delay=None, auto_unpause=False,
old_interval=0, **kwargs):
"""
Unpause task from paused status. This is used for auto-paused tasks, such
as tasks paused on a server reload.
Args:
interval (int): How often to tick the task, in seconds.
start_delay (int): If the start should be delayed.
auto_unpause (bool): If set, this will only unpause scripts that were unpaused
automatically (useful during a system reload/shutdown).
old_interval (int): The old Script interval (or current one if nothing changed). Used
to recalculate the unpause startup interval.
"""
paused_time = self.db._paused_time
if paused_time:
if auto_unpause and self.db._manually_paused:
# this was manually paused.
return
# task was paused. This will use the new values as needed.
callcount = self.db._paused_callcount or 0
if start_delay is None and interval is not None:
# adjust start-delay based on how far we were into previous interval
start_delay = max(0, interval - (old_interval - paused_time))
else:
start_delay = paused_time
if not self.ndb._task:
self.ndb._task = ExtendedLoopingCall(self._step_task)
self.ndb._task.start(
self.db_interval, now=False, start_delay=start_delay, count_start=callcount
)
del self.db._paused_time
del self.db._paused_callcount
del self.db._manually_paused
self.at_start(**kwargs)
def _stop_task(self, **kwargs):
"""
Stop task runner and delete the task.
"""
task = self.ndb._task
if task and task.running:
task.stop()
self.ndb._task = None
self.db_is_active = False
# make sure this is not confused as a paused script
del self.db._paused_time
del self.db._paused_callcount
del self.db._manually_paused
self.save(update_fields=["db_is_active"])
self.at_stop(**kwargs)
def _step_errback(self, e):
"""
@ -239,12 +370,7 @@ class ScriptBase(ScriptDB, metaclass=TypeclassBase):
logger.log_trace()
return None
def at_script_creation(self):
"""
Should be overridden in child.
"""
pass
# Access methods / hooks
def at_first_save(self, **kwargs):
"""
@ -306,12 +432,196 @@ class ScriptBase(ScriptDB, metaclass=TypeclassBase):
for key, value in cdict["nattributes"]:
self.nattributes.add(key, value)
if not cdict.get("autostart"):
# don't auto-start the script
return
if cdict.get("autostart"):
# autostart the script
self._start_task(force_restart=True)
# auto-start script (default)
self.start()
def delete(self):
"""
Delete the Script. Makes sure to stop any timer tasks first.
"""
self._stop_task()
self.at_script_delete()
super().delete()
def at_script_creation(self):
"""
Should be overridden in child.
"""
pass
def at_script_delete(self):
"""
Called when script is deleted, after at_stop.
"""
pass
def is_valid(self):
"""
If returning False, `at_repeat` will not be called and timer will stop
updating.
"""
return True
def at_repeat(self, **kwargs):
"""
Called repeatedly every `interval` seconds, once `.start()` has
been called on the Script at least once.
Args:
**kwargs (dict): Arbitrary, optional arguments for users
overriding the call (unused by default).
"""
pass
def at_start(self, **kwargs):
pass
def at_pause(self, **kwargs):
pass
def at_stop(self, **kwargs):
pass
def start(self, interval=None, start_delay=None, repeats=None, **kwargs):
"""
Start/Unpause timer component, optionally with new values. If given,
this will update the Script's fields. This will start `at_repeat` being
called every `interval` seconds.
Keyword Args:
interval (int): How often to fire `at_repeat` in seconds.
start_delay (int): If the start of ticking should be delayed.
repeats (int): How many repeats. 0 for infinite repeats.
**kwargs: Optional (default unused) kwargs passed on into the `at_start` hook.
Notes:
If setting the `start-delay` of a *paused* Script, the Script will
restart exactly after that new start-delay, ignoring the time it
was paused at. If only changing the `interval`, the Script will
come out of pause comparing the time it spent in the *old* interval
with the *new* interval in order to determine when next to fire.
Examples:
- Script previously had an interval of 10s and was paused 5s into that interval.
Script is now restarted with a 20s interval. It will next fire after 15s.
- Same Script is restarted with a 3s interval. It will fire immediately.
"""
self._start_task(interval=interval, start_delay=start_delay, repeats=repeats, **kwargs)
def update(self, interval=None, start_delay=None, repeats=None, **kwargs):
"""
Update the Script's timer component with new settings.
Keyword Args:
interval (int): How often to fire `at_repeat` in seconds.
start_delay (int): If the start of ticking should be delayed.
repeats (int): How many repeats. 0 for infinite repeats.
**kwargs: Optional (default unused) kwargs passed on into the `at_start` hook.
"""
self._start_task(interval=interval, start_delay=start_delay,
repeats=repeats, force_restart=True, **kwargs)
def stop(self, **kwargs):
"""
Stop the Script's timer component. This will not delete the Sctipt,
just stop the regular firing of `at_repeat`. Running `.start()` will
start the timer anew, optionally with new settings..
Args:
**kwargs: Optional (default unused) kwargs passed on into the `at_stop` hook.
"""
self._stop_task(**kwargs)
def pause(self, **kwargs):
"""
Manually the Script's timer component manually.
Args:
**kwargs: Optional (default unused) kwargs passed on into the `at_pause` hook.
"""
self._pause_task(manual_pause=True, **kwargs)
def unpause(self, **kwargs):
"""
Manually unpause a Paused Script.
Args:
**kwargs: Optional (default unused) kwargs passed on into the `at_start` hook.
"""
self._unpause_task(**kwargs)
def time_until_next_repeat(self):
"""
Get time until the script fires it `at_repeat` hook again.
Returns:
int or None: Time in seconds until the script runs again.
If not a timed script, return `None`.
Notes:
This hook is not used in any way by the script's stepping
system; it's only here for the user to be able to check in
on their scripts and when they will next be run.
"""
task = self.ndb._task
if task:
try:
return int(round(task.next_call_time()))
except TypeError:
pass
return None
def remaining_repeats(self):
"""
Get the number of returning repeats for limited Scripts.
Returns:
int or None: The number of repeats remaining until the Script
stops. Returns `None` if it has unlimited repeats.
"""
task = self.ndb._task
if task:
return max(0, self.db_repeats - task.callcount)
return None
def reset_callcount(self, value=0):
"""
Reset the count of the number of calls done.
Args:
value (int, optional): The repeat value to reset to. Default
is to set it all the way back to 0.
Notes:
This is only useful if repeats != 0.
"""
task = self.ndb._task
if task:
task.callcount = max(0, int(value))
def force_repeat(self):
"""
Fire a premature triggering of the script callback. This
will reset the timer and count down repeats as if the script
had fired normally.
"""
task = self.ndb._task
if task:
task.force_repeat()
class DefaultScript(ScriptBase):
@ -358,287 +668,20 @@ class DefaultScript(ScriptBase):
"""
pass
def time_until_next_repeat(self):
"""
Get time until the script fires it `at_repeat` hook again.
Returns:
next (int): Time in seconds until the script runs again.
If not a timed script, return `None`.
Notes:
This hook is not used in any way by the script's stepping
system; it's only here for the user to be able to check in
on their scripts and when they will next be run.
"""
task = self.ndb._task
if task:
try:
return int(round(task.next_call_time()))
except TypeError:
pass
return None
def remaining_repeats(self):
"""
Get the number of returning repeats for limited Scripts.
Returns:
remaining (int or `None`): The number of repeats
remaining until the Script stops. Returns `None`
if it has unlimited repeats.
"""
task = self.ndb._task
if task:
return max(0, self.db_repeats - task.callcount)
return None
def at_idmapper_flush(self):
"""If we're flushing this object, make sure the LoopingCall is gone too"""
ret = super(DefaultScript, self).at_idmapper_flush()
if ret and self.ndb._task:
try:
from twisted.internet import reactor
global FLUSHING_INSTANCES
# store the current timers for the _task and stop it to avoid duplicates after cache flush
paused_time = self.ndb._task.next_call_time()
callcount = self.ndb._task.callcount
self._stop_task()
SCRIPT_FLUSH_TIMERS[self.id] = (paused_time, callcount)
# here we ensure that the restart call only happens once, not once per script
if not FLUSHING_INSTANCES:
FLUSHING_INSTANCES = True
reactor.callLater(2, restart_scripts_after_flush)
except Exception:
import traceback
traceback.print_exc()
return ret
def start(self, force_restart=False):
"""
Called every time the script is started (for persistent
scripts, this is usually once every server start)
Args:
force_restart (bool, optional): Normally an already
started script will not be started again. if
`force_restart=True`, the script will always restart
the script, regardless of if it has started before.
Returns:
result (int): 0 or 1 depending on if the script successfully
started or not. Used in counting.
"""
if self.is_active and not force_restart:
# The script is already running, but make sure we have a _task if
# this is after a cache flush
if not self.ndb._task and self.db_interval > 0:
self.ndb._task = ExtendedLoopingCall(self._step_task)
try:
start_delay, callcount = SCRIPT_FLUSH_TIMERS[self.id]
del SCRIPT_FLUSH_TIMERS[self.id]
now = False
except (KeyError, ValueError, TypeError):
now = not self.db_start_delay
start_delay = None
callcount = 0
self.ndb._task.start(
self.db_interval, now=now, start_delay=start_delay, count_start=callcount
)
return 0
obj = self.obj
if obj:
# check so the scripted object is valid and initalized
try:
obj.cmdset
except AttributeError:
# this means the object is not initialized.
logger.log_trace()
self.is_active = False
return 0
# try to restart a paused script
try:
if self.unpause(manual_unpause=False):
return 1
except RuntimeError:
# manually paused.
return 0
# start the script from scratch
self.is_active = True
try:
self.at_start()
except Exception:
logger.log_trace()
if self.db_interval > 0:
self._start_task()
return 1
def stop(self, kill=False):
"""
Called to stop the script from running. This also deletes the
script.
Args:
kill (bool, optional): - Stop the script without
calling any relevant script hooks.
Returns:
result (int): 0 if the script failed to stop, 1 otherwise.
Used in counting.
"""
if not kill:
try:
self.at_stop()
except Exception:
logger.log_trace()
self._stop_task()
try:
self.delete()
except AssertionError:
logger.log_trace()
return 0
except ObjectDoesNotExist:
return 0
return 1
def pause(self, manual_pause=True):
"""
This stops a running script and stores its active state.
It WILL NOT call the `at_stop()` hook.
"""
self.db._manual_pause = manual_pause
if not self.db._paused_time:
# only allow pause if not already paused
task = self.ndb._task
if task:
self.db._paused_time = task.next_call_time()
self.db._paused_callcount = task.callcount
self._stop_task()
self.is_active = False
def unpause(self, manual_unpause=True):
"""
Restart a paused script. This WILL call the `at_start()` hook.
Args:
manual_unpause (bool, optional): This is False if unpause is
called by the server reload/reset mechanism.
Returns:
result (bool): True if unpause was triggered, False otherwise.
Raises:
RuntimeError: If trying to automatically resart this script
(usually after a reset/reload), but it was manually paused,
and so should not the auto-unpaused.
"""
if not manual_unpause and self.db._manual_pause:
# if this script was paused manually (by a direct call of pause),
# it cannot be automatically unpaused (e.g. by a @reload)
raise RuntimeError
# Ensure that the script is fully unpaused, so that future calls
# to unpause do not raise a RuntimeError
self.db._manual_pause = False
if self.db._paused_time:
# only unpause if previously paused
self.is_active = True
try:
self.at_start()
except Exception:
logger.log_trace()
self._start_task()
return True
def restart(self, interval=None, repeats=None, start_delay=None):
"""
Restarts an already existing/running Script from the
beginning, optionally using different settings. This will
first call the stop hooks, and then the start hooks again.
Args:
interval (int, optional): Allows for changing the interval
of the Script. Given in seconds. if `None`, will use the already stored interval.
repeats (int, optional): The number of repeats. If unset, will
use the previous setting.
start_delay (bool, optional): If we should wait `interval` seconds
before starting or not. If `None`, re-use the previous setting.
"""
try:
self.at_stop()
except Exception:
logger.log_trace()
self._stop_task()
self.is_active = False
# remove all pause flags
del self.db._paused_time
del self.db._manual_pause
del self.db._paused_callcount
# set new flags and start over
if interval is not None:
interval = max(0, interval)
self.interval = interval
if repeats is not None:
self.repeats = repeats
if start_delay is not None:
self.start_delay = start_delay
self.start()
def reset_callcount(self, value=0):
"""
Reset the count of the number of calls done.
Args:
value (int, optional): The repeat value to reset to. Default
is to set it all the way back to 0.
Notes:
This is only useful if repeats != 0.
"""
task = self.ndb._task
if task:
task.callcount = max(0, int(value))
def force_repeat(self):
"""
Fire a premature triggering of the script callback. This
will reset the timer and count down repeats as if the script
had fired normally.
"""
task = self.ndb._task
if task:
task.force_repeat()
def is_valid(self):
"""
Is called to check if the script is valid to run at this time.
Should return a boolean. The method is assumed to collect all
needed information from its related self.obj.
Is called to check if the script's timer is valid to run at this time.
Should return a boolean. If False, the timer will be stopped.
"""
return not self._is_deleted
return True
def at_start(self, **kwargs):
"""
Called whenever the script is started, which for persistent
scripts is at least once every server start. It will also be
called when starting again after a pause (such as after a
server reload)
Called whenever the script timer is started, which for persistent
timed scripts is at least once every server start. It will also be
called when starting again after a pause (including after a
server reload).
Args:
**kwargs (dict): Arbitrary, optional arguments for users
@ -658,18 +701,38 @@ class DefaultScript(ScriptBase):
"""
pass
def at_pause(self, manual_pause=True, **kwargs):
"""
Called when this script's timer pauses.
Args:
manual_pause (bool): If set, pausing was done by a direct call. The
non-manual pause indicates the script was paused as part of
the server reload.
"""
pass
def at_stop(self, **kwargs):
"""
Called whenever when it's time for this script to stop (either
because is_valid returned False or it runs out of iterations)
Called whenever when it's time for this script's timer to stop (either
because is_valid returned False, it ran out of iterations or it was manuallys
stopped.
Args
Args:
**kwargs (dict): Arbitrary, optional arguments for users
overriding the call (unused by default).
"""
pass
def at_script_delete(self):
"""
Called when the Script is deleted, after at_stop().
"""
pass
def at_server_reload(self):
"""
This hook is called whenever the server is shutting down for
@ -686,6 +749,15 @@ class DefaultScript(ScriptBase):
"""
pass
def at_server_start(self):
"""
This hook is called after the server has started. It can be used to add
post-startup setup for Scripts without a timer component (for which at_start
could be used).
"""
pass
# Some useful default Script types used by Evennia.

View file

@ -1,10 +1,11 @@
# this is an optimized version only available in later Django versions
from unittest import TestCase
from unittest import TestCase, mock
from parameterized import parameterized
from evennia import DefaultScript
from evennia.scripts.models import ScriptDB, ObjectDoesNotExist
from evennia.utils.create import create_script
from evennia.utils.test_resources import EvenniaTest
from evennia.scripts.scripts import DoNothing
from evennia.scripts.scripts import DoNothing, ExtendedLoopingCall
class TestScript(EvenniaTest):
@ -42,12 +43,45 @@ class TestScriptDB(TestCase):
def test_deleted_script_fails_start(self):
"Would it ever be necessary to start a deleted script?"
self.scr.delete()
with self.assertRaises(ObjectDoesNotExist): # See issue #509
with self.assertRaises(ScriptDB.DoesNotExist): # See issue #509
self.scr.start()
# Check the script is not recreated as a side-effect
self.assertFalse(self.scr in ScriptDB.objects.get_all_scripts())
def test_deleted_script_is_invalid(self):
"Can deleted scripts be said to be valid?"
self.scr.delete()
self.assertFalse(self.scr.is_valid()) # assertRaises? See issue #509
class TestExtendedLoopingCall(TestCase):
"""
Test the ExtendedLoopingCall class.
"""
@mock.patch("evennia.scripts.scripts.LoopingCall")
def test_start__nodelay(self, MockClass):
"""Test the .start method with no delay"""
callback = mock.MagicMock()
loopcall = ExtendedLoopingCall(callback)
loopcall.__call__ = mock.MagicMock()
loopcall._scheduleFrom = mock.MagicMock()
loopcall.clock.seconds = mock.MagicMock(return_value=0)
loopcall.start(20, now=True, start_delay=None, count_start=1)
loopcall._scheduleFrom.assert_not_called()
@mock.patch("evennia.scripts.scripts.LoopingCall")
def test_start__delay(self, MockLoopingCall):
"""Test the .start method with delay"""
callback = mock.MagicMock()
MockLoopingCall.clock.seconds = mock.MagicMock(return_value=0)
loopcall = ExtendedLoopingCall(callback)
loopcall.__call__ = mock.MagicMock()
loopcall.clock.seconds = mock.MagicMock(return_value=121)
loopcall._scheduleFrom = mock.MagicMock()
loopcall.start(20, now=False, start_delay=10, count_start=1)
loopcall.__call__.assert_not_called()
self.assertEqual(loopcall.interval , 20)
loopcall._scheduleFrom.assert_called_with(121)

View file

@ -143,9 +143,6 @@ def _server_maintenance():
if _MAINTENANCE_COUNT % 300 == 0:
# check cache size every 5 minutes
_FLUSH_CACHE(_IDMAPPER_CACHE_MAXSIZE)
if _MAINTENANCE_COUNT % 3600 == 0:
# validate scripts every hour
evennia.ScriptDB.objects.validate()
if _MAINTENANCE_COUNT % 3700 == 0:
# validate channels off-sync with scripts
evennia.CHANNEL_HANDLER.update()
@ -431,9 +428,9 @@ class Evennia:
yield [o.at_server_reload() for o in ObjectDB.get_all_cached_instances()]
yield [p.at_server_reload() for p in AccountDB.get_all_cached_instances()]
yield [
(s.pause(manual_pause=False), s.at_server_reload())
(s._pause_task(auto_pause=True), s.at_server_reload())
for s in ScriptDB.get_all_cached_instances()
if s.id and (s.is_active or s.attributes.has("_manual_pause"))
if s.id and s.is_active
]
yield self.sessions.all_sessions_portal_sync()
self.at_server_reload_stop()
@ -457,11 +454,9 @@ class Evennia:
]
yield ObjectDB.objects.clear_all_sessids()
yield [
(
s.pause(manual_pause=s.attributes.get("_manual_pause", False)),
s.at_server_shutdown(),
)
(s._pause_task(auto_pause=True), s.at_server_shutdown())
for s in ScriptDB.get_all_cached_instances()
if s.id and s.is_active
]
ServerConfig.objects.conf("server_restart_mode", "reset")
self.at_server_cold_stop()
@ -532,9 +527,8 @@ class Evennia:
TICKER_HANDLER.restore(mode == "reload")
# after sync is complete we force-validate all scripts
# (this also starts any that didn't yet start)
ScriptDB.objects.validate(init_mode=mode)
# Un-pause all scripts, stop non-persistent timers
ScriptDB.objects.update_scripts_after_server_start()
# start the task handler
from evennia.scripts.taskhandler import TASK_HANDLER
@ -591,7 +585,7 @@ class Evennia:
from evennia.scripts.models import ScriptDB
for script in ScriptDB.objects.filter(db_persistent=False):
script.stop()
script._stop_task()
if GUEST_ENABLED:
for guest in AccountDB.objects.all().filter(

View file

@ -304,7 +304,7 @@ class ServerSessionHandler(SessionHandler):
sess.load_sync_data(portalsessiondata)
sess.at_sync()
# validate all scripts
_ScriptDB.objects.validate()
# _ScriptDB.objects.validate()
self[sess.sessid] = sess
if sess.logged_in and sess.uid:

View file

@ -71,10 +71,8 @@ class TestServer(TestCase):
) as mocks:
mocks["connection"].close = MagicMock()
mocks["ServerConfig"].objects.conf = MagicMock(return_value=100)
with patch("evennia.server.server.evennia.ScriptDB.objects.validate") as mock:
self.server._server_maintenance()
mocks["_FLUSH_CACHE"].assert_called_with(1000)
mock.assert_called()
self.server._server_maintenance()
mocks["_FLUSH_CACHE"].assert_called_with(1000)
def test__server_maintenance_channel_handler_update(self):
with patch.multiple(

View file

@ -155,12 +155,12 @@ class GlobalScriptContainer(Container):
new_script.start()
return new_script
if (
(found.interval != interval)
if ((found.interval != interval)
or (found.start_delay != start_delay)
or (found.repeats != repeats)
):
found.restart(interval=interval, start_delay=start_delay, repeats=repeats)
# the setup changed
found.start(interval=interval, start_delay=start_delay, repeats=repeats)
if found.desc != desc:
found.desc = desc
return found

View file

@ -31,9 +31,11 @@ class TestCreateScript(EvenniaTest):
self.repeats = 1
self.persistent = False
# script is already stopped (interval=1, start_delay=False)
# script should still exist even though repeats=1, start_delay=False
script = create.create_script(TestScriptB, key="test_script")
assert script is None
assert script
# but the timer should be inactive now
assert not script.is_active
def test_create_script_w_repeats_equal_1_persisted(self):
class TestScriptB1(DefaultScript):
@ -45,7 +47,9 @@ class TestCreateScript(EvenniaTest):
# script is already stopped (interval=1, start_delay=False)
script = create.create_script(TestScriptB1, key="test_script")
assert script is None
assert script
assert not script.is_active
def test_create_script_w_repeats_equal_2(self):
class TestScriptC(DefaultScript):

View file

@ -39,6 +39,10 @@ _MULTIMATCH_TEMPLATE = settings.SEARCH_MULTIMATCH_TEMPLATE
_EVENNIA_DIR = settings.EVENNIA_DIR
_GAME_DIR = settings.GAME_DIR
ENCODINGS = settings.ENCODINGS
_TASK_HANDLER = None
_TICKER_HANDLER = None
_GA = object.__getattribute__
_SA = object.__setattr__
_DA = object.__delattr__
@ -1016,7 +1020,6 @@ def uses_database(name="sqlite3"):
return engine == "django.db.backends.%s" % name
_TASK_HANDLER = None
def delay(timedelay, callback, *args, **kwargs):
@ -1050,12 +1053,81 @@ def delay(timedelay, callback, *args, **kwargs):
"""
global _TASK_HANDLER
# Do some imports here to avoid circular import and speed things up
if _TASK_HANDLER is None:
from evennia.scripts.taskhandler import TASK_HANDLER as _TASK_HANDLER
return _TASK_HANDLER.add(timedelay, callback, *args, **kwargs)
def repeat(interval, callback, persistent=True, idstring="", stop=False,
store_key=None, *args, **kwargs):
"""
Start a repeating task using the TickerHandler.
Args:
interval (int): How often to call callback.
callback (callable): This will be called with `*args, **kwargs` every
`interval` seconds. This must be possible to pickle regardless
of if `persistent` is set or not!
persistent (bool, optional): If ticker survives a server reload.
idstring (str, optional): Separates multiple tickers. This is useful
mainly if wanting to set up multiple repeats for the same
interval/callback but with different args/kwargs.
stop (bool, optional): If set, use the given parameters to _stop_ a running
ticker instead of creating a new one.
store_key (tuple, optional): This is only used in combination with `stop` and
should be the return given from the original `repeat` call. If this
is given, all other args except `stop` are ignored.
*args, **kwargs: Used as arguments to `callback`.
Returns:
tuple or None: This is the `store_key` - the identifier for the created ticker.
Store this and pass into unrepat() in order to to stop this ticker
later. It this lost you need to stop the ticker via TICKER_HANDLER.remove
by supplying all the same arguments
directly. No return if `stop=True`
Raises:
KeyError: If trying to stop a ticker that was not found.
"""
global _TICKER_HANDLER
if _TICKER_HANDLER is None:
from evennia.scripts.tickerhandler import TICKER_HANDLER as _TICKER_HANDLER
if stop:
# we pass all args, but only store_key matters if given
_TICKER_HANDLER.remove(interval=interval,
callback=callback,
idstring=idstring,
persistent=persistent,
store_key=store_key)
else:
return _TICKER_HANDLER.add(interval=interval,
callback=callback,
idstring=idstring,
persistent=persistent)
def unrepeat(store_key):
"""
This is used to stop a ticker previously started with `repeat`.
Args:
store_key (tuple): This is the return from `repeat`, used to uniquely
identify the ticker to stop.
Returns:
bool: True if a ticker was stopped, False if not (for example because no
matching ticker was found or it was already stopped).
"""
try:
repeat(None, None, stop=True, store_key=store_key)
return True
except KeyError:
return False
_PPOOL = None
_PCMD = None
_PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9."

View file

@ -8,7 +8,7 @@ pytz
djangorestframework >= 3.10.3, < 3.12
django-filter >= 2.2.0, < 2.3
django-sekizai
inflect
inflect >= 5.2.0
autobahn >= 17.9.3
lunr == 0.5.6
@ -18,8 +18,9 @@ attrs >= 19.2.0
# testing and development
model_mommy
mock >= 1.0.1
anything
black
anything==0.2.1
black
parameterized==0.8.1
# windows-specific
pypiwin32;platform_system=="Windows"