mirror of
https://github.com/evennia/evennia.git
synced 2026-03-16 21:06:30 +01:00
Use to_str/to_bytes, replacing old versions
This commit is contained in:
commit
c3ebd8d251
24 changed files with 130 additions and 121 deletions
|
|
@ -87,6 +87,12 @@ Web/Django standard initiative (@strikaco)
|
|||
|
||||
- Swap argument order of `evennia.set_trace` to `set_trace(term_size=(140, 40), debugger='auto')`
|
||||
since the size is more likely to be changed on the command line.
|
||||
- `utils.to_str(text, session=None)` now acts as the old `utils.to_unicode` (which was removed).
|
||||
This converts to the str() type (not to a byte-string as in Evennia 0.8), trying different
|
||||
encodings. This function will also force-convert any object passed to it into a string (so
|
||||
`force_string` flag was removed and assumed always set).
|
||||
- `utils.to_bytes(text, session=None)` replaces the old `utils.to_str()` functionality and converts
|
||||
str to bytes.
|
||||
|
||||
|
||||
## Evennia 0.8 (2018)
|
||||
|
|
|
|||
|
|
@ -196,18 +196,18 @@ class DefaultAccount(with_metaclass(TypeclassBase, AccountDB)):
|
|||
@lazy_property
|
||||
def sessions(self):
|
||||
return AccountSessionHandler(self)
|
||||
|
||||
|
||||
# Do not make this a lazy property; the web UI will not refresh it!
|
||||
@property
|
||||
def characters(self):
|
||||
# Get playable characters list
|
||||
objs = self.db._playable_characters
|
||||
|
||||
|
||||
# Rebuild the list if legacy code left null values after deletion
|
||||
if None in objs:
|
||||
objs = [x for x in self.db._playable_characters if x]
|
||||
self.db._playable_characters = objs
|
||||
|
||||
|
||||
return objs
|
||||
|
||||
# session-related methods
|
||||
|
|
@ -814,13 +814,7 @@ class DefaultAccount(with_metaclass(TypeclassBase, AccountDB)):
|
|||
kwargs["options"] = options
|
||||
|
||||
if text is not None:
|
||||
if not (isinstance(text, str) or isinstance(text, tuple)):
|
||||
# sanitize text before sending across the wire
|
||||
try:
|
||||
text = to_str(text, force_string=True)
|
||||
except Exception:
|
||||
text = repr(text)
|
||||
kwargs['text'] = text
|
||||
kwargs['text'] = to_str(text)
|
||||
|
||||
# session relay
|
||||
sessions = make_iter(session) if session else self.sessions.all()
|
||||
|
|
|
|||
|
|
@ -223,12 +223,12 @@ class CmdCharDelete(COMMAND_DEFAULT_CLASS):
|
|||
|
||||
match = match[0]
|
||||
account.ndb._char_to_delete = match
|
||||
|
||||
|
||||
# Return if caller has no permission to delete this
|
||||
if not match.access(account, 'delete'):
|
||||
self.msg("You do not have permission to delete this character.")
|
||||
return
|
||||
|
||||
|
||||
prompt = "|rThis will permanently destroy '%s'. This cannot be undone.|n Continue yes/[no]?"
|
||||
get_input(account, prompt % match.key, _callback)
|
||||
|
||||
|
|
|
|||
|
|
@ -2077,7 +2077,7 @@ class CmdExamine(ObjManipCommand):
|
|||
"""
|
||||
if crop:
|
||||
if not isinstance(value, str):
|
||||
value = utils.to_str(value, force_string=True)
|
||||
value = utils.to_str(value)
|
||||
value = utils.crop(value)
|
||||
|
||||
string = "\n %s = %s" % (attr, value)
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ class CommandTest(EvenniaTest):
|
|||
pass
|
||||
|
||||
# clean out evtable sugar. We only operate on text-type
|
||||
stored_msg = [args[0] if args and args[0] else kwargs.get("text", utils.to_str(kwargs, force_string=True))
|
||||
stored_msg = [args[0] if args and args[0] else kwargs.get("text", utils.to_str(kwargs))
|
||||
for name, args, kwargs in receiver.msg.mock_calls]
|
||||
# Get the first element of a tuple if msg received a tuple instead of a string
|
||||
stored_msg = [smsg[0] if isinstance(smsg, tuple) else smsg for smsg in stored_msg]
|
||||
|
|
@ -283,30 +283,30 @@ class TestAccount(CommandTest):
|
|||
def test_char_create(self):
|
||||
self.call(account.CmdCharCreate(), "Test1=Test char",
|
||||
"Created new character Test1. Use @ic Test1 to enter the game", caller=self.account)
|
||||
|
||||
|
||||
def test_char_delete(self):
|
||||
# Chardelete requires user input; this test is mainly to confirm
|
||||
# Chardelete requires user input; this test is mainly to confirm
|
||||
# whether permissions are being checked
|
||||
|
||||
|
||||
# Add char to account playable characters
|
||||
self.account.db._playable_characters.append(self.char1)
|
||||
|
||||
|
||||
# Try deleting as Developer
|
||||
self.call(account.CmdCharDelete(), "Char", "This will permanently destroy 'Char'. This cannot be undone. Continue yes/[no]?", caller=self.account)
|
||||
|
||||
|
||||
# Downgrade permissions on account
|
||||
self.account.permissions.add('Player')
|
||||
self.account.permissions.remove('Developer')
|
||||
|
||||
|
||||
# Set lock on character object to prevent deletion
|
||||
self.char1.locks.add('delete:none()')
|
||||
|
||||
|
||||
# Try deleting as Player
|
||||
self.call(account.CmdCharDelete(), "Char", "You do not have permission to delete this character.", caller=self.account)
|
||||
|
||||
|
||||
# Set lock on character object to allow self-delete
|
||||
self.char1.locks.add('delete:pid(%i)' % self.account.id)
|
||||
|
||||
|
||||
# Try deleting as Player again
|
||||
self.call(account.CmdCharDelete(), "Char", "This will permanently destroy 'Char'. This cannot be undone. Continue yes/[no]?", caller=self.account)
|
||||
|
||||
|
|
|
|||
|
|
@ -229,13 +229,13 @@ class DefaultChannel(with_metaclass(TypeclassBase, ChannelDB)):
|
|||
"""
|
||||
Creates a basic Channel with default parameters, unless otherwise
|
||||
specified or extended.
|
||||
|
||||
|
||||
Provides a friendlier interface to the utils.create_channel() function.
|
||||
|
||||
|
||||
Args:
|
||||
key (str): This must be unique.
|
||||
account (Account): Account to attribute this object to.
|
||||
|
||||
|
||||
Kwargs:
|
||||
aliases (list of str): List of alternative (likely shorter) keynames.
|
||||
description (str): A description of the channel, for use in listings.
|
||||
|
|
@ -248,26 +248,26 @@ class DefaultChannel(with_metaclass(TypeclassBase, ChannelDB)):
|
|||
Returns:
|
||||
channel (Channel): A newly created Channel.
|
||||
errors (list): A list of errors in string form, if any.
|
||||
|
||||
|
||||
"""
|
||||
errors = []
|
||||
obj = None
|
||||
ip = kwargs.pop('ip', '')
|
||||
|
||||
|
||||
try:
|
||||
kwargs['desc'] = kwargs.pop('description', '')
|
||||
obj = create.create_channel(key, *args, **kwargs)
|
||||
|
||||
|
||||
# Record creator id and creation IP
|
||||
if ip: obj.db.creator_ip = ip
|
||||
if account: obj.db.creator_id = account.id
|
||||
|
||||
|
||||
except Exception as exc:
|
||||
errors.append("An error occurred while creating this '%s' object." % key)
|
||||
logger.log_err(exc)
|
||||
|
||||
|
||||
return obj, errors
|
||||
|
||||
|
||||
def delete(self):
|
||||
"""
|
||||
Deletes channel while also cleaning up channelhandler.
|
||||
|
|
@ -773,4 +773,4 @@ class DefaultChannel(with_metaclass(TypeclassBase, ChannelDB)):
|
|||
return '#'
|
||||
|
||||
# Used by Django Sites/Admin
|
||||
get_absolute_url = web_get_detail_url
|
||||
get_absolute_url = web_get_detail_url
|
||||
|
|
|
|||
|
|
@ -486,7 +486,7 @@ class TestDefaultCallbacks(CommandTest):
|
|||
try:
|
||||
self.char2.msg = Mock()
|
||||
self.call(ExitCommand(), "", obj=self.exit)
|
||||
stored_msg = [args[0] if args and args[0] else kwargs.get("text", utils.to_str(kwargs, force_string=True))
|
||||
stored_msg = [args[0] if args and args[0] else kwargs.get("text", utils.to_str(kwargs))
|
||||
for name, args, kwargs in self.char2.msg.mock_calls]
|
||||
# Get the first element of a tuple if msg received a tuple instead of a string
|
||||
stored_msg = [smsg[0] if isinstance(smsg, tuple) else smsg for smsg in stored_msg]
|
||||
|
|
@ -507,7 +507,7 @@ class TestDefaultCallbacks(CommandTest):
|
|||
try:
|
||||
self.char2.msg = Mock()
|
||||
self.call(ExitCommand(), "", obj=back)
|
||||
stored_msg = [args[0] if args and args[0] else kwargs.get("text", utils.to_str(kwargs, force_string=True))
|
||||
stored_msg = [args[0] if args and args[0] else kwargs.get("text", utils.to_str(kwargs))
|
||||
for name, args, kwargs in self.char2.msg.mock_calls]
|
||||
# Get the first element of a tuple if msg received a tuple instead of a string
|
||||
stored_msg = [smsg[0] if isinstance(smsg, tuple) else smsg for smsg in stored_msg]
|
||||
|
|
|
|||
|
|
@ -582,7 +582,7 @@ class DefaultObject(with_metaclass(TypeclassBase, ObjectDB)):
|
|||
if not (isinstance(text, str) or isinstance(text, tuple)):
|
||||
# sanitize text before sending across the wire
|
||||
try:
|
||||
text = to_str(text, force_string=True)
|
||||
text = to_str(text)
|
||||
except Exception:
|
||||
text = repr(text)
|
||||
kwargs['text'] = text
|
||||
|
|
|
|||
|
|
@ -1039,7 +1039,7 @@ def node_aliases(caller):
|
|||
|
||||
def _caller_attrs(caller):
|
||||
prototype = _get_menu_prototype(caller)
|
||||
attrs = ["{}={}".format(tup[0], utils.crop(utils.to_str(tup[1], force_string=True), width=10))
|
||||
attrs = ["{}={}".format(tup[0], utils.crop(utils.to_str(tup[1]), width=10))
|
||||
for tup in prototype.get("attrs", [])]
|
||||
return attrs
|
||||
|
||||
|
|
|
|||
|
|
@ -569,7 +569,7 @@ def protfunc_parser(value, available_functions=None, testing=False, stacktrace=F
|
|||
value = value.dbref
|
||||
except AttributeError:
|
||||
pass
|
||||
value = to_str(value, force_string=True)
|
||||
value = to_str(value)
|
||||
|
||||
available_functions = PROT_FUNCS if available_functions is None else available_functions
|
||||
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ class ScriptDB(TypedObject):
|
|||
pass
|
||||
if isinstance(value, (str, int)):
|
||||
from evennia.objects.models import ObjectDB
|
||||
value = to_str(value, force_string=True)
|
||||
value = to_str(value)
|
||||
if (value.isdigit() or value.startswith("#")):
|
||||
dbid = dbref(value, reqhash=False)
|
||||
if dbid:
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ class ServerConfig(WeakSharedMemoryModel):
|
|||
#@property
|
||||
def __value_get(self):
|
||||
"Getter. Allows for value = self.value"
|
||||
return pickle.loads(self.db_value)
|
||||
return pickle.loads(utils.to_bytes(self.db_value))
|
||||
|
||||
#@value.setter
|
||||
def __value_set(self, value):
|
||||
|
|
|
|||
|
|
@ -290,7 +290,7 @@ class SshProtocol(Manhole, session.Session):
|
|||
text = args[0] if args else ""
|
||||
if text is None:
|
||||
return
|
||||
text = to_str(text, force_string=True)
|
||||
text = to_str(text)
|
||||
|
||||
# handle arguments
|
||||
options = kwargs.get("options", {})
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ from evennia.server.portal import ttype, mssp, telnet_oob, naws, suppress_ga
|
|||
from evennia.server.portal.mccp import Mccp, mccp_compress, MCCP
|
||||
from evennia.server.portal.mxp import Mxp, mxp_parse
|
||||
from evennia.utils import ansi
|
||||
from evennia.utils.utils import to_str
|
||||
from evennia.utils.utils import to_bytes
|
||||
|
||||
_RE_N = re.compile(r"\|n$")
|
||||
_RE_LEND = re.compile(br"\n$|\r$|\r\n$|\r\x00$|", re.MULTILINE)
|
||||
|
|
@ -243,7 +243,7 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
|
|||
line (str): Line to send.
|
||||
|
||||
"""
|
||||
line = self.encode_output(line)
|
||||
line = to_bytes(line, self)
|
||||
# escape IAC in line mode, and correctly add \r\n (the TELNET end-of-line)
|
||||
line = line.replace(IAC, IAC + IAC)
|
||||
line = line.replace(b'\n', b'\r\n')
|
||||
|
|
@ -316,7 +316,6 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
|
|||
text = args[0] if args else ""
|
||||
if text is None:
|
||||
return
|
||||
text = to_str(text, force_string=True)
|
||||
|
||||
# handle arguments
|
||||
options = kwargs.get("options", {})
|
||||
|
|
@ -343,7 +342,7 @@ class TelnetProtocol(Telnet, StatefulTelnetProtocol, Session):
|
|||
strip_ansi=nocolor, xterm256=xterm256)
|
||||
if mxp:
|
||||
prompt = mxp_parse(prompt)
|
||||
prompt = self.encode_output(prompt)
|
||||
prompt = to_bytes(prompt, self)
|
||||
prompt = prompt.replace(IAC, IAC + IAC).replace(b'\n', b'\r\n')
|
||||
prompt += IAC + GA
|
||||
self.transport.write(mccp_compress(self, prompt))
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ header where applicable.
|
|||
from builtins import object
|
||||
import re
|
||||
import json
|
||||
from evennia.utils.utils import to_str, is_iter
|
||||
from evennia.utils.utils import is_iter
|
||||
|
||||
# MSDP-relevant telnet cmd/opt-codes
|
||||
MSDP = b'\x45'
|
||||
|
|
@ -46,11 +46,6 @@ GMCP = b'\xc9'
|
|||
from twisted.conch.telnet import IAC, SB, SE
|
||||
|
||||
|
||||
def force_str(inp):
|
||||
"""Helper to shorten code"""
|
||||
return to_str(inp, force_string=True)
|
||||
|
||||
|
||||
# pre-compiled regexes
|
||||
# returns 2-tuple
|
||||
msdp_regex_table = re.compile(br"%s\s*(\w*?)\s*%s\s*%s(.*?)%s"
|
||||
|
|
|
|||
|
|
@ -195,7 +195,6 @@ class WebSocketClient(WebSocketServerProtocol, Session):
|
|||
return
|
||||
|
||||
flags = self.protocol_flags
|
||||
text = to_str(text, force_string=True)
|
||||
|
||||
options = kwargs.pop("options", {})
|
||||
raw = options.get("raw", flags.get("RAW", False))
|
||||
|
|
|
|||
|
|
@ -369,7 +369,7 @@ class AjaxWebClientSession(session.Session):
|
|||
return
|
||||
|
||||
flags = self.protocol_flags
|
||||
text = utils.to_str(text, force_string=True)
|
||||
text = utils.to_str(text)
|
||||
|
||||
options = kwargs.pop("options", {})
|
||||
raw = options.get("raw", flags.get("RAW", False))
|
||||
|
|
|
|||
|
|
@ -4,12 +4,14 @@ This module defines a generic session class. All connection instances
|
|||
|
||||
"""
|
||||
from builtins import object
|
||||
from django.conf import settings
|
||||
|
||||
import time
|
||||
|
||||
#------------------------------------------------------------
|
||||
# ------------------------------------------------------------
|
||||
# Server Session
|
||||
#------------------------------------------------------------
|
||||
# ------------------------------------------------------------
|
||||
|
||||
|
||||
class Session(object):
|
||||
"""
|
||||
|
|
@ -135,41 +137,6 @@ class Session(object):
|
|||
if self.account:
|
||||
self.protocol_flags.update(self.account.attributes.get("_saved_protocol_flags", {}))
|
||||
|
||||
# helpers
|
||||
|
||||
def encode_output(self, text):
|
||||
"""
|
||||
Encode the given text for output, following the session's protocol flag.
|
||||
|
||||
Args:
|
||||
text (str or bytes): the text to encode to bytes.
|
||||
|
||||
Returns:
|
||||
encoded_text (bytes): the encoded text following the session's
|
||||
protocol flag. If the converting fails, log the error
|
||||
and send the text with "?" in place of problematic
|
||||
characters. If the specified encoding cannot be found,
|
||||
the protocol flag is reset to utf-8.
|
||||
In any case, returns bytes.
|
||||
|
||||
Note:
|
||||
If the argument is bytes, return it as is.
|
||||
|
||||
"""
|
||||
if isinstance(text, bytes):
|
||||
return text
|
||||
|
||||
try:
|
||||
encoded = text.encode(self.protocol_flags["ENCODING"])
|
||||
except LookupError:
|
||||
self.protocol_flags["ENCODING"] = 'utf-8'
|
||||
encoded = text.encode('utf-8')
|
||||
except UnicodeEncodeError:
|
||||
print("An error occurred during string encoding to {encoding}. Will remove errors and try again.".format(encoding=self.protocol_flags["ENCODING"]))
|
||||
encoded = text.encode(self.protocol_flags["ENCODING"], errors="replace")
|
||||
|
||||
return encoded
|
||||
|
||||
# access hooks
|
||||
|
||||
def disconnect(self, reason=None):
|
||||
|
|
|
|||
|
|
@ -675,7 +675,7 @@ class ANSIString(with_metaclass(ANSIMeta, str)):
|
|||
"""
|
||||
string = args[0]
|
||||
if not isinstance(string, str):
|
||||
string = to_str(string, force_string=True)
|
||||
string = to_str(string)
|
||||
parser = kwargs.get('parser', ANSI_PARSER)
|
||||
decoded = kwargs.get('decoded', False) or hasattr(string, '_raw_string')
|
||||
code_indexes = kwargs.pop('code_indexes', None)
|
||||
|
|
|
|||
|
|
@ -801,7 +801,7 @@ class EvEditor(object):
|
|||
try:
|
||||
self._buffer = self._loadfunc(self._caller)
|
||||
if not isinstance(self._buffer, str):
|
||||
self._buffer = to_str(self._buffer, force_string=True)
|
||||
self._buffer = to_str(self._buffer)
|
||||
self._caller.msg("|rNote: input buffer was converted to a string.|n")
|
||||
except Exception as e:
|
||||
from evennia.utils import logger
|
||||
|
|
|
|||
|
|
@ -207,8 +207,8 @@ class EvForm(object):
|
|||
self.filename = filename
|
||||
self.input_form_dict = form
|
||||
|
||||
self.cells_mapping = dict((to_str(key, force_string=True), value) for key, value in cells.items()) if cells else {}
|
||||
self.tables_mapping = dict((to_str(key, force_string=True), value) for key, value in tables.items()) if tables else {}
|
||||
self.cells_mapping = dict((to_str(key), value) for key, value in cells.items()) if cells else {}
|
||||
self.tables_mapping = dict((to_str(key), value) for key, value in tables.items()) if tables else {}
|
||||
|
||||
self.cellchar = "x"
|
||||
self.tablechar = "c"
|
||||
|
|
@ -378,8 +378,8 @@ class EvForm(object):
|
|||
kwargs.pop("width", None)
|
||||
kwargs.pop("height", None)
|
||||
|
||||
new_cells = dict((to_str(key, force_string=True), value) for key, value in cells.items()) if cells else {}
|
||||
new_tables = dict((to_str(key, force_string=True), value) for key, value in tables.items()) if tables else {}
|
||||
new_cells = dict((to_str(key), value) for key, value in cells.items()) if cells else {}
|
||||
new_tables = dict((to_str(key), value) for key, value in tables.items()) if tables else {}
|
||||
|
||||
self.cells_mapping.update(new_cells)
|
||||
self.tables_mapping.update(new_tables)
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ class SharedMemoryModelBase(ModelBase):
|
|||
if _GA(cls, "_is_deleted"):
|
||||
raise ObjectDoesNotExist("Cannot set %s to %s: Hosting object was already deleted!" % (fname, value))
|
||||
if isinstance(value, (str, int)):
|
||||
value = to_str(value, force_string=True)
|
||||
value = to_str(value)
|
||||
if (value.isdigit() or value.startswith("#")):
|
||||
# we also allow setting using dbrefs, if so we try to load the matching object.
|
||||
# (we assume the object is of the same type as the class holding the field, if
|
||||
|
|
|
|||
|
|
@ -421,7 +421,7 @@ def parse_inlinefunc(string, strip=False, available_funcs=None, stacktrace=False
|
|||
# execute the inlinefunc at this point or strip it.
|
||||
kwargs["inlinefunc_stack_depth"] = depth
|
||||
retval = "" if strip else func(*args, **kwargs)
|
||||
return utils.to_str(retval, force_string=True)
|
||||
return utils.to_str(retval)
|
||||
retval = "".join(_run_stack(item) for item in stack)
|
||||
if stacktrace:
|
||||
out = "STACK: \n{} => {}\n".format(stack, retval)
|
||||
|
|
|
|||
|
|
@ -784,37 +784,86 @@ def latinify(unicode_string, default='?', pure_ascii=False):
|
|||
return ''.join(converted)
|
||||
|
||||
|
||||
def to_str(obj, encoding='utf-8', force_string=False):
|
||||
def to_bytes(text, session=None):
|
||||
"""
|
||||
This function is deprecated in the Python 3 version of Evennia and is
|
||||
likely to be phased out in future releases.
|
||||
|
||||
---
|
||||
This encodes a unicode string back to byte-representation,
|
||||
for printing, writing to disk etc.
|
||||
Try to encode the given text to bytes, using encodings from settings or from Session. Will
|
||||
always return a bytes, even if given something that is not str or bytes.
|
||||
|
||||
Args:
|
||||
obj (any): Object to encode to bytecode.
|
||||
encoding (str, optional): The encoding type to use for the
|
||||
encoding.
|
||||
force_string (bool, optional): Always convert to string, no
|
||||
matter what type `obj` is initially.
|
||||
text (any): The text to encode to bytes. If bytes, return unchanged. If not a str, convert
|
||||
to str before converting.
|
||||
session (Session, optional): A Session to get encoding info from. Will try this before
|
||||
falling back to settings.ENCODINGS.
|
||||
|
||||
Notes:
|
||||
Non-string objects are let through without modification - this
|
||||
is required e.g. for Attributes. Use `force_string` to force
|
||||
conversion of objects to strings.
|
||||
Returns:
|
||||
encoded_text (bytes): the encoded text following the session's protocol flag followed by the
|
||||
encodings specified in settings.ENCODINGS. If all attempt fail, log the error and send
|
||||
the text with "?" in place of problematic characters. If the specified encoding cannot
|
||||
be found, the protocol flag is reset to utf-8. In any case, returns bytes.
|
||||
|
||||
Note:
|
||||
If `text` is already bytes, return it as is.
|
||||
|
||||
"""
|
||||
if isinstance(obj, (str, bytes )):
|
||||
return obj
|
||||
if isinstance(text, bytes):
|
||||
return text
|
||||
if not isinstance(text, str):
|
||||
# convert to a str representation before encoding
|
||||
try:
|
||||
text = str(text)
|
||||
except Exception:
|
||||
text = repr(text)
|
||||
|
||||
if force_string:
|
||||
# some sort of other object. Try to
|
||||
# convert it to a string representation.
|
||||
obj = str(obj)
|
||||
default_encoding = session.protocol_flags.get("ENCODING", 'utf-8') if session else 'utf-8'
|
||||
try:
|
||||
return text.encode(default_encoding)
|
||||
except (LookupError, UnicodeEncodeError):
|
||||
for encoding in settings.ENCODINGS:
|
||||
try:
|
||||
return text.encode(encoding)
|
||||
except (LookupError, UnicodeEncodeError):
|
||||
pass
|
||||
# no valid encoding found. Replace unconvertable parts with ?
|
||||
return text.encode(default_encoding, errors="replace")
|
||||
|
||||
return obj
|
||||
|
||||
def to_str(text, session=None):
|
||||
"""
|
||||
Try to decode a bytestream to a python str, using encoding schemas from settings
|
||||
or from Session. Will always return a str(), also if not given a str/bytes.
|
||||
|
||||
Args:
|
||||
text (any): The text to encode to bytes. If a str, return it. If also not bytes, convert
|
||||
to str using str() or repr() as a fallback.
|
||||
session (Session, optional): A Session to get encoding info from. Will try this before
|
||||
falling back to settings.ENCODINGS.
|
||||
|
||||
Returns:
|
||||
decoded_text (str): The decoded text.
|
||||
|
||||
Note:
|
||||
If `text` is already str, return it as is.
|
||||
"""
|
||||
if isinstance(text, str):
|
||||
return text
|
||||
if not isinstance(text, bytes):
|
||||
# not a byte, convert directly to str
|
||||
try:
|
||||
return str(text)
|
||||
except Exception:
|
||||
return repr(text)
|
||||
|
||||
default_encoding = session.protocol_flags.get("ENCODING", 'utf-8') if session else 'utf-8'
|
||||
try:
|
||||
return text.decode(default_encoding)
|
||||
except (LookupError, UnicodeDecodeError):
|
||||
for encoding in settings.ENCODINGS:
|
||||
try:
|
||||
return text.decode(encoding)
|
||||
except (LookupError, UnicodeDecodeError):
|
||||
pass
|
||||
# no valid encoding found. Replace unconvertable parts with ?
|
||||
return text.decode(default_encoding, errors="replace")
|
||||
|
||||
|
||||
def validate_email_address(emailaddress):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue