Add new cooldowns contrib.

This commit is contained in:
Owllex 2021-11-23 18:03:37 -08:00
parent d890ac10d3
commit 29896a82e8
2 changed files with 300 additions and 8 deletions

View file

@ -0,0 +1,175 @@
"""
Cooldown contrib module.
Evennia contrib - owllex, 2021
This contrib provides a simple cooldown handler that can be attached to any
typeclassed Object or Account. A cooldown is a lightweight persistent
asynchronous timer that you can query to see if it is ready.
Cooldowns are good for modelling rate-limited actions, like how often a
character can perform a given command.
Cooldowns are completely asynchronous and must be queried to know their
state. They do not fire callbacks, so are not a good fit for use cases
where something needs to happen on a specific schedule (use delay or
a TickerHandler for that instead).
Installation:
To use, simply add the following property to the typeclass definition of
any object type that you want to support cooldowns. It will expose a
new `cooldowns` property that persists data to the object's attribute
storage. You can set this on your base `Object` typeclass to enable cooldown
tracking on every kind of object, or just put it on your `Character`
typeclass.
By default the CooldownHandler will use the `cooldowns` property, but you
can customize this if desired by passing a different value for the
db_attribute parameter.
from evennia.contrib.cooldowns import Cooldownhandler
from evennia.utils.utils import lazy_property
@lazy_property
def cooldowns(self):
return CooldownHandler(self, db_attribute="cooldowns")
"""
import math
import time
class CooldownHandler:
"""
Handler for cooldowns. This can be attached to any object that
supports DB attributes (like a Character or Account).
A cooldown is a timer that is usually used to limit how often
some action can be performed or some effect can trigger. When a
cooldown is first set, it counts down from the amount of time
provided back to zero, at which point it is considered ready again.
Cooldowns are named with an arbitrary string, and that string is used
to check on the progression of the cooldown. Each cooldown is tracked
separately and independently.
Cooldowns are saved persistently, so they survive reboots. This
module does not register or provide callback functionality for when
a cooldown becomes ready again. Users of cooldowns are expected to
query the state of any cooldowns they are interested in.
Methods:
- ready(name): Checks whether a given cooldown name is ready.
- time_left(name): Returns how much time is left on a cooldown.
- set(name, seconds): Sets a given cooldown to last for a certain
amount of time. Until then, ready() will return False for that
cooldown name.
- extend(name, seconds): Like set, but adds time to the given
cooldown name. If it doesn't exist yet, calling this is equivalent
to calling set.
- reset(cooldown): Resets a given cooldown, causing ready() to return
True for that cooldown immediately.
- clear(): Resets all cooldowns.
"""
def __init__(self, obj, db_attribute="cooldowns"):
if not obj.attributes.has(db_attribute):
obj.attributes.add(db_attribute, {})
self.data = obj.attributes.get(db_attribute)
self.cleanup()
@property
def all(self):
"""
Returns a list of all keys in this object.
"""
return list(self.data.keys())
def ready(self, *args):
"""
Checks whether all of the provided cooldowns are ready (expired).
If a requested cooldown does not exist, it is considered ready.
Args:
any (str): One or more cooldown names to check.
Returns:
(bool): True if each cooldown has expired or does not exist.
"""
return self.time_left(*args) <= 0
def time_left(self, *args):
"""
Returns the maximum amount of time left on one or more given
cooldowns. If a requested cooldown does not exist, it is
considered to have 0 time left.
Args:
any (str): One or more cooldown names to check.
Returns:
(int): Number of seconds until all provided cooldowns are
ready. Returns 0 if all cooldowns are ready (or don't
exist.)
"""
now = time.time()
cooldowns = [self.data[x] - now for x in args if x in self.data]
if not cooldowns:
return 0
return math.ceil(max(max(cooldowns), 0))
def set(self, cooldown, seconds):
"""
Sets a given cooldown to last for a specific amount of time.
If this cooldown is already set, this replaces it.
Args:
cooldown (str): The name of the cooldown.
seconds (int): The number of seconds before this cooldown is
ready again.
"""
now = time.time()
self.data[cooldown] = int(now) + (max(seconds, 0) if seconds else 0)
def extend(self, cooldown, seconds):
"""
Adds a specific amount of time to an existing cooldown.
If this cooldown is already ready, this is equivalent to calling
set. If the cooldown is not ready, it will be extended by the
provided duration.
Args:
cooldown (str): The name of the cooldown.
seconds (int): The number of seconds to extend this cooldown.
"""
time_left = self.time_left(cooldown)
self.set(cooldown, time_left + (seconds if seconds else 0))
def reset(self, cooldown):
"""
Resets a given cooldown.
Args:
cooldown (str): The name of the cooldown.
"""
if cooldown in self.data:
del self.data[cooldown]
def clear(self):
"""
Resets all cooldowns.
"""
for cooldown in list(self.data.keys()):
del self.data[cooldown]
def cleanup(self):
"""
Deletes all expired cooldowns. This helps keep attribute storage
requirements small.
"""
now = time.time()
keys = [x for x in self.data.keys() if self.data[x] - now < 0]
for key in keys:
del self.data[key]

View file

@ -106,6 +106,7 @@ recog10 = "Mr Sender"
emote = 'With a flair, /me looks at /first and /colliding sdesc-guy. She says "This is a test."'
case_emote = "/me looks at /first, then /FIRST, /First and /Colliding twice."
class TestRPSystem(EvenniaTest):
maxDiff = None
@ -195,11 +196,15 @@ class TestRPSystem(EvenniaTest):
"#9": "A nice sender of emotes",
},
)
self.assertEqual(rpsystem.parse_sdescs_and_recogs(
speaker, candidates, emote, case_sensitive=False), result)
self.assertEqual(
rpsystem.parse_sdescs_and_recogs(speaker, candidates, emote, case_sensitive=False),
result,
)
self.speaker.recog.add(self.receiver1, recog01)
self.assertEqual(rpsystem.parse_sdescs_and_recogs(
speaker, candidates, emote, case_sensitive=False), result)
self.assertEqual(
rpsystem.parse_sdescs_and_recogs(speaker, candidates, emote, case_sensitive=False),
result,
)
def test_send_emote(self):
speaker = self.speaker
@ -246,18 +251,18 @@ class TestRPSystem(EvenniaTest):
self.out0,
"|bSender|n looks at |bthe first receiver of emotes.|n, then "
"|bTHE FIRST RECEIVER OF EMOTES.|n, |bThe first receiver of emotes.|n and "
"|bAnother nice colliding sdesc-guy for tests|n twice."
"|bAnother nice colliding sdesc-guy for tests|n twice.",
)
self.assertEqual(
self.out1,
"|bA nice sender of emotes|n looks at |bReceiver1|n, then |bReceiver1|n, "
"|bReceiver1|n and |bAnother nice colliding sdesc-guy for tests|n twice."
"|bReceiver1|n and |bAnother nice colliding sdesc-guy for tests|n twice.",
)
self.assertEqual(
self.out2,
"|bA nice sender of emotes|n looks at |bthe first receiver of emotes.|n, "
"then |bTHE FIRST RECEIVER OF EMOTES.|n, |bThe first receiver of "
"emotes.|n and |bReceiver2|n twice."
"emotes.|n and |bReceiver2|n twice.",
)
def test_rpsearch(self):
@ -277,7 +282,7 @@ class TestRPSystem(EvenniaTest):
result = rpsystem.regex_tuple_from_key_alias(self.speaker)
t2 = time.time()
# print(f"t1: {t1 - t0}, t2: {t2 - t1}")
self.assertLess(t2-t1, 10**-4)
self.assertLess(t2 - t1, 10 ** -4)
self.assertEqual(result, (Anything, self.speaker, self.speaker.key))
@ -3359,10 +3364,12 @@ class TestBuildingMenu(CommandTest):
from evennia.contrib import mux_comms_cmds as comms # noqa
class TestLegacyMuxComms(CommandTest):
"""
Test the legacy comms contrib.
"""
def setUp(self):
super(CommandTest, self).setUp()
self.call(
@ -3435,3 +3442,113 @@ class TestLegacyMuxComms(CommandTest):
"|Channel 'testchan' was destroyed.",
receiver=self.account,
)
from evennia.contrib import cooldowns
@patch("evennia.contrib.cooldowns.time.time", return_value=0)
class TestCooldowns(EvenniaTest):
def setUp(self):
super().setUp()
self.handler = cooldowns.CooldownHandler(self.char1)
def test_empty(self, mock_time):
self.assertEqual(self.handler.all, [])
self.assertTrue(self.handler.ready("a", "b", "c"))
self.assertEqual(self.handler.time_left("a", "b", "c"), 0)
def test_set(self, mock_time):
self.handler.set("a", 10)
self.assertFalse(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 10)
mock_time.return_value = 9
self.assertFalse(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 1)
mock_time.return_value = 10
self.assertTrue(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 0)
def test_set_multi(self, mock_time):
self.handler.set("a", 10)
self.handler.set("b", 5)
self.handler.set("c", 3)
self.assertFalse(self.handler.ready("a", "b", "c"))
self.assertEqual(self.handler.time_left("a", "b", "c"), 10)
self.assertEqual(self.handler.time_left("a", "b"), 10)
self.assertEqual(self.handler.time_left("a", "c"), 10)
self.assertEqual(self.handler.time_left("b", "c"), 5)
self.assertEqual(self.handler.time_left("c", "c"), 3)
def test_set_none(self, mock_time):
self.handler.set("a", None)
self.assertTrue(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 0)
def test_set_negative(self, mock_time):
self.handler.set("a", -5)
self.assertTrue(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 0)
def test_set_overwrite(self, mock_time):
self.handler.set("a", 5)
self.handler.set("a", 10)
self.handler.set("a", 3)
self.assertFalse(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 3)
def test_extend(self, mock_time):
self.handler.extend("a", 10)
self.assertFalse(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 10)
self.handler.extend("a", 10)
self.assertFalse(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 20)
def test_extend_none(self, mock_time):
self.handler.extend("a", None)
self.assertTrue(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 0)
self.handler.set("a", 10)
self.handler.extend("a", None)
self.assertEqual(self.handler.time_left("a"), 10)
def test_extend_negative(self, mock_time):
self.handler.extend("a", -5)
self.assertTrue(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 0)
self.handler.set("a", 10)
self.handler.extend("a", -5)
self.assertEqual(self.handler.time_left("a"), 5)
def test_reset_non_existent(self, mock_time):
self.handler.reset("a")
self.assertTrue(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 0)
def test_reset(self, mock_time):
self.handler.set("a", 10)
self.handler.reset("a")
self.assertTrue(self.handler.ready("a"))
self.assertEqual(self.handler.time_left("a"), 0)
def test_clear(self, mock_time):
self.handler.set("a", 10)
self.handler.set("b", 10)
self.handler.set("c", 10)
self.handler.clear()
self.assertTrue(self.handler.ready("a", "b", "c"))
self.assertEqual(self.handler.time_left("a", "b", "c"), 0)
def test_cleanup(self, mock_time):
self.handler.set("a", 10)
self.handler.set("b", 5)
self.handler.set("c", 5)
mock_time.return_value = 6
self.handler.cleanup()
self.assertEqual(self.handler.time_left("b", "c"), 0)
self.assertEqual(self.handler.time_left("a"), 4)
self.assertNotIn("b", self.handler.data)
self.assertNotIn("c", self.handler.data)