mirror of
https://github.com/evennia/evennia.git
synced 2026-03-16 21:06:30 +01:00
Merge pull request #3344 from volundmush/ip_from_request
Overhaul IP Matching
This commit is contained in:
commit
d26a88dff3
5 changed files with 108 additions and 13 deletions
|
|
@ -31,7 +31,7 @@ from evennia.server import session
|
|||
from evennia.utils import utils
|
||||
from evennia.utils.ansi import parse_ansi
|
||||
from evennia.utils.text2html import parse_html
|
||||
from evennia.utils.utils import to_bytes
|
||||
from evennia.utils.utils import to_bytes, ip_from_request
|
||||
|
||||
_CLIENT_SESSIONS = utils.mod_import(settings.SESSION_ENGINE).SessionStore
|
||||
_RE_SCREENREADER_REGEX = re.compile(
|
||||
|
|
@ -197,16 +197,7 @@ class AjaxWebClient(resource.Resource):
|
|||
csessid = self.get_client_sessid(request)
|
||||
browserstr = self.get_browserstr(request)
|
||||
|
||||
remote_addr = request.getClientIP()
|
||||
|
||||
if remote_addr in settings.UPSTREAM_IPS and request.getHeader("x-forwarded-for"):
|
||||
addresses = [x.strip() for x in request.getHeader("x-forwarded-for").split(",")]
|
||||
addresses.reverse()
|
||||
|
||||
for addr in addresses:
|
||||
if addr not in settings.UPSTREAM_IPS:
|
||||
remote_addr = addr
|
||||
break
|
||||
remote_addr = ip_from_request(request)
|
||||
|
||||
host_string = "%s (%s:%s)" % (
|
||||
_SERVERNAME,
|
||||
|
|
|
|||
|
|
@ -90,7 +90,8 @@ WEBSERVER_PORTS = [(4001, 4005)]
|
|||
# Interface addresses to listen to. If 0.0.0.0, listen to all. Use :: for IPv6.
|
||||
WEBSERVER_INTERFACES = ["0.0.0.0"]
|
||||
# IP addresses that may talk to the server in a reverse proxy configuration,
|
||||
# like NginX.
|
||||
# like NginX or Varnish. These can be either specific IPv4 or IPv6 addresses,
|
||||
# or subnets in CIDR format - like 192.168.0.0/24 or 2001:db8::/32.
|
||||
UPSTREAM_IPS = ["127.0.0.1"]
|
||||
# The webserver uses threadpool for handling requests. This will scale
|
||||
# with server load. Set the minimum and maximum number of threads it
|
||||
|
|
@ -1031,6 +1032,7 @@ MIDDLEWARE = [
|
|||
"django.middleware.csrf.CsrfViewMiddleware",
|
||||
"django.contrib.admindocs.middleware.XViewMiddleware",
|
||||
"django.contrib.flatpages.middleware.FlatpageFallbackMiddleware",
|
||||
"evennia.web.utils.middleware.OriginIpMiddleware",
|
||||
"evennia.web.utils.middleware.SharedLoginMiddleware",
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -773,3 +773,15 @@ class TestJustify(TestCase):
|
|||
result = utils.justify(line, align="c", width=30)
|
||||
|
||||
self.assertIn(ANSI_RED, str(result))
|
||||
|
||||
|
||||
class TestMatchIP(TestCase):
|
||||
"""
|
||||
test utils.match_ip
|
||||
"""
|
||||
|
||||
def test_match_ip(self):
|
||||
self.assertFalse(utils.match_ip("192.168.0.1", "10.0.0.0/24"))
|
||||
self.assertTrue(utils.match_ip("192.168.0.1", "192.168.0.0/24"))
|
||||
self.assertTrue(utils.match_ip("192.168.0.1", "192.168.0.1"))
|
||||
self.assertFalse(utils.match_ip("192.168.0.1", "10.0.0.1"))
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import importlib
|
|||
import importlib.machinery
|
||||
import importlib.util
|
||||
import inspect
|
||||
import ipaddress
|
||||
import math
|
||||
import os
|
||||
import random
|
||||
|
|
@ -2942,3 +2943,78 @@ def str2int(number):
|
|||
# invalid number-word, raise ValueError
|
||||
raise ValueError(f"String {original_input} cannot be converted to int.")
|
||||
return sum(sums)
|
||||
|
||||
|
||||
def match_ip(address, pattern) -> bool:
|
||||
"""
|
||||
Check if an IP address matches a given pattern. The pattern can be a single IP address
|
||||
such as 8.8.8.8 or a CIDR-formatted subnet like 10.0.0.0/8
|
||||
|
||||
IPv6 is supported to, with CIDR-subnets looking like 2001:db8::/48
|
||||
|
||||
Args:
|
||||
address (str): The source address being checked.
|
||||
pattern (str): The single IP address or subnet to check against.
|
||||
|
||||
Returns:
|
||||
result (bool): Whether it was a match or not.
|
||||
"""
|
||||
try:
|
||||
# Convert the given IP address to an IPv4Address or IPv6Address object
|
||||
ip_obj = ipaddress.ip_address(address)
|
||||
except ValueError:
|
||||
# Invalid IP address format
|
||||
return False
|
||||
|
||||
try:
|
||||
# Check if pattern is a single IP or a subnet
|
||||
if "/" in pattern:
|
||||
# It's (hopefully) a subnet in CIDR notation
|
||||
network = ipaddress.ip_network(pattern, strict=False)
|
||||
if ip_obj in network:
|
||||
return True
|
||||
else:
|
||||
# It's a single IP address
|
||||
if ip_obj == ipaddress.ip_address(pattern):
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
return False
|
||||
|
||||
|
||||
def ip_from_request(request, exclude=None) -> str:
|
||||
"""
|
||||
Retrieves the IP address from a web Request, while respecting X-Forwarded-For and
|
||||
settings.UPSTREAM_IPS.
|
||||
|
||||
Args:
|
||||
request (django Request or twisted.web.http.Request): The web request.
|
||||
exclude: (list, optional): A list of IP addresses to exclude from the check. If left none,
|
||||
then settings.UPSTREAM_IPS will be used.
|
||||
|
||||
Returns:
|
||||
ip (str): The IP address the request originated from.
|
||||
"""
|
||||
if exclude is None:
|
||||
exclude = settings.UPSTREAM_IPS
|
||||
|
||||
if hasattr(request, "getClientIP"):
|
||||
# It's a twisted request.
|
||||
remote_addr = request.getClientIP()
|
||||
forwarded = request.getHeader("x-forwarded-for")
|
||||
else:
|
||||
# it's a Django request.
|
||||
remote_addr = request.META.get("REMOTE_ADDR")
|
||||
forwarded = request.META.get("HTTP_X_FORWARDED_FOR")
|
||||
|
||||
addresses = [remote_addr]
|
||||
|
||||
if forwarded:
|
||||
addresses.extend(x.strip() for x in forwarded.split(","))
|
||||
|
||||
for addr in reversed(addresses):
|
||||
if all(not match_ip(addr, pattern) for pattern in exclude):
|
||||
return addr
|
||||
|
||||
logger.log_warn("ip_from_request: No valid IP address found in request. Using remote_addr.")
|
||||
return remote_addr
|
||||
|
|
|
|||
|
|
@ -1,7 +1,21 @@
|
|||
from django.contrib.auth import authenticate, login
|
||||
|
||||
from evennia.accounts.models import AccountDB
|
||||
from evennia.utils import logger
|
||||
from evennia.utils import logger, ip_from_request
|
||||
|
||||
|
||||
class OriginIpMiddleware:
|
||||
"""
|
||||
This Django Middleware simply sets the request.origin_ip attribute to what is
|
||||
respected by the Evennia Server, taking into account settings.UPSTREAM_IPS.
|
||||
"""
|
||||
|
||||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
request.origin_ip = ip_from_request(request)
|
||||
return self.get_response(request)
|
||||
|
||||
|
||||
class SharedLoginMiddleware(object):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue