Adds logging of throttle activation and customizable message upon update.

This commit is contained in:
Johnny 2018-10-02 00:05:07 +00:00
parent 2fe3f40a5c
commit 16648d47d1
2 changed files with 16 additions and 3 deletions

View file

@ -436,7 +436,7 @@ class DefaultAccount(with_metaclass(TypeclassBase, AccountDB)):
logger.log_sec('Authentication Failure: %s (IP: %s).' % (username, ip))
# Update throttle
if ip: LOGIN_THROTTLE.update(ip)
if ip: LOGIN_THROTTLE.update(ip, 'Too many authentication failures')
return None, errors

View file

@ -1,4 +1,5 @@
from collections import defaultdict, deque
from evennia.utils import logger
import time
class Throttle(object):
@ -50,22 +51,34 @@ class Throttle(object):
if ip: return self.storage.get(ip, deque(maxlen=self.cache_size))
else: return self.storage
def update(self, ip):
def update(self, ip, failmsg='Exceeded threshold.'):
"""
Store the time of the latest failure/
Store the time of the latest failure.
Args:
ip (str): IP address of requestor
failmsg (str, optional): Message to display in logs upon activation
of throttle.
Returns:
None
"""
# Get current status
previously_throttled = self.check(ip)
# Enforce length limits
if not self.storage[ip].maxlen:
self.storage[ip] = deque(maxlen=self.cache_size)
self.storage[ip].append(time.time())
# See if this update caused a change in status
currently_throttled = self.check(ip)
# If this makes it engage, log a single activation event
if (not previously_throttled and currently_throttled):
logger.log_sec('Throttle Activated: %s (IP: %s, %i hits in %i seconds.)' % (failmsg, ip, self.limit, self.timeout))
def check(self, ip):
"""