diff --git a/evennia/server/portal/webclient_ajax.py b/evennia/server/portal/webclient_ajax.py index d6dc3c9fbf..df900d123e 100644 --- a/evennia/server/portal/webclient_ajax.py +++ b/evennia/server/portal/webclient_ajax.py @@ -31,7 +31,7 @@ from evennia.server import session from evennia.utils import utils from evennia.utils.ansi import parse_ansi from evennia.utils.text2html import parse_html -from evennia.utils.utils import to_bytes +from evennia.utils.utils import to_bytes, ip_from_request _CLIENT_SESSIONS = utils.mod_import(settings.SESSION_ENGINE).SessionStore _RE_SCREENREADER_REGEX = re.compile( @@ -197,16 +197,7 @@ class AjaxWebClient(resource.Resource): csessid = self.get_client_sessid(request) browserstr = self.get_browserstr(request) - remote_addr = request.getClientIP() - - if remote_addr in settings.UPSTREAM_IPS and request.getHeader("x-forwarded-for"): - addresses = [x.strip() for x in request.getHeader("x-forwarded-for").split(",")] - addresses.reverse() - - for addr in addresses: - if addr not in settings.UPSTREAM_IPS: - remote_addr = addr - break + remote_addr = ip_from_request(request) host_string = "%s (%s:%s)" % ( _SERVERNAME, diff --git a/evennia/settings_default.py b/evennia/settings_default.py index 83038d8358..297da47a2b 100644 --- a/evennia/settings_default.py +++ b/evennia/settings_default.py @@ -90,7 +90,8 @@ WEBSERVER_PORTS = [(4001, 4005)] # Interface addresses to listen to. If 0.0.0.0, listen to all. Use :: for IPv6. WEBSERVER_INTERFACES = ["0.0.0.0"] # IP addresses that may talk to the server in a reverse proxy configuration, -# like NginX. +# like NginX or Varnish. These can be either specific IPv4 or IPv6 addresses, +# or subnets in CIDR format - like 192.168.0.0/24 or 2001:db8::/32. UPSTREAM_IPS = ["127.0.0.1"] # The webserver uses threadpool for handling requests. This will scale # with server load. Set the minimum and maximum number of threads it @@ -1031,6 +1032,7 @@ MIDDLEWARE = [ "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.admindocs.middleware.XViewMiddleware", "django.contrib.flatpages.middleware.FlatpageFallbackMiddleware", + "evennia.web.utils.middleware.OriginIpMiddleware", "evennia.web.utils.middleware.SharedLoginMiddleware", ] diff --git a/evennia/utils/tests/test_utils.py b/evennia/utils/tests/test_utils.py index 8a76a695a4..9c5d4827f0 100644 --- a/evennia/utils/tests/test_utils.py +++ b/evennia/utils/tests/test_utils.py @@ -773,3 +773,15 @@ class TestJustify(TestCase): result = utils.justify(line, align="c", width=30) self.assertIn(ANSI_RED, str(result)) + + +class TestMatchIP(TestCase): + """ + test utils.match_ip + """ + + def test_match_ip(self): + self.assertFalse(utils.match_ip("192.168.0.1", "10.0.0.0/24")) + self.assertTrue(utils.match_ip("192.168.0.1", "192.168.0.0/24")) + self.assertTrue(utils.match_ip("192.168.0.1", "192.168.0.1")) + self.assertFalse(utils.match_ip("192.168.0.1", "10.0.0.1")) diff --git a/evennia/utils/utils.py b/evennia/utils/utils.py index f44e78c372..afbc6f382f 100644 --- a/evennia/utils/utils.py +++ b/evennia/utils/utils.py @@ -11,6 +11,7 @@ import importlib import importlib.machinery import importlib.util import inspect +import ipaddress import math import os import random @@ -2942,3 +2943,78 @@ def str2int(number): # invalid number-word, raise ValueError raise ValueError(f"String {original_input} cannot be converted to int.") return sum(sums) + + +def match_ip(address, pattern) -> bool: + """ + Check if an IP address matches a given pattern. The pattern can be a single IP address + such as 8.8.8.8 or a CIDR-formatted subnet like 10.0.0.0/8 + + IPv6 is supported to, with CIDR-subnets looking like 2001:db8::/48 + + Args: + address (str): The source address being checked. + pattern (str): The single IP address or subnet to check against. + + Returns: + result (bool): Whether it was a match or not. + """ + try: + # Convert the given IP address to an IPv4Address or IPv6Address object + ip_obj = ipaddress.ip_address(address) + except ValueError: + # Invalid IP address format + return False + + try: + # Check if pattern is a single IP or a subnet + if "/" in pattern: + # It's (hopefully) a subnet in CIDR notation + network = ipaddress.ip_network(pattern, strict=False) + if ip_obj in network: + return True + else: + # It's a single IP address + if ip_obj == ipaddress.ip_address(pattern): + return True + except ValueError: + return False + return False + + +def ip_from_request(request, exclude=None) -> str: + """ + Retrieves the IP address from a web Request, while respecting X-Forwarded-For and + settings.UPSTREAM_IPS. + + Args: + request (django Request or twisted.web.http.Request): The web request. + exclude: (list, optional): A list of IP addresses to exclude from the check. If left none, + then settings.UPSTREAM_IPS will be used. + + Returns: + ip (str): The IP address the request originated from. + """ + if exclude is None: + exclude = settings.UPSTREAM_IPS + + if hasattr(request, "getClientIP"): + # It's a twisted request. + remote_addr = request.getClientIP() + forwarded = request.getHeader("x-forwarded-for") + else: + # it's a Django request. + remote_addr = request.META.get("REMOTE_ADDR") + forwarded = request.META.get("HTTP_X_FORWARDED_FOR") + + addresses = [remote_addr] + + if forwarded: + addresses.extend(x.strip() for x in forwarded.split(",")) + + for addr in reversed(addresses): + if all(not match_ip(addr, pattern) for pattern in exclude): + return addr + + logger.log_warn("ip_from_request: No valid IP address found in request. Using remote_addr.") + return remote_addr diff --git a/evennia/web/utils/middleware.py b/evennia/web/utils/middleware.py index 437e07c274..b617a5eb31 100644 --- a/evennia/web/utils/middleware.py +++ b/evennia/web/utils/middleware.py @@ -1,7 +1,21 @@ from django.contrib.auth import authenticate, login from evennia.accounts.models import AccountDB -from evennia.utils import logger +from evennia.utils import logger, ip_from_request + + +class OriginIpMiddleware: + """ + This Django Middleware simply sets the request.origin_ip attribute to what is + respected by the Evennia Server, taking into account settings.UPSTREAM_IPS. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + request.origin_ip = ip_from_request(request) + return self.get_response(request) class SharedLoginMiddleware(object):