Merge branch 'develop' into contrib/evadventure

This commit is contained in:
Griatch 2022-04-23 15:47:32 +02:00
commit e7bc8d9836
14 changed files with 517 additions and 88 deletions

View file

@ -58,9 +58,24 @@ class TestGeneral(BaseEvenniaCommandTest):
rid = self.room1.id
self.call(general.CmdLook(), "here", "Room(#{})\nroom_desc".format(rid))
def test_look_no_location(self):
self.char1.location = None
self.call(general.CmdLook(), "", "You have no location to look at!")
def test_look_nonexisting(self):
self.call(general.CmdLook(), "yellow sign", "Could not find 'yellow sign'.")
def test_home(self):
self.call(general.CmdHome(), "", "You are already home")
def test_go_home(self):
self.call(building.CmdTeleport(), "/quiet Room2")
self.call(general.CmdHome(), "", "There's no place like home")
def test_no_home(self):
self.char1.home = None
self.call(general.CmdHome(), "", "You have no home")
def test_inventory(self):
self.call(general.CmdInventory(), "", "You are not carrying anything.")
@ -90,6 +105,12 @@ class TestGeneral(BaseEvenniaCommandTest):
self.assertEqual(None, self.char1.account.nicks.get("testalias", category="account"))
self.assertEqual("testaliasedstring3", self.char1.nicks.get("testalias", category="object"))
def test_nick_list(self):
self.call(general.CmdNick(), "/list", "No nicks defined.")
self.call(general.CmdNick(), "test1 = Hello",
"Inputline-nick 'test1' mapped to 'Hello'.")
self.call(general.CmdNick(), "/list", "Defined Nicks:")
def test_get_and_drop(self):
self.call(general.CmdGet(), "Obj", "You pick up Obj.")
self.call(general.CmdDrop(), "Obj", "You drop Obj.")

View file

@ -39,7 +39,7 @@ class Health(Component):
Components may define DBFields or NDBFields at the class level.
DBField will store its values in the host's DB with a prefixed key.
NDBField will store its values in the host's NDB and will not persist.
The key used will be 'component_name__field_name'.
The key used will be 'component_name::field_name'.
They use AttributeProperty under the hood.
Example:

View file

@ -21,7 +21,7 @@ class DBField(AttributeProperty):
owner (object): The component classF on which this is set
name (str): The name that was used to set the DBField.
"""
key = f"{owner.name}__{name}"
key = f"{owner.name}::{name}"
self._key = key
db_fields = getattr(owner, "_db_fields", None)
if db_fields is None:
@ -45,7 +45,7 @@ class NDBField(NAttributeProperty):
owner (object): The component class on which this is set
name (str): The name that was used to set the DBField.
"""
key = f"{owner.name}__{name}"
key = f"{owner.name}::{name}"
self._key = key
ndb_fields = getattr(owner, "_ndb_fields", None)
if ndb_fields is None:

View file

@ -5,6 +5,7 @@ This file contains the classes that allow a typeclass to use components.
"""
from evennia.contrib.base_systems import components
from evennia.contrib.base_systems.components import signals
class ComponentProperty:
@ -66,6 +67,7 @@ class ComponentHandler:
self.db_names.append(component.name)
self._add_component_tags(component)
component.at_added(self.host)
self.host.signals.add_object_listeners_and_responders(component)
def add_default(self, name):
"""
@ -87,6 +89,7 @@ class ComponentHandler:
self.db_names.append(name)
self._add_component_tags(new_component)
new_component.at_added(self.host)
self.host.signals.add_object_listeners_and_responders(new_component)
def _add_component_tags(self, component):
"""
@ -118,6 +121,7 @@ class ComponentHandler:
self._remove_component_tags(component)
component.at_removed(self.host)
self.db_names.remove(component_name)
self.host.signals.remove_object_listeners_and_responders(component)
del self._loaded_components[component_name]
else:
message = f"Cannot remove {component_name} from {self.host.name} as it is not registered."
@ -140,6 +144,7 @@ class ComponentHandler:
self._remove_component_tags(instance)
instance.at_removed(self.host)
self.host.signals.remove_object_listeners_and_responders(instance)
self.db_names.remove(name)
del self._loaded_components[name]
@ -192,6 +197,7 @@ class ComponentHandler:
if component:
component_instance = component.load(self.host)
self._set_component(component_instance)
self.host.signals.add_object_listeners_and_responders(component_instance)
else:
message = f"Could not initialize runtime component {component_name} of {self.host.name}"
raise ComponentDoesNotExist(message)
@ -214,7 +220,7 @@ class ComponentHandler:
return self.get(name)
class ComponentHolderMixin(object):
class ComponentHolderMixin:
"""
Mixin to add component support to a typeclass
@ -229,7 +235,17 @@ class ComponentHolderMixin(object):
"""
super(ComponentHolderMixin, self).at_init()
setattr(self, "_component_handler", ComponentHandler(self))
setattr(self, "_signal_handler", signals.SignalsHandler(self))
self.components.initialize()
self.signals.trigger("at_after_init")
def at_post_puppet(self, *args, **kwargs):
super().at_post_puppet(*args, **kwargs)
self.signals.trigger("at_post_puppet", *args, **kwargs)
def at_post_unpuppet(self, *args, **kwargs):
super().at_post_unpuppet(*args, **kwargs)
self.signals.trigger("at_post_unpuppet", *args, **kwargs)
def basetype_setup(self):
"""
@ -239,14 +255,17 @@ class ComponentHolderMixin(object):
super().basetype_setup()
component_names = []
setattr(self, "_component_handler", ComponentHandler(self))
setattr(self, "_signal_handler", signals.SignalsHandler(self))
class_components = getattr(self, "_class_components", ())
for component_name, values in class_components:
component_class = components.get_component_class(component_name)
component = component_class.create(self, **values)
component_names.append(component_name)
self.components._loaded_components[component_name] = component
self.signals.add_object_listeners_and_responders(component)
self.db.component_names = component_names
self.signals.trigger("at_basetype_setup")
def basetype_posthook_setup(self):
"""
@ -274,6 +293,10 @@ class ComponentHolderMixin(object):
"""
return self.components
@property
def signals(self) -> signals.SignalsHandler:
return getattr(self, "_signal_handler", None)
class ComponentDoesNotExist(Exception):
pass

View file

@ -0,0 +1,207 @@
"""
Components - ChrisLR 2022
This file contains classes functions related to signals.
"""
def as_listener(func=None, signal_name=None):
"""
Decorator style function that marks a method to be connected as listener.
It will use the provided signal name and default to the decorated function name.
Args:
func (callable): The method to mark as listener
signal_name (str): The name of the signal to listen to, defaults to function name.
"""
if not func and signal_name:
def wrapper(func):
func._listener_signal_name = signal_name
return func
return wrapper
signal_name = func.__name__
func._listener_signal_name = signal_name
return func
def as_responder(func=None, signal_name=None):
"""
Decorator style function that marks a method to be connected as responder.
It will use the provided signal name and default to the decorated function name.
Args:
func (callable): The method to mark as responder
signal_name (str): The name of the signal to respond to, defaults to function name.
"""
if not func and signal_name:
def wrapper(func):
func._responder_signal_name = signal_name
return func
return wrapper
signal_name = func.__name__
func._responder_signal_name = signal_name
return func
class SignalsHandler(object):
"""
This object handles all about signals.
It holds the connected listeners and responders.
It allows triggering signals or querying responders.
"""
def __init__(self, host):
self.host = host
self.listeners = {}
self.responders = {}
self.add_object_listeners_and_responders(host)
def add_listener(self, signal_name, callback):
"""
Connect a listener to a specific signal.
Args:
signal_name (str): The name of the signal to listen to
callback (callable): The callable that is called when the signal is triggered
"""
signal_listeners = self.listeners.setdefault(signal_name, [])
if callback not in signal_listeners:
signal_listeners.append(callback)
def add_responder(self, signal_name, callback):
"""
Connect a responder to a specific signal.
Args:
signal_name (str): The name of the signal to respond to
callback (callable): The callable that is called when the signal is queried
"""
signal_responders = self.responders.setdefault(signal_name, [])
if callback not in signal_responders:
signal_responders.append(callback)
def remove_listener(self, signal_name, callback):
"""
Removes a listener for a specific signal.
Args:
signal_name (str): The name of the signal to disconnect from
callback (callable): The callable that was used to connect
"""
signal_listeners = self.listeners.get(signal_name)
if not signal_listeners:
return
if callback in signal_listeners:
signal_listeners.remove(callback)
def remove_responder(self, signal_name, callback):
"""
Removes a responder for a specific signal.
Args:
signal_name (str): The name of the signal to disconnect from
callback (callable): The callable that was used to connect
"""
signal_responders = self.responders.get(signal_name)
if not signal_responders:
return
if callback in signal_responders:
signal_responders.remove(callback)
def trigger(self, signal_name, *args, **kwargs):
"""
Triggers a specific signal with specified args and kwargs
This method does not return anything
Args:
signal_name (str): The name of the signal to trigger
"""
callbacks = self.listeners.get(signal_name)
if not callbacks:
return
for callback in callbacks:
callback(*args, **kwargs)
def query(self, signal_name, *args, default=None, aggregate_func=None, **kwargs):
"""
Queries a specific signal with specified args and kwargs
This method will return the responses from its connected responders.
If an aggregate_func is specified, it is called with the responses
and its result is returned instead.
Args:
signal_name (str): The name of the signal to trigger
default (any): The value to use when no responses are given
It will be passed to aggregate_func if it is also given.
aggregate_func (callable): The function to process the results before returning.
Returns:
list: An iterable of the responses
OR the aggregated result when aggregate_func is specified.
"""
callbacks = self.responders.get(signal_name)
if not callbacks:
default = [] if default is None else default
if aggregate_func:
return aggregate_func(default)
return default
responses = []
for callback in callbacks:
response = callback(*args, **kwargs)
if response is not None:
responses.append(response)
if aggregate_func and responses:
return aggregate_func(responses)
return responses
def add_object_listeners_and_responders(self, obj):
"""
This connects the methods marked as listener or responder from an object.
Args:
obj (object): The instance of an object to connect to this handler.
"""
type_host = type(obj)
for att_name, att_obj in type_host.__dict__.items():
listener_signal_name = getattr(att_obj, '_listener_signal_name', None)
if listener_signal_name:
callback = getattr(obj, att_name)
self.add_listener(signal_name=listener_signal_name, callback=callback)
responder_signal_name = getattr(att_obj, '_responder_signal_name', None)
if responder_signal_name:
callback = getattr(obj, att_name)
self.add_responder(signal_name=responder_signal_name, callback=callback)
def remove_object_listeners_and_responders(self, obj):
"""
This disconnects the methods marked as listener or responder from an object.
Args:
obj (object): The instance of an object to disconnect from this handler.
"""
type_host = type(obj)
for att_name, att_obj in type_host.__dict__.items():
listener_signal_name = getattr(att_obj, '_listener_signal_name', None)
if listener_signal_name:
callback = getattr(obj, att_name)
self.remove_listener(signal_name=listener_signal_name, callback=callback)
responder_signal_name = getattr(att_obj, '_responder_signal_name', None)
if responder_signal_name:
callback = getattr(obj, att_name)
self.remove_responder(signal_name=responder_signal_name, callback=callback)

View file

@ -1,7 +1,9 @@
from evennia.contrib.base_systems.components import Component, DBField, TagField
from evennia.contrib.base_systems.components import Component, DBField, TagField, signals
from evennia.contrib.base_systems.components.holder import ComponentProperty, ComponentHolderMixin
from evennia.contrib.base_systems.components.signals import as_listener
from evennia.objects.objects import DefaultCharacter
from evennia.utils.test_resources import EvenniaTest
from evennia.utils import create
from evennia.utils.test_resources import EvenniaTest, BaseEvenniaTest
class ComponentTestA(Component):
@ -186,3 +188,211 @@ class TestComponents(EvenniaTest):
assert self.char1.tags.has(key="first value", category="test_b::multiple_tags")
assert self.char1.tags.has(key="second value", category="test_b::multiple_tags")
assert self.char1.tags.has(key="third value", category="test_b::multiple_tags")
class CharWithSignal(ComponentHolderMixin, DefaultCharacter):
@signals.as_listener
def my_signal(self):
setattr(self, 'my_signal_is_called', True)
@signals.as_listener
def my_other_signal(self):
setattr(self, 'my_other_signal_is_called', True)
@signals.as_responder
def my_response(self):
return 1
@signals.as_responder
def my_other_response(self):
return 2
class ComponentWithSignal(Component):
name = "test_signal_a"
@signals.as_listener
def my_signal(self):
setattr(self, 'my_signal_is_called', True)
@signals.as_listener
def my_other_signal(self):
setattr(self, 'my_other_signal_is_called', True)
@signals.as_responder
def my_response(self):
return 1
@signals.as_responder
def my_other_response(self):
return 2
@signals.as_responder
def my_component_response(self):
return 3
class TestComponentSignals(BaseEvenniaTest):
def setUp(self):
super().setUp()
self.char1 = create.create_object(
CharWithSignal, key="Char",
)
def test_host_can_register_as_listener(self):
self.char1.signals.trigger("my_signal")
assert self.char1.my_signal_is_called
assert not getattr(self.char1, 'my_other_signal_is_called', None)
def test_host_can_register_as_responder(self):
responses = self.char1.signals.query("my_response")
assert 1 in responses
assert 2 not in responses
def test_component_can_register_as_listener(self):
char = self.char1
char.components.add(ComponentWithSignal.create(char))
char.signals.trigger("my_signal")
component = char.cmp.test_signal_a
assert component.my_signal_is_called
assert not getattr(component, 'my_other_signal_is_called', None)
def test_component_can_register_as_responder(self):
char = self.char1
char.components.add(ComponentWithSignal.create(char))
responses = char.signals.query("my_response")
assert 1 in responses
assert 2 not in responses
def test_signals_can_add_listener(self):
result = []
def my_fake_listener():
result.append(True)
self.char1.signals.add_listener("my_fake_signal", my_fake_listener)
self.char1.signals.trigger("my_fake_signal")
assert result
def test_signals_can_add_responder(self):
def my_fake_responder():
return 1
self.char1.signals.add_responder("my_fake_response", my_fake_responder)
responses = self.char1.signals.query("my_fake_response")
assert 1 in responses
def test_signals_can_remove_listener(self):
result = []
def my_fake_listener():
result.append(True)
self.char1.signals.add_listener("my_fake_signal", my_fake_listener)
self.char1.signals.remove_listener("my_fake_signal", my_fake_listener)
self.char1.signals.trigger("my_fake_signal")
assert not result
def test_signals_can_remove_responder(self):
def my_fake_responder():
return 1
self.char1.signals.add_responder("my_fake_response", my_fake_responder)
self.char1.signals.remove_responder("my_fake_response", my_fake_responder)
responses = self.char1.signals.query("my_fake_response")
assert not responses
def test_signals_can_trigger_with_args(self):
result = []
def my_fake_listener(arg1, kwarg1):
result.append((arg1, kwarg1))
self.char1.signals.add_listener("my_fake_signal", my_fake_listener)
self.char1.signals.trigger("my_fake_signal", 1, kwarg1=2)
assert (1, 2) in result
def test_signals_can_query_with_args(self):
def my_fake_responder(arg1, kwarg1):
return (arg1, kwarg1)
self.char1.signals.add_responder("my_fake_response", my_fake_responder)
responses = self.char1.signals.query("my_fake_response", 1, kwarg1=2)
assert (1, 2) in responses
def test_signals_trigger_does_not_fail_without_listener(self):
self.char1.signals.trigger("some_unknown_signal")
def test_signals_query_does_not_fail_wihout_responders(self):
self.char1.signals.query("no_responders_allowed")
def test_signals_query_with_aggregate(self):
def my_fake_responder(arg1, kwarg1):
return (arg1, kwarg1)
self.char1.signals.add_responder("my_fake_response", my_fake_responder)
responses = self.char1.signals.query("my_fake_response", 1, kwarg1=2)
assert (1, 2) in responses
def test_signals_can_add_object_listeners_and_responders(self):
result = []
class FakeObj:
@as_listener
def my_signal(self):
result.append(True)
self.char1.signals.add_object_listeners_and_responders(FakeObj())
self.char1.signals.trigger("my_signal")
assert result
def test_signals_can_remove_object_listeners_and_responders(self):
result = []
class FakeObj:
@as_listener
def my_signal(self):
result.append(True)
obj = FakeObj()
self.char1.signals.add_object_listeners_and_responders(obj)
self.char1.signals.remove_object_listeners_and_responders(obj)
self.char1.signals.trigger("my_signal")
assert not result
def test_component_handler_signals_connected_when_adding_default_component(self):
char = self.char1
char.components.add_default("test_signal_a")
responses = char.signals.query("my_component_response")
assert 3 in responses
def test_component_handler_signals_disconnected_when_removing_component(self):
char = self.char1
comp = ComponentWithSignal.create(char)
char.components.add(comp)
char.components.remove(comp)
responses = char.signals.query("my_component_response")
assert not responses
def test_component_handler_signals_disconnected_when_removing_component_by_name(self):
char = self.char1
char.components.add_default("test_signal_a")
char.components.remove_by_name("test_signal_a")
responses = char.signals.query("my_component_response")
assert not responses

View file

@ -319,14 +319,15 @@ def regex_tuple_from_key_alias(obj):
"""
global _REGEX_TUPLE_CACHE
permutation_string = " ".join([obj.key] + obj.aliases.all())
cache_key = f"{obj.id} {permutation_string}"
if permutation_string not in _REGEX_TUPLE_CACHE:
_REGEX_TUPLE_CACHE[permutation_string] = (
if cache_key not in _REGEX_TUPLE_CACHE:
_REGEX_TUPLE_CACHE[cache_key] = (
re.compile(ordered_permutation_regex(permutation_string), _RE_FLAGS),
obj,
obj.key,
)
return _REGEX_TUPLE_CACHE[permutation_string]
return _REGEX_TUPLE_CACHE[cache_key]
def parse_language(speaker, emote):

View file

@ -232,20 +232,36 @@ class TestContentHandler(BaseEvenniaTest):
self.assertEqual(self.room2.contents, [self.obj1, self.obj2])
class SubAttributeProperty(AttributeProperty):
pass
class SubTagProperty(TagProperty):
pass
class TestObjectPropertiesClass(DefaultObject):
attr1 = AttributeProperty(default="attr1")
attr2 = AttributeProperty(default="attr2", category="attrcategory")
attr3 = AttributeProperty(default="attr3", autocreate=False)
attr4 = SubAttributeProperty(default="attr4")
tag1 = TagProperty()
tag2 = TagProperty(category="tagcategory")
tag3 = SubTagProperty()
testalias = AliasProperty()
testperm = PermissionProperty()
@property
def base_property(self):
self.property_initialized = True
class TestProperties(EvenniaTestCase):
"""
Test Properties.
"""
def setUp(self):
self.obj = create.create_object(TestObjectPropertiesClass, key="testobj")
@ -270,13 +286,22 @@ class TestProperties(EvenniaTestCase):
self.assertFalse(obj.attributes.has("attr3"))
self.assertEqual(obj.attr3, "attr3")
obj.attr3 = "attr3b" # stores it in db!
self.assertEqual(obj.db.attr4, "attr4")
self.assertEqual(obj.attributes.get("attr4"), "attr4")
self.assertEqual(obj.attr4, "attr4")
obj.attr3 = "attr3b" # stores it in db!
self.assertEqual(obj.db.attr3, "attr3b")
self.assertTrue(obj.attributes.has("attr3"))
self.assertTrue(obj.tags.has("tag1"))
self.assertTrue(obj.tags.has("tag2", category="tagcategory"))
self.assertTrue(obj.tags.has("tag3"))
self.assertTrue(obj.aliases.has("testalias"))
self.assertTrue(obj.permissions.has("testperm"))
# Verify that regular properties do not get fetched in init_evennia_properties,
# only Attribute or TagProperties.
self.assertFalse(hasattr(obj, "property_initialized"))

View file

@ -585,9 +585,9 @@ class TickerHandler(object):
self.ticker_pool.stop(interval)
if interval:
self.ticker_storage = dict(
(store_key, store_key)
for store_key in self.ticker_storage
if store_key[1] != interval
(store_key, store_value)
for store_key, store_value in self.ticker_storage.items()
if store_key[3] != interval
)
else:
self.ticker_storage = {}

View file

@ -39,11 +39,12 @@ from django.utils.text import slugify
from evennia.typeclasses.attributes import (
Attribute,
AttributeHandler,
AttributeProperty,
ModelAttributeBackend,
InMemoryAttributeBackend,
)
from evennia.typeclasses.attributes import DbHolder
from evennia.typeclasses.tags import Tag, TagHandler, AliasHandler, PermissionHandler
from evennia.typeclasses.tags import Tag, TagHandler, AliasHandler, PermissionHandler, TagProperty
from evennia.utils.idmapper.models import SharedMemoryModel, SharedMemoryModelBase
from evennia.server.signals import SIGNAL_TYPED_OBJECT_POST_RENAME
@ -331,7 +332,7 @@ class TypedObject(SharedMemoryModel):
by fetching them once.
"""
for propkey, prop in self.__class__.__dict__.items():
if hasattr(prop, "__set_name__"):
if isinstance(prop, (AttributeProperty, TagProperty)):
try:
getattr(self, propkey)
except Exception:

View file

@ -167,7 +167,7 @@ class GlobalScriptContainer(Container):
# store a hash representation of the setup
script.attributes.add("_global_script_settings", compare_hash, category="settings_hash")
script.start()
script.start()
return script

View file

@ -120,13 +120,6 @@ class TestText2Html(TestCase):
)
# TODO: doesn't URL encode correctly
def test_re_double_space(self):
parser = text2html.HTML_PARSER
self.assertEqual("foo", parser.re_double_space("foo"))
self.assertEqual(
"a  red    foo", parser.re_double_space("a red foo")
)
def test_sub_mxp_links(self):
parser = text2html.HTML_PARSER
mocked_match = mock.Mock()
@ -156,7 +149,7 @@ class TestText2Html(TestCase):
"tab": "\t",
"space": "",
}
self.assertEqual("  ", parser.sub_text(mocked_match))
self.assertEqual(" ", parser.sub_text(mocked_match))
mocked_match.groupdict.return_value = {
"htmlchars": "",
@ -165,7 +158,7 @@ class TestText2Html(TestCase):
"space": " ",
"spacestart": " ",
}
self.assertEqual("    ", parser.sub_text(mocked_match))
self.assertEqual(" ", parser.sub_text(mocked_match))
mocked_match.groupdict.return_value = {
"htmlchars": "",
@ -181,24 +174,13 @@ class TestText2Html(TestCase):
parser = text2html.HTML_PARSER
parser.tabstop = 4
# single tab
self.assertEqual(parser.parse("foo|>foo"), "foo    foo")
self.assertEqual(parser.parse("foo|>foo"), "foo foo")
# space and tab
self.assertEqual(parser.parse("foo |>foo"), "foo     foo")
self.assertEqual(parser.parse("foo |>foo"), "foo foo")
# space, tab, space
self.assertEqual(parser.parse("foo |> foo"), "foo      foo")
def test_parse_space_to_html(self):
"""test space parsing - a single space should be kept, two or more
should get  """
parser = text2html.HTML_PARSER
# single space
self.assertEqual(parser.parse("foo foo"), "foo foo")
# double space
self.assertEqual(parser.parse("foo foo"), "foo  foo")
# triple space
self.assertEqual(parser.parse("foo foo"), "foo   foo")
self.assertEqual(parser.parse("foo |> foo"), "foo foo")
def test_parse_html(self):
self.assertEqual("foo", text2html.parse_html("foo"))

View file

@ -79,11 +79,11 @@ class TextToHTMLparser(object):
# create stop markers
fgstop = "(?:\033\[1m|\033\[22m){0,1}\033\[3[0-8].*?m|\033\[0m|$"
bgstop = "(?:\033\[1m|\033\[22m){0,1}\033\[4[0-8].*?m|\033\[0m|$"
bgfgstop = bgstop[:-2] + r"(\s*)" + fgstop
bgfgstop = bgstop[:-2] + fgstop
fgstart = "((?:\033\[1m|\033\[22m){0,1}\033\[3[0-8].*?m)"
bgstart = "((?:\033\[1m|\033\[22m){0,1}\033\[4[0-8].*?m)"
bgfgstart = bgstart + r"(\s*)" + "((?:\033\[1m|\033\[22m){0,1}\033\[[3-4][0-8].*?m){0,1}"
bgfgstart = bgstart + r"((?:\033\[1m|\033\[22m){0,1}\033\[[3-4][0-8].*?m){0,1}"
# extract color markers, tagging the start marker and the text marked
re_fgs = re.compile(fgstart + "(.*?)(?=" + fgstop + ")")
@ -97,12 +97,9 @@ class TextToHTMLparser(object):
re_blink = re.compile("(?:%s)(.*?)(?=%s|%s)" % (blink.replace("[", r"\["), fgstop, bgstop))
re_inverse = re.compile("(?:%s)(.*?)(?=%s|%s)" % (inverse.replace("[", r"\["), fgstop, bgstop))
re_string = re.compile(
r"(?P<htmlchars>[<&>])|(?P<tab>[\t]+)|(?P<space> +)|"
r"(?P<spacestart>^ )|(?P<lineend>\r\n|\r|\n)",
r"(?P<htmlchars>[<&>])|(?P<tab>[\t]+)|(?P<lineend>\r\n|\r|\n)",
re.S | re.M | re.I,
)
re_dblspace = re.compile(r" {2,}", re.M)
re_invisiblespace = re.compile(r"( <.*?>)( )")
re_url = re.compile(
r'(?<!=")((?:ftp|www|https?)\W+(?:(?!\.(?:\s|$)|&\w+;)[^"\',;$*^\\(){}<>\[\]\s])+)(\.(?:\s|$)|&\w+;|)'
)
@ -111,20 +108,16 @@ class TextToHTMLparser(object):
def _sub_bgfg(self, colormatch):
# print("colormatch.groups()", colormatch.groups())
bgcode, prespace, fgcode, text, postspace = colormatch.groups()
bgcode, fgcode, text = colormatch.groups()
if not fgcode:
ret = r"""<span class="%s">%s%s%s</span>""" % (
ret = r"""<span class="%s">%s</span>""" % (
self.bg_colormap.get(bgcode, self.fg_colormap.get(bgcode, "err")),
prespace and "&nbsp;" * len(prespace) or "",
postspace and "&nbsp;" * len(postspace) or "",
text,
)
else:
ret = r"""<span class="%s"><span class="%s">%s%s%s</span></span>""" % (
ret = r"""<span class="%s"><span class="%s">%s</span></span>""" % (
self.bg_colormap.get(bgcode, self.fg_colormap.get(bgcode, "err")),
self.fg_colormap.get(fgcode, self.bg_colormap.get(fgcode, "err")),
prespace and "&nbsp;" * len(prespace) or "",
postspace and "&nbsp;" * len(postspace) or "",
text,
)
return ret
@ -265,20 +258,6 @@ class TextToHTMLparser(object):
# change pages (and losing our webclient session).
return self.re_url.sub(r'<a href="\1" target="_blank">\1</a>\2', text)
def re_double_space(self, text):
"""
HTML will swallow any normal space after the first, so if any slipped
through we must make sure to replace them with " &nbsp;"
"""
return self.re_dblspace.sub(self.sub_dblspace, text)
def re_invisible_space(self, text):
"""
If two spaces are separated by an invisble html element, they act as a
hidden double-space and the last of them should be replaced by &nbsp;
"""
return self.re_invisiblespace.sub(self.sub_invisiblespace, text)
def sub_mxp_links(self, match):
"""
Helper method to be passed to re.sub,
@ -332,28 +311,10 @@ class TextToHTMLparser(object):
elif cdict["lineend"]:
return "<br>"
elif cdict["tab"]:
text = cdict["tab"].replace("\t", " " + "&nbsp;" * (self.tabstop - 1))
return text
elif cdict["space"] or cdict["spacestart"]:
text = cdict["space"]
text = " " if len(text) == 1 else " " + text[1:].replace(" ", "&nbsp;")
text = cdict["tab"].replace("\t", " " * (self.tabstop))
return text
return None
def sub_dblspace(self, match):
"clean up double-spaces"
return " " + "&nbsp;" * (len(match.group()) - 1)
def sub_invisiblespace(self, match):
"clean up invisible spaces"
return match.group(1) + "&nbsp;"
def handle_single_first_space(self, text):
"Don't swallow an initial lone space"
if text.startswith(" "):
return "&nbsp;" + text[1:]
return text
def parse(self, text, strip_ansi=False):
"""
Main access function, converts a text containing ANSI codes
@ -383,9 +344,6 @@ class TextToHTMLparser(object):
result = self.convert_linebreaks(result)
result = self.remove_backspaces(result)
result = self.convert_urls(result)
result = self.re_double_space(result)
result = self.re_invisible_space(result)
result = self.handle_single_first_space(result)
# clean out eventual ansi that was missed
## result = parse_ansi(result, strip_ansi=True)

View file

@ -49,6 +49,7 @@ div {margin:0px;}
.out {
color: #aaa;
background-color: #000;
white-space: pre-wrap;
}
/* Error messages (red) */