mirror of
https://github.com/evennia/evennia.git
synced 2026-03-16 12:56:30 +01:00
Make AUDIT_LOGS customizable from settings. Resolve #2627
This commit is contained in:
parent
67bac94aa9
commit
1d43bd8bbb
9 changed files with 191 additions and 16 deletions
|
|
@ -9,6 +9,8 @@
|
|||
- [Feat][pull3599]: Make `at_pre_cmd` testable in unit tests (blongden)
|
||||
- [Fix]: API /openapi/setattribute endpoints were both POST and PUT, causing schema
|
||||
errors; now changed to PUT only. (Griatch)
|
||||
- [Feat][issue2627]: Add `settings.AUDIT_MASKS` to customize what Evennia should
|
||||
obfuscate in server error logs (such as passwords from custom login commands) (Griatch)
|
||||
- [Fix][pull3799]: Fix typo in `basic_tc.py` contrib for beginner tutorial (Tharic99)
|
||||
- [Fix][pull3806]: EvMore wouldn't pass Session to next cmd when exiting (gas-public-wooden-clean)
|
||||
- [Fix][pull3809]: Admin page - Repair link to Account button (UserlandAlchemist)
|
||||
|
|
|
|||
|
|
@ -651,7 +651,7 @@ def cmdhandler(
|
|||
pass
|
||||
except Exception:
|
||||
_msg_err(caller, _ERROR_UNTRAPPED)
|
||||
raise ErrorReported(raw_string)
|
||||
raise ErrorReported(cmd.raw_string)
|
||||
finally:
|
||||
_COMMAND_NESTING[called_by] -= 1
|
||||
|
||||
|
|
@ -761,7 +761,7 @@ def cmdhandler(
|
|||
except ErrorReported as exc:
|
||||
# this error was already reported, so we
|
||||
# catch it here and don't pass it on.
|
||||
logger.log_err("User input was: '%s'." % exc.raw_string)
|
||||
logger.log_err("User input was: '%s'." % logger.mask_sensitive_input(exc.raw_string))
|
||||
|
||||
except ExecSystemCommand as exc:
|
||||
# Not a normal command: run a system command, if available,
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import re
|
|||
|
||||
from django.conf import settings
|
||||
|
||||
from evennia.utils.logger import log_trace
|
||||
from evennia.utils.logger import log_trace, mask_sensitive_input
|
||||
|
||||
_MULTIMATCH_REGEX = re.compile(settings.SEARCH_MULTIMATCH_REGEX, re.I + re.U)
|
||||
_CMD_IGNORE_PREFIXES = settings.CMD_IGNORE_PREFIXES
|
||||
|
|
@ -71,7 +71,7 @@ def build_matches(raw_string, cmdset, include_prefixes=False):
|
|||
if cmdname:
|
||||
matches.append(create_match(cmdname, raw_string, cmd, raw_cmdname))
|
||||
except Exception:
|
||||
log_trace("cmdhandler error. raw_input:%s" % raw_string)
|
||||
log_trace("cmdhandler error. raw_input:%s" % mask_sensitive_input(raw_string))
|
||||
return matches
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@ Unit testing for the Command system itself.
|
|||
|
||||
"""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.test import override_settings
|
||||
|
||||
from evennia.commands import cmdparser
|
||||
|
|
@ -1191,6 +1193,19 @@ class TestCmdParser(TestCase):
|
|||
("look at", " target", dummy, 7, 0.5, "look"),
|
||||
)
|
||||
|
||||
@patch("evennia.commands.cmdparser.log_trace")
|
||||
def test_build_matches_masks_sensitive_input_on_error(self, mock_log_trace):
|
||||
class _BrokenCmdSet:
|
||||
def __iter__(self):
|
||||
raise RuntimeError("forced parser failure")
|
||||
|
||||
cmdparser.build_matches("connect johnny password123", _BrokenCmdSet())
|
||||
self.assertTrue(mock_log_trace.called)
|
||||
|
||||
logged = mock_log_trace.call_args[0][0]
|
||||
self.assertIn("connect johnny ***********", logged)
|
||||
self.assertNotIn("password123", logged)
|
||||
|
||||
@override_settings(CMD_IGNORE_PREFIXES="@&/+")
|
||||
def test_build_matches(self):
|
||||
a_cmdset = _CmdSetTest()
|
||||
|
|
@ -1375,3 +1390,38 @@ class TestIssue3643(BaseEvenniaTest):
|
|||
def test_issue_3643(self):
|
||||
cmd = _TestCmd1()
|
||||
self.assertEqual(cmd.locks, "cmd:all();usecmd:false()")
|
||||
|
||||
|
||||
class _CmdCrash(Command):
|
||||
key = "connect"
|
||||
|
||||
def func(self):
|
||||
raise RuntimeError("forced failure")
|
||||
|
||||
|
||||
class TestIssue2627(TwistedTestCase, BaseEvenniaTest):
|
||||
"""
|
||||
Prevent logging plaintext credentials in command-error reporting.
|
||||
https://github.com/evennia/evennia/issues/2627
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.patch(sys.modules["evennia.server.sessionhandler"], "delay", _mockdelay)
|
||||
super().setUp()
|
||||
|
||||
@patch("evennia.commands.cmdhandler.logger.log_err")
|
||||
def test_cmdhandler_masks_sensitive_input_in_error_log(self, mock_log_err):
|
||||
d = cmdhandler.cmdhandler(
|
||||
self.session, " johnny password123", cmdobj=_CmdCrash(), cmdobj_key="connect"
|
||||
)
|
||||
|
||||
def _callback(_):
|
||||
logged = [call.args[0] for call in mock_log_err.call_args_list if call.args]
|
||||
self.assertIn("User input was: 'connect johnny ***********'.", logged)
|
||||
self.assertNotIn(
|
||||
"User input was: 'connect johnny password123'.",
|
||||
logged,
|
||||
)
|
||||
|
||||
d.addCallback(_callback)
|
||||
return d
|
||||
|
|
|
|||
|
|
@ -23,16 +23,7 @@ AUDIT_CALLBACK = getattr(
|
|||
AUDIT_IN = getattr(ev_settings, "AUDIT_IN", False)
|
||||
AUDIT_OUT = getattr(ev_settings, "AUDIT_OUT", False)
|
||||
AUDIT_ALLOW_SPARSE = getattr(ev_settings, "AUDIT_ALLOW_SPARSE", False)
|
||||
AUDIT_MASKS = [
|
||||
{"connect": r"^[@\s]*[connect]{5,8}\s+(\".+?\"|[^\s]+)\s+(?P<secret>.+)"},
|
||||
{"connect": r"^[@\s]*[connect]{5,8}\s+(?P<secret>[\w]+)"},
|
||||
{"create": r"^[^@]?[create]{5,6}\s+(\w+|\".+?\")\s+(?P<secret>[\w]+)"},
|
||||
{"create": r"^[^@]?[create]{5,6}\s+(?P<secret>[\w]+)"},
|
||||
{"userpassword": r"^[@\s]*[userpassword]{11,14}\s+(\w+|\".+?\")\s+=*\s*(?P<secret>[\w]+)"},
|
||||
{"userpassword": r"^.*new password set to '(?P<secret>[^']+)'\."},
|
||||
{"userpassword": r"^.* has changed your password to '(?P<secret>[^']+)'\."},
|
||||
{"password": r"^[@\s]*[password]{6,9}\s+(?P<secret>.*)"},
|
||||
] + getattr(ev_settings, "AUDIT_MASKS", [])
|
||||
AUDIT_MASKS = getattr(ev_settings, "AUDIT_MASKS", [])
|
||||
|
||||
|
||||
if AUDIT_CALLBACK:
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ Module containing the test cases for the Audit system.
|
|||
import re
|
||||
|
||||
from anything import Anything
|
||||
from django.test import override_settings
|
||||
from mock import patch
|
||||
|
||||
import evennia
|
||||
|
|
@ -15,7 +14,6 @@ from evennia.utils.test_resources import BaseEvenniaTest
|
|||
from .server import AuditedServerSession
|
||||
|
||||
|
||||
@override_settings(AUDIT_MASKS=[])
|
||||
class AuditingTest(BaseEvenniaTest):
|
||||
@patch("evennia.server.sessionhandler._ServerSession", AuditedServerSession)
|
||||
def setup_session(self):
|
||||
|
|
|
|||
|
|
@ -272,6 +272,25 @@ MAX_CHAR_LIMIT_WARNING = (
|
|||
# debugging. OBS: Showing full tracebacks to regular users could be a
|
||||
# security problem -turn this off in a production game!
|
||||
IN_GAME_ERRORS = True
|
||||
# Default masking regexes used by security/audit logging to avoid writing
|
||||
# cleartext credentials to logs. Each entry is a dict mapping an arbitrary
|
||||
# label to a regex with a named group `(?P<secret>...)` indicating what to mask.
|
||||
# You can override this list in your settings.py, or append to it to support
|
||||
# custom login/password commands:
|
||||
# AUDIT_MASKS += [{"mycmd": r"^mycmd\\s+(?P<secret>.+)$"}]
|
||||
AUDIT_MASKS = [
|
||||
{"connect": r'^\s*(?:connect|conn|con|co)\s+("[^"]+"|[^\s]+)\s+(?P<secret>.+)$'},
|
||||
{"create": r'^\s*(?:create|cre|cr)\s+("[^"]+"|[^\s]+)\s+(?P<secret>.+)$'},
|
||||
{"userpassword": r'^[@\s]*userpassword\s+(\w+|".+?")\s+=*\s*(?P<secret>[\w]+)$'},
|
||||
{"userpassword": r"^.*new password set to '(?P<secret>[^']+)'\."},
|
||||
{"userpassword": r"^.* has changed your password to '(?P<secret>[^']+)'\."},
|
||||
{"password": r"^[@\s]*(?:password|passwd)\s+(?P<secret>.*)$"},
|
||||
# Legacy typo-tolerant variants (kept for backwards compatibility with auditing behavior).
|
||||
{"connect": r'^[@\s]*[connect]{5,8}\s+(".+?"|[^\s]+)\s+(?P<secret>.+)$'},
|
||||
{"connect": r"^[@\s]*[connect]{5,8}\s+(?P<secret>[\w]+)$"},
|
||||
{"create": r'^[^@]?[create]{5,6}\s+(\w+|".+?")\s+(?P<secret>[\w]+)$'},
|
||||
{"create": r"^[^@]?[create]{5,6}\s+(?P<secret>[\w]+)$"},
|
||||
]
|
||||
# Broadcast "Server restart"-like messages to all sessions.
|
||||
BROADCAST_SERVER_RESTART_MESSAGES = True
|
||||
|
||||
|
|
|
|||
|
|
@ -14,10 +14,12 @@ log_typemsg(). This is for historical, back-compatible reasons.
|
|||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from datetime import datetime
|
||||
from traceback import format_exc
|
||||
|
||||
from django.conf import settings
|
||||
from twisted import logger as twisted_logger
|
||||
from twisted.internet.threads import deferToThread
|
||||
from twisted.python import logfile
|
||||
|
|
@ -33,6 +35,71 @@ _CHANNEL_LOG_NUM_TAIL_LINES = None
|
|||
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
|
||||
_SENSITIVE_INPUT_PATTERNS = None
|
||||
_SENSITIVE_INPUT_PATTERNS_SIGNATURE = None
|
||||
|
||||
|
||||
def _compile_sensitive_input_patterns():
|
||||
"""
|
||||
Compile masking regexes from settings.AUDIT_MASKS.
|
||||
"""
|
||||
patterns = []
|
||||
masks = getattr(settings, "AUDIT_MASKS", [])
|
||||
signature = repr(masks)
|
||||
|
||||
for mask in masks:
|
||||
# auditing format is list[dict[label -> regex]], but we also
|
||||
# support plain regex strings as a convenience.
|
||||
if isinstance(mask, dict):
|
||||
regexes = mask.values()
|
||||
else:
|
||||
regexes = [mask]
|
||||
for regex in regexes:
|
||||
try:
|
||||
patterns.append(re.compile(regex, re.IGNORECASE))
|
||||
except re.error as err:
|
||||
log_err(f"Invalid AUDIT_MASKS regex '{regex}': {err}")
|
||||
|
||||
return patterns, signature
|
||||
|
||||
|
||||
def mask_sensitive_input(msg):
|
||||
"""
|
||||
Mask sensitive command arguments in a raw command string.
|
||||
|
||||
Args:
|
||||
msg (str): Raw command line.
|
||||
|
||||
Returns:
|
||||
str: Same string, with sensitive segments replaced by ``*``.
|
||||
"""
|
||||
global _SENSITIVE_INPUT_PATTERNS, _SENSITIVE_INPUT_PATTERNS_SIGNATURE
|
||||
if not msg:
|
||||
return msg
|
||||
|
||||
if _SENSITIVE_INPUT_PATTERNS is None:
|
||||
_SENSITIVE_INPUT_PATTERNS, _SENSITIVE_INPUT_PATTERNS_SIGNATURE = (
|
||||
_compile_sensitive_input_patterns()
|
||||
)
|
||||
else:
|
||||
current_signature = repr(getattr(settings, "AUDIT_MASKS", []))
|
||||
if current_signature != _SENSITIVE_INPUT_PATTERNS_SIGNATURE:
|
||||
_SENSITIVE_INPUT_PATTERNS, _SENSITIVE_INPUT_PATTERNS_SIGNATURE = (
|
||||
_compile_sensitive_input_patterns()
|
||||
)
|
||||
|
||||
for pattern in _SENSITIVE_INPUT_PATTERNS:
|
||||
match = pattern.match(msg)
|
||||
if match:
|
||||
if "secret" not in pattern.groupindex:
|
||||
continue
|
||||
start, end = match.span("secret")
|
||||
# Keep rough size information while still obscuring short secrets.
|
||||
masked = "*" * max(8, end - start)
|
||||
return f"{msg[:start]}{masked}{msg[end:]}"
|
||||
return msg
|
||||
|
||||
|
||||
def _log(msg, logfunc, prefix="", **kwargs):
|
||||
try:
|
||||
msg = str(msg)
|
||||
|
|
|
|||
48
evennia/utils/tests/test_logger.py
Normal file
48
evennia/utils/tests/test_logger.py
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
import unittest
|
||||
|
||||
from django.test import override_settings
|
||||
|
||||
from evennia.utils.logger import mask_sensitive_input
|
||||
|
||||
|
||||
class TestMaskSensitiveInput(unittest.TestCase):
|
||||
def test_connect(self):
|
||||
self.assertEqual(mask_sensitive_input("connect johnny password123"), "connect johnny ***********")
|
||||
self.assertEqual(
|
||||
mask_sensitive_input('connect "johnny five" "password 123"'),
|
||||
'connect "johnny five" **************',
|
||||
)
|
||||
self.assertEqual(mask_sensitive_input("conn johnny pass"), "conn johnny ********")
|
||||
|
||||
def test_create(self):
|
||||
self.assertEqual(mask_sensitive_input("create johnny password123"), "create johnny ***********")
|
||||
self.assertEqual(mask_sensitive_input("cr johnny pass"), "cr johnny ********")
|
||||
|
||||
def test_password(self):
|
||||
self.assertEqual(
|
||||
mask_sensitive_input("@password oldpassword = newpassword"),
|
||||
"@password *************************",
|
||||
)
|
||||
self.assertEqual(
|
||||
mask_sensitive_input("password oldpassword newpassword"),
|
||||
"password ***********************",
|
||||
)
|
||||
|
||||
def test_userpassword(self):
|
||||
self.assertEqual(
|
||||
mask_sensitive_input("@userpassword johnny = password234"),
|
||||
"@userpassword johnny = ***********",
|
||||
)
|
||||
|
||||
def test_non_sensitive(self):
|
||||
safe = "say connect johnny password123"
|
||||
self.assertEqual(mask_sensitive_input(safe), safe)
|
||||
|
||||
@override_settings(AUDIT_MASKS=[{"mylogin": r"^mylogin\s+\w+\s+(?P<secret>.+)$"}])
|
||||
def test_override_settings_masks(self):
|
||||
self.assertEqual(mask_sensitive_input("mylogin johnny customsecret"), "mylogin johnny ************")
|
||||
# default masks are replaced when overridden.
|
||||
self.assertEqual(
|
||||
mask_sensitive_input("connect johnny password123"),
|
||||
"connect johnny password123",
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue