Contrib/Evland was removed due to security issues and unsupported code. The code can be found for reference at https://github.com/Griatch/evlang.

This commit is contained in:
Griatch 2014-04-15 18:04:43 +02:00
parent ef0a154a61
commit b7b68afe20
6 changed files with 0 additions and 1372 deletions

View file

@ -1,114 +0,0 @@
EVLANG
EXPERIMENTAL IMPLEMENTATION
Evennia contribution - Griatch 2012
"Evlang" is a heavily restricted version of Python intended to be used
by regular players to code simple functionality on supporting objects.
It's referred to as "evlang" or "evlang scripts" in order to
differentiate from Evennia's normal (and unrelated) "Scripts".
WARNING:
Restricted python execution is a tricky art, and this module -is-
partly based on blacklisting techniques, which might be vulnerable to
new venues of attack opening up in the future (or existing ones we've
missed). Whereas I/we know of no obvious exploits to this, it is no
guarantee. If you are paranoid about security, consider also using
secondary defences on the OS level such as a jail and highly
restricted execution abilities for the twisted process. So in short,
this should work fine, but use it at your own risk. You have been
warned.
An Evennia server with Evlang will, once set up, minimally consist of
the following components:
- The evlang parser (bottom of evlang.py). This combines
regular removal of dangerous modules/builtins with AST-traversal.
it implements a limited_exec() function.
- The Evlang handler (top of evlang.py). This handler is the Evennia
entry point. It should be added to objects that should support
evlang-scripting.
- A custom object typeclass. This must set up the Evlang handler
and store a few critical Attributes on itself for book-keeping.
The object will probably also overload some of its hooks to
call the correct evlang script at the proper time
- Command(s) for adding code to supporting objects
- Optional expanded "safe" methods/objects to include in the
execution environment. These are defined in settings (see
header of evlang.py for more info).
You can set this up easily to try things out by using the included
examples:
Quick Example Install
---------------------
This is a quick test-setup using the example objects and commands.
1) If you haven't already, make sure you are able to overload the
default cmdset: Copy game/gamesrc/commands/examples/cmdset.py up
one level, then change settings.CMDSET_DEFAULT to point to
DefaultCmdSet in your newly copied module. Restart the server and
check so the default commands still work.
2) Import and add
contrib.evlang.command.CmdCode
and
contrib.evlang.examples.CmdCraftScriptable
to your default command set. Reload server.
That's it, really. You should now have two new commands available,
@craftscriptable and @code. The first one is a simple "crafting-like"
command that will create an object of type
contrib.evlang.examples.CraftedScriptableObject while setting it up
with some basic scripting slots.
Try it now:
@craftscriptable crate
You create a simple "crate" object in your current location. You can
use @code to see which "code types" it will accept.
@code crate
You should see a list with "drop", "get" and "look", each without
anything assigned to them. If you look at how CraftedScriptableObject
is defined you will find that these "command types" (you can think of
them as slots where custom code can be put) are tied to the at_get,
at_drop and at_desc hooks respecively - this means Evlang scripts put
in the respective slots will ttrigger at the appropriate time.
There are a few "safe" objects made available out of the box.
self - reference to object the Evlang handler is defined on
here - shortcut for self.location
caller - reference back to the one triggering the script
scripter - reference to the one creating the script (set by @code)
There is also the 'evl' object that defines "safe" methods to use:
evl.msg(string, obj=None) # default is the send to caller
evl.msg_contents(string, obj=None) # default is to send to all except caller
evl.msg_home(string, obj=None) # default is to send to self.location
delay(delay, function, *args, **kwargs)
attr(obj, attrname=None, attrvalue=None, delete=False) # lock-checking attribute accesser
list() # display all available methods on evl, with docstrings (including your custom additions)
These all return True after successful execution, which makes
especially the msg* functions easier to use in a conditional. Let's
try it.
@code crate/look = caller.key=='Superman' and evl.msg("Your gaze burns a small hole.") or evl.msg("Looks robust!")
Now look at the crate. :)
You can (in evlang) use evl.list() to get a list of all methods
currently stored on the evl object. For testing, let's use the same
look slot on the crate again. But this time we'll use the /debug mode
of @code, which means the script will be auto-run immediately and we
don't have to look at the create to get a result when developing.
@code/debug crate/look = evl.msg(evl.list())

View file

@ -1 +0,0 @@
# -*- coding: utf-8 -*-

View file

@ -1,132 +0,0 @@
"""
Evlang usage examples
Commands for use with evlang
Evennia contribution - Griatch 2012
The @code command allows to add scripted evlang code to
a ScriptableObject. It will handle access checks.
"""
from ev import utils
from ev import default_cmds
from src.utils import prettytable
#------------------------------------------------------------
# Evlang-related commands
#
# Easiest is to add this command to the default cmdset.
# Alternatively one could imagine storing it directly only
# on scriptable objects.
#------------------------------------------------------------
class CmdCode(default_cmds.MuxCommand):
"""
add custom code to a scriptable object
Usage:
@code[/switch] <obj>[/<type> [= <codestring> ]]
Switch:
delete - clear code of given type from the object.
debug - immediately run the given code after adding it.
This will add custom scripting to an object
which allows such modification.
<type> must be one of the script types allowed
on the object. Only supplying the command will
return a list of script types possible to add
custom scripts to.
"""
key = "@code"
locks = "cmd:perm(Builders)"
help_category = "Building"
def func(self):
"implements the functionality."
caller = self.caller
if not self.args:
caller.msg("Usage: @code <obj>[/<type> [= <codestring>]]")
return
codetype = None
objname = self.lhs
if '/' in self.lhs:
objname, codetype = [part.strip() for part in self.lhs.rsplit("/", 1)]
obj = self.caller.search(objname)
if not obj:
return
# get the dicts from db storage for easy referencing
evlang_scripts = obj.db.evlang_scripts
evlang_locks = obj.db.evlang_locks
if not (evlang_scripts != None and evlang_locks and obj.ndb.evlang):
caller.msg("Object %s can not be scripted." % obj.key)
return
if 'delete' in self.switches:
# clearing a code snippet
if not codetype:
caller.msg("You must specify a code type.")
return
if not codetype in evlang_scripts:
caller.msg("Code type '%s' not found on %s." % (codetype, obj.key))
return
# this will also update the database
obj.ndb.evlang.delete(codetype)
caller.msg("Code for type '%s' cleared on %s." % (codetype, obj.key))
return
if not self.rhs:
if codetype:
scripts = [(name, tup[1], utils.crop(tup[0]))
for name, tup in evlang_scripts.items() if name==codetype]
scripts.extend([(name, "--", "--") for name in evlang_locks
if name not in evlang_scripts if name==codetype])
else:
# no type specified. List all scripts/slots on object
print evlang_scripts
scripts = [(name, tup[1], utils.crop(tup[0]))
for name, tup in evlang_scripts.items()]
scripts.extend([(name, "--", "--") for name in evlang_locks
if name not in evlang_scripts])
scripts = sorted(scripts, key=lambda p: p[0])
table = prettytable.PrettyTable(["{wtype", "{wcreator", "{wcode"])
for tup in scripts:
table.add_row([tup[0], tup[1], tup[2]])
string = "{wEvlang scripts on %s:{n\n%s" % (obj.key, table)
caller.msg(string)
return
# we have rhs
codestring = self.rhs
if not codetype in evlang_locks:
caller.msg("Code type '%s' cannot be coded on %s." % (codetype, obj.key))
return
# check access with the locktype "code"
if not obj.ndb.evlang.lockhandler.check(caller, "code"):
caller.msg("You are not permitted to add code of type %s." % codetype)
return
# we have code access to this type.
oldcode = None
if codetype in evlang_scripts:
oldcode = str(evlang_scripts[codetype][0])
# this updates the database right away too
obj.ndb.evlang.add(codetype, codestring, scripter=caller)
if oldcode:
caller.msg("{wReplaced{n\n %s\n{wWith{n\n %s" % (oldcode, codestring))
else:
caller.msg("Code added in '%s':\n %s" % (codetype, codestring))
if "debug" in self.switches:
# debug mode
caller.msg("{wDebug: running script (look out for errors below) ...{n\n" + "-"*68)
obj.ndb.evlang.run_by_name(codetype, caller, quiet=False)

View file

@ -1,955 +0,0 @@
"""
EVLANG
A mini-language for online coding of Evennia
Evennia contribution - Griatch 2012
WARNING:
Restricted python execution is a tricky art, and this module -is-
partly based on blacklisting techniques, which might be vulnerable to
new venues of attack opening up in the future (or existing ones we've
missed). Whereas I/we know of no obvious exploits to this, it is no
guarantee. If you are paranoid about security, consider also using
secondary defences on the OS level such as a jail and highly
restricted execution abilities for the twisted process. So in short,
this should work fine, but use it at your own risk. You have been
warned.
This module offers a highly restricted execution environment for users
to script objects in an almost-Python language. It's not really a true
sandbox but based on a very stunted version of Python. This not only
restricts obvious things like import statements and other builins, but
also pre-parses the AST tree to completely kill whole families of
functionality. The result is a subset of Python that -should- keep an
untrusted, malicious user from doing bad things to the server.
An important limitation with this this implementation is a lack of a
timeout check - inside Twisted (and in Python in general) it's very
hard to safely kill a thread with arbitrary code once it's running. So
instead we restrict the most common DOS-attack vectors, such as while
loops, huge power-law assignments as well as function definitions. A
better way would probably be to spawn the runner into a separate
process but that stunts much of the work a user might want to do with
objects (since the current in-memory state of an object has potential
importance in Evennia). If you want to try the subprocess route, you
might want to look into hacking the Evlang handler (below) onto code
from the pysandbox project (https://github.com/haypo/pysandbox). Note
however, that one would probably need to rewrite that to use Twisted's
non-blocking subprocess mechanisms instead.
The module holds the "Evlang" handler, which is intended to be the
entry point for adding scripting support anywhere in Evennia.
By default the execution environment makes the following objects
available (some or all of these may be None depending on how the
code was launched):
caller - a reference to the object triggering the code
scripter - the original creator of the code
self - the object on which the code is defined
here - shortcut to self.location, if applicable
There is finally a variable "evl" which is a holder object for safe
functions to execute. This object is initiated with the objects above,
to make sure the user does not try to forge the input arguments. See
below the default safe methods defined on it.
You can add new safe symbols to the execution context by adding
EVLANG_SAFE_CONTEXT to your settings file. This should be a dictionary
with {"name":object} pairs.
You can also add new safe methods to the evl object. You add them as a
dictionary on the same form to settings.EVLANG_SAFE_METHODS. Remember
that such meethods must be defined properly to be a class method
(notably "self" must be be the first argument on the argument list).
You can finally define settings.EVLANG_UNALLOWED_SYMBOLS as a list of
python symbol names you specifically want to lock. This will lock both
functions of that name as well as trying to access attributes on
objects with that name (note that these "attributes" have nothing to
do with Evennia's in-database "Attribute" system!).
"""
import sys, os, time
import __builtin__
import inspect, ast, _ast
from twisted.internet import reactor, threads, task
from twisted.internet.defer import inlineCallbacks
# set up django, if necessary
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from game import settings
try:
from django.conf import settings as settings2
settings2.configure()
except RuntimeError:
pass
finally:
del settings2
_LOGGER = None
#------------------------------------------------------------
# Evennia-specific blocks
#------------------------------------------------------------
# specifically forbidden symbols
_EV_UNALLOWED_SYMBOLS = ["attr", "attributes", "delete"]
try:
_EV_UNALLOWED_SYMBOLS.expand(settings.EVLANG_UNALLOWED_SYMBOLS)
except AttributeError:
pass
# safe methods (including self in args) to make available on
# the evl object
_EV_SAFE_METHODS = {}
try:
_EV_SAFE_METHODS.update(settings.EVLANG_SAFE_METHODS)
except AttributeError:
pass
# symbols to make available directly in code
_EV_SAFE_CONTEXT = {"testvar": "This is a safe var!"}
try:
_EV_SAFE_CONTEXT.update(settings.EVLANG_SAFE_CONTEXT)
except AttributeError:
pass
#------------------------------------------------------------
# Holder object for "safe" function access
#------------------------------------------------------------
class Evl(object):
"""
This is a wrapper object for storing safe functions
in a secure way, while offering a few properties for
them to access. This will be made available as the
"evl" property in code.
"""
def __init__(self, obj=None, caller=None, scripter=None, **kwargs):
"Populate the object with safe properties"
self.obj = obj
self.caller = caller
self.scripter = scripter
self.locatiton = None
if obj and hasattr(obj, "location"):
self.location = obj.location
for key, val in _EV_SAFE_METHODS.items():
setattr(self.__class__, name, val)
for key, val in kwargs.items():
setattr(self.__class__, name, val)
def list(self):
"""
list()
returns a string listing all methods on the evl object, including doc strings."
"""
# must do it this way since __dict__ is restricted
members = [mtup for mtup in inspect.getmembers(Evl, predicate=inspect.ismethod)
if not mtup[0].startswith("_")]
string = "\n".join(["{w%s{n\n %s" % (mtup[0], mtup[1].func_doc.strip())
for mtup in members])
return string
def msg(self, string, obj=None):
"""
msg(string, obj=None)
Sends message to obj or to caller if obj is not defined..
"""
if not obj:
obj = self.caller
obj.msg(string)
return True
def msg_contents(self, string, obj=None):
"""
msg_contents(string, obj=None):
Sends message to the contents of obj, or to content of self if obj is not defined.
"""
if not obj:
obj = self.obj
obj.msg_contents(string, exclude=[obj])
return True
def msg_here(self, string, obj=None):
"""
msg_here(string, obj=None)
Sends to contents of obj.location, or to self.location if obj is not defined.
"""
if obj and hasattr(obj, "location"):
here = obj.location
else:
here = self.location
if here:
here.msg_contents(string)
def delay(self, seconds, function, *args, **kwargs):
"""
delay(seconds, function, *args, **kwargs):
Delay execution of function(*args, **kwargs) for up to 120 seconds.
Error messages are relayed to caller unless a specific keyword
'errobj' is supplied pointing to another object to receiver errors.
"""
# handle the special error-reporting object
errobj = self.caller
if "errobj" in kwargs:
errobj = kwargs["errobj"]
del kwargs["errobj"]
# set up some callbacks for delayed execution
def errback(f, errobj):
"error callback"
if errobj:
try:
f = f.getErrorMessage()
except:
pass
errobj.msg("EVLANG delay error: " + str(f))
def runfunc(func, *args, **kwargs):
"threaded callback"
threads.deferToThread(func, *args, **kwargs).addErrback(errback, errobj)
# get things going
if seconds <= 120:
task.deferLater(reactor, seconds, runfunc, function, *args, **kwargs).addErrback(errback, errobj)
else:
raise EvlangError("delay() can only delay for a maximum of 120 seconds (got %ss)." % seconds)
return True
def attr(self, obj, attrname=None, value=None, delete=False):
"""
attr(obj, attrname=None, value=None, delete=False)
Access and edit database Attributes on obj. if only obj
is given, return list of Attributes on obj. If attrname
is given, return that Attribute's value only. If also
value is given, set the attribute to that value. The
delete flag will delete the given attrname from the object.
Access is checked for all operations. The method will return
the attribute value or True if the operation was a success,
None otherwise.
"""
print obj, hasattr(obj, "secure_attr")
if hasattr(obj, "secure_attr"):
return obj.secure_attr(self.caller, attrname, value, delete=False,
default_access_read=True, default_access_edit=False,
default_access_create=True)
return False
#------------------------------------------------------------
# Evlang class handler
#------------------------------------------------------------
class EvlangError(Exception):
"Error for evlang handler"
pass
class Evlang(object):
"""
This is a handler for launching limited execution Python scripts.
Normally this handler is stored on an object and will then give
access to basic operations on the object. It can however also be
run stand-alone.
If running on an object, it should normally be initiated in the
object's at_server_start() method and assigned to a property
"evlang" (or similar) for easy access. It will then use the object
for storing a dictionary of available evlang scripts (default name
of this attribute is "evlang_scripts").
Note: This handler knows nothing about access control. To get that
one needs to append a LockHandler as "lockhandler" at creation
time, as well as arrange for commands to do access checks of
suitable type. Once methods on this handler are called, access is
assumed to be granted.
"""
def __init__(self, obj=None, scripts=None, storage_attr="evlang_scripts",
safe_context=None, safe_timeout=2):
"""
Setup of the Evlang handler.
Input:
obj - a reference to the object this handler is defined on. If not
set, handler will operate stand-alone.
scripts = dictionary {scriptname, (codestring, callerobj), ...}
where callerobj can be Noneevlang_storage_attr - if obj
is given, will look for a dictionary
{scriptname, (codestring, callerobj)...}
stored in this given attribute name on that object.
safe_funcs - dictionary of {funcname:funcobj, ...} to make available
for the execution environment
safe_timeout - the time we let a script run. If it exceeds this
time, it will be blocked from running again.
"""
self.obj = obj
self.evlang_scripts = {}
self.safe_timeout = safe_timeout
self.evlang_storage_attr = storage_attr
if scripts:
self.evlang_scripts.update(scripts)
if self.obj:
self.evlang_scripts.update(obj.attributes.get(storage_attr))
self.safe_context = _EV_SAFE_CONTEXT # set by default + settings
if safe_context:
self.safe_context.update(safe_context)
self.timedout_codestrings = []
def msg(self, string, scripter=None, caller=None):
"""
Try to send string to a receiver. Returns False
if no receiver was found.
"""
if scripter:
scripter.msg(string)
elif caller:
caller.msg(string)
elif self.obj:
self.obj.msg(string)
else:
return False
return True
def start_timer(self, timeout, codestring, caller, scripter):
"""
Start a timer to check how long an execution has lasted.
Returns a deferred, which should be cancelled when the
code does finish.
"""
def alarm(codestring):
"store the code of too-long-running scripts"
global _LOGGER
if not _LOGGER:
from src.utils import logger as _LOGGER
self.timedout_codestrings.append(codestring)
err = "Evlang code '%s' exceeded allowed execution time (>%ss)." % (codestring, timeout)
_LOGGER.log_errmsg("EVLANG time exceeded: caller: %s, scripter: %s, code: %s" % (caller, scripter, codestring))
if not self.msg(err, scripter, caller):
raise EvlangError(err)
def errback(f):
"We need an empty errback, to catch the traceback of defer.cancel()"
pass
return task.deferLater(reactor, timeout, alarm, codestring).addErrback(errback)
def stop_timer(self, _, deferred):
"""Callback for stopping a previously started timer.
Cancels the given deferred.
"""
deferred.cancel()
@inlineCallbacks
def run(self, codestring, caller=None, scripter=None):
"""
run a given code string.
codestring - the actual code to execute.
scripter - the creator of the script. Preferentially sees error messages
caller - the object triggering the script - sees error messages if
no scripter is given
"""
# catching previously detected long-running code
if codestring in self.timedout_codestrings:
err = "Code '%s' previously failed with a timeout. Please rewrite code." % codestring
if not self.msg(err, scripter, caller):
raise EvlangError(err)
return
# dynamically setup context, then overload with custom additions
location = None
if self.obj:
location = self.obj.location
context = {"self":self.obj,
"caller":caller,
"scripter": scripter,
"here": location,
"evl": Evl(self.obj, caller, scripter)}
context.update(self.safe_context)
# launch the runner in a separate thread, tracking how long it runs.
timer = self.start_timer(self.safe_timeout, codestring, scripter, caller)
try:
yield threads.deferToThread(limited_exec, codestring, context=context,
timeout_secs=self.safe_timeout).addCallback(self.stop_timer, timer)
except Exception, e:
self.stop_timer(None, timer)
if not self.msg(e, scripter, caller):
raise e
def run_by_name(self, scriptname, caller=None, quiet=True):
"""
Run a script previously stored on the handler, identified by scriptname.
scriptname - identifier of the stored script
caller - optional reference to the object triggering the script.
quiet - will not raise error if scriptname is not found.
All scripts run will have access to the self, caller and here variables.
"""
scripter = None
try:
codestring, scripter = self.evlang_scripts[scriptname]
except KeyError:
if quiet:
return
errmsg = "Found no script with the name '%s'." % scriptname
if not self.msg(errmsg, scripter=None, caller=caller):
raise EvlangError(errmsg)
return
# execute code
self.run(codestring, caller, scripter)
def add(self, scriptname, codestring, scripter=None):
"""
Add a new script to the handler. This will also save the
script properly. This is used also to update scripts when
debugging.
"""
self.evlang_scripts[scriptname] = (codestring, scripter)
if self.obj:
# save to database
self.obj.attributes.add(self.evlang_storage_attr,
self.evlang_scripts)
def delete(self, scriptname):
"""
Permanently remove script from object.
"""
if scriptname in self.evlang_scripts:
del self.evlang_scripts[scriptname]
if self.obj:
# update change to database
self.obj.attributes.add(self.evlang_storage_attr,
self.evlang_scripts)
#----------------------------------------------------------------------
# Limited Python evaluation.
# Based on PD recipe by Babar K. Zafar
# http://code.activestate.com/recipes/496746/
# Expanded specifically for Evennia by Griatch
# - some renaming/cleanup
# - limited size of power expressions
# - removed print (use msg() instead)
# - blocking certain function calls
# - removed assignment of properties - this is too big of a security risk.
# One needs to us a safe function to change propertes.
# - removed thread-based check for execution time - it doesn't work
# embedded in twisted/python.
# - removed while, since it's night impossible to properly check compile
# time in an embedded Python thread (or rather, it's possible, but
# there is no way to cancel the thread anyway). while is an easy way
# to create an infinite loop.
#----------------------------------------------------------------------
#----------------------------------------------------------------------
# Module globals.
#----------------------------------------------------------------------
# Toggle module level debugging mode.
DEBUG = False
# List of all AST node classes in _ast.py.
ALL_AST_NODES = \
set([name for (name, obj) in inspect.getmembers(_ast)
if inspect.isclass(obj) and issubclass(obj, _ast.AST)])
# List of all builtin functions and types (ignoring exception classes).
ALL_BUILTINS = \
set([name for (name, obj) in inspect.getmembers(__builtin__)
if (inspect.isbuiltin(obj) or name in ('True', 'False', 'None') or
(inspect.isclass(obj) and not issubclass(obj, BaseException)))])
#----------------------------------------------------------------------
# Utilties.
#----------------------------------------------------------------------
def classname(obj):
return obj.__class__.__name__
def is_valid_ast_node(name):
return name in ALL_AST_NODES
def is_valid_builtin(name):
return name in ALL_BUILTINS
def get_node_lineno(node):
return (node.lineno) and node.lineno or 0
#----------------------------------------------------------------------
# Restricted AST nodes & builtins.
#----------------------------------------------------------------------
# Deny evaluation of code if the AST contain any of the following nodes:
UNALLOWED_AST_NODES = set([
# 'Add', 'And',
# 'AssList',
# 'AssName',
# 'AssTuple',
# 'Assert', 'Assign', 'AugAssign',
# 'Bitand', 'Bitor', 'Bitxor', 'Break',
# 'CallFunc', 'Class', 'Compare', 'Const', 'Continue',
# 'Decorators', 'Dict', 'Discard', 'Div',
# 'Ellipsis', 'EmptyNode',
'Exec',
# 'Expression', 'FloorDiv',
# 'For',
'FunctionDef',
# 'GenExpr', 'GenExprFor', 'GenExprIf', 'GenExprInner',
# 'Getattr', 'Global', 'If',
'Import',
'ImportFrom',
# 'Invert',
# 'Keyword', 'Lambda', 'LeftShift',
# 'List', 'ListComp', 'ListCompFor', 'ListCompIf', 'Mod',
# 'Module',
# 'Mul', 'Name', 'Node', 'Not', 'Or', 'Pass', 'Power',
'Print',
'Raise',
# 'Return', 'RightShift', 'Slice', 'Sliceobj',
# 'Stmt', 'Sub', 'Subscript',
'TryExcept', 'TryFinally',
# 'Tuple', 'UnaryAdd', 'UnarySub',
'While',
# 'Yield'
])
# Deny evaluation of code if it tries to access any of the following builtins:
UNALLOWED_BUILTINS = set([
'__import__',
# 'abs', 'apply', 'basestring', 'bool', 'buffer',
# 'callable', 'chr', 'classmethod', 'cmp', 'coerce',
'compile',
# 'complex',
'delattr',
# 'dict',
'dir',
# 'divmod', 'enumerate',
'eval', 'execfile', 'file',
# 'filter', 'float', 'frozenset',
'getattr', 'globals', 'hasattr',
# 'hash', 'hex', 'id',
'input',
# 'int',
'intern',
# 'isinstance', 'issubclass', 'iter',
# 'len', 'list',
'locals',
# 'long', 'map', 'max',
'memoryview',
# 'min', 'object', 'oct',
'open',
# 'ord', 'pow', 'property', 'range',
'raw_input',
# 'reduce',
'reload',
# 'repr', 'reversed', 'round', 'set',
'setattr',
# 'slice', 'sorted', 'staticmethod', 'str', 'sum',
'super',
# 'tuple',
'type',
# 'unichr', 'unicode',
'vars',
# 'xrange', 'zip'
])
# extra validation whitelist-style to avoid new versions of Python creeping
# in with new unsafe things
SAFE_BUILTINS = set([
'False', 'None', 'True', 'abs', 'all', 'any', 'apply', 'basestring',
'bin', 'bool', 'buffer', 'bytearray', 'bytes', 'callable', 'chr',
'classmethod',
'cmp', 'coerce', 'complex', 'dict', 'divmod', 'enumerate', 'filter',
'float', 'format', 'frozenset', 'hash', 'hex', 'id', 'int',
'isinstance', 'issubclass', 'iter', 'len', 'list', 'long', 'map',
'max', 'min',
'next', 'object', 'oct', 'ord', 'pow', 'print', 'property', 'range',
'reduce',
'repr', 'reversed', 'round', 'set', 'slice', 'sorted', 'staticmethod',
'str',
'sum', 'tuple', 'unichr', 'unicode', 'xrange', 'zip'])
for ast_name in UNALLOWED_AST_NODES:
assert(is_valid_ast_node(ast_name))
for name in UNALLOWED_BUILTINS:
assert(is_valid_builtin(name))
def _cross_match_whitelist():
"check the whitelist's completeness"
available = ALL_BUILTINS - UNALLOWED_BUILTINS
diff = available.difference(SAFE_BUILTINS)
assert not diff, diff # check so everything not disallowed is in safe
diff = SAFE_BUILTINS.difference(available)
assert not diff, diff # check so everything in safe is in not-disallowed
_cross_match_whitelist()
def is_unallowed_ast_node(kind):
return kind in UNALLOWED_AST_NODES
def is_unallowed_builtin(name):
return name in UNALLOWED_BUILTINS
#----------------------------------------------------------------------
# Restricted attributes.
#----------------------------------------------------------------------
# In addition to these we deny access to all lowlevel attrs (__xxx__).
UNALLOWED_ATTR = [
'im_class', 'im_func', 'im_self',
'func_code', 'func_defaults', 'func_globals', 'func_name',
'tb_frame', 'tb_next',
'f_back', 'f_builtins', 'f_code', 'f_exc_traceback',
'f_exc_type', 'f_exc_value', 'f_globals', 'f_locals']
UNALLOWED_ATTR.extend(_EV_UNALLOWED_SYMBOLS)
def is_unallowed_attr(name):
return (name[:2] == '__' and name[-2:] == '__') or \
(name in UNALLOWED_ATTR)
#----------------------------------------------------------------------
# LimitedExecVisitor.
#----------------------------------------------------------------------
class LimitedExecError(object):
"""
Base class for all which occur while walking the AST.
Attributes:
errmsg = short decription about the nature of the error
lineno = line offset to where error occured in source code
"""
def __init__(self, errmsg, lineno):
self.errmsg, self.lineno = errmsg, lineno
def __str__(self):
return "line %d : %s" % (self.lineno, self.errmsg)
class LimitedExecASTNodeError(LimitedExecError):
"Expression/statement in AST evaluates to a restricted AST node type."
pass
class LimitedExecBuiltinError(LimitedExecError):
"Expression/statement in tried to access a restricted builtin."
pass
class LimitedExecAttrError(LimitedExecError):
"Expression/statement in tried to access a restricted attribute."
pass
class LimitedExecVisitor(object):
"""
Data-driven visitor which walks the AST for some code and makes
sure it doesn't contain any expression/statements which are
declared as restricted in 'UNALLOWED_AST_NODES'. We'll also make
sure that there aren't any attempts to access/lookup restricted
builtin declared in 'UNALLOWED_BUILTINS'. By default we also won't
allow access to lowlevel stuff which can be used to dynamically
access non-local envrioments.
Interface:
walk(ast) = validate AST and return True if AST is 'safe'
Attributes:
errors = list of LimitedExecError if walk() returned False
Implementation:
The visitor will automatically generate methods for all of the
available AST node types and redirect them to self.ok or self.fail
reflecting the configuration in 'UNALLOWED_AST_NODES'. While
walking the AST we simply forward the validating step to each of
node callbacks which take care of reporting errors.
"""
def __init__(self):
"Initialize visitor by generating callbacks for all AST node types."
self.errors = []
for ast_name in ALL_AST_NODES:
# Don't reset any overridden callbacks.
if getattr(self, 'visit' + ast_name, None):
continue
if is_unallowed_ast_node(ast_name):
setattr(self, 'visit' + ast_name, self.fail)
else:
setattr(self, 'visit' + ast_name, self.ok)
def walk(self, astnode):
"Validate each node in AST and return True if AST is 'safe'."
self.visit(ast)
return self.errors == []
def visit(self, node, *args):
"Recursively validate node and all of its children."
fn = getattr(self, 'visit' + classname(node))
if DEBUG:
self.trace(node)
fn(node, *args)
for child in node.getChildNodes():
self.visit(child, *args)
def visitName(self, node, *args):
"Disallow any attempts to access a restricted builtin/attr."
name = node.getChildren()[0]
lineno = get_node_lineno(node)
if is_unallowed_builtin(name):
self.errors.append(LimitedExecBuiltinError(
"access to builtin '%s' is denied" % name, lineno))
elif is_unallowed_attr(name):
self.errors.append(LimitedExecAttrError(
"access to attribute '%s' is denied" % name, lineno))
def visitGetattr(self, node, *args):
"Disallow any attempts to access a restricted attribute."
attrname = node.attrname
try:
name = node.getChildren()[0].name
except Exception:
name = ""
lineno = get_node_lineno(node)
if attrname == 'attr' and name == 'evl':
pass
elif is_unallowed_attr(attrname):
self.errors.append(LimitedExecAttrError(
"access to attribute '%s' is denied" % attrname, lineno))
def visitAssName(self, node, *args):
"Disallow attempts to delete an attribute or name"
if node.flags == 'OP_DELETE':
self.fail(node, *args)
def visitPower(self, node, *args):
"Make sure power-of operations don't get too big"
if node.left.value > 1000000 or node.right.value > 10:
lineno = get_node_lineno(node)
self.errors.append(LimitedExecAttrError(
"power law solution too big - restricted", lineno))
def ok(self, node, *args):
"Default callback for 'harmless' AST nodes."
pass
def fail(self, node, *args):
"Default callback for unallowed AST nodes."
lineno = get_node_lineno(node)
self.errors.append(LimitedExecASTNodeError(
"execution of '%s' statements is denied" % classname(node),
lineno))
def trace(self, node):
"Debugging utility for tracing the validation of AST nodes."
print classname(node)
for attr in dir(node):
if attr[:2] != '__':
print ' ' * 4, "%-15.15s" % attr, getattr(node, attr)
#----------------------------------------------------------------------
# Safe 'eval' replacement.
#----------------------------------------------------------------------
class LimitedExecException(Exception):
"Base class for all safe-eval related errors."
pass
class LimitedExecCodeException(LimitedExecException):
"""
Exception class for reporting all errors which occured while
validating AST for source code in limited_exec().
Attributes:
code = raw source code which failed to validate
errors = list of LimitedExecError
"""
def __init__(self, code, errors):
self.code, self.errors = code, errors
def __str__(self):
return '\n'.join([str(err) for err in self.errors])
class LimitedExecContextException(LimitedExecException):
"""
Exception class for reporting unallowed objects found in the dict
intended to be used as the local enviroment in safe_eval().
Attributes:
keys = list of keys of the unallowed objects
errors = list of strings describing the nature of the error
for each key in 'keys'
"""
def __init__(self, keys, errors):
self.keys, self.errors = keys, errors
def __str__(self):
return '\n'.join([str(err) for err in self.errors])
class LimitedExecTimeoutException(LimitedExecException):
"""
Exception class for reporting that code evaluation execeeded
the given timelimit.
Attributes:
timeout = time limit in seconds
"""
def __init__(self, timeout):
self.timeout = timeout
def __str__(self):
return "Timeout limit execeeded (%s secs) during exec" % self.timeout
def validate_context(context):
"Checks a supplied context for dangerous content"
ctx_errkeys, ctx_errors = [], []
for (key, obj) in context.items():
if inspect.isbuiltin(obj):
ctx_errkeys.append(key)
ctx_errors.append("key '%s' : unallowed builtin %s" % (key, obj))
if inspect.ismodule(obj):
ctx_errkeys.append(key)
ctx_errors.append("key '%s' : unallowed module %s" % (key, obj))
if ctx_errors:
raise LimitedExecContextException(ctx_errkeys, ctx_errors)
return True
def validate_code(codestring):
"validate a code string"
# prepare the code tree for checking
astnode = ast.parse(codestring)
checker = LimitedExecVisitor()
# check code tree, then execute in a time-restricted environment
if not checker.walk(astnode):
raise LimitedExecCodeException(codestring, checker.errors)
return True
def limited_exec(code, context = {}, timeout_secs=2, retobj=None, procpool_async=None):
"""
Validate source code and make sure it contains no unauthorized
expression/statements as configured via 'UNALLOWED_AST_NODES' and
'UNALLOWED_BUILTINS'. By default this means that code is not
allowed import modules or access dangerous builtins like 'open' or
'eval'.
code - code to execute. Will be evaluated for safety
context - if code is deemed safe, code will execute with this environment
time_out_secs - only used if procpool_async is given. Sets timeout
for remote code execution
retobj - only used if procpool_async is also given. Defines an Object
(which must define a msg() method), for receiving returns from
the execution.
procpool_async - a run_async function alternative to the one in
src.utils.utils. This must accept the keywords
proc_timeout (will be set to timeout_secs
at_return - a callback
at_err - an errback
If retobj is given, at_return/at_err will be created and
set to msg callbacks and errors to that object.
Tracebacks:
LimitedExecContextException
LimitedExecCodeException
"""
if validate_context(context) and validate_code(code):
# run code only after validation has completed
if procpool_async:
# custom run_async
if retobj:
callback = lambda r: retobj.msg(r)
errback = lambda e: retobj.msg(e)
procpool_async(code, *context,
proc_timeout=timeout_secs,
at_return=callback,
at_err=errback)
else:
procpool_async(code, *context, proc_timeout=timeout_secs)
else:
# run in-process
exec code in context
#----------------------------------------------------------------------
# Basic tests.
#----------------------------------------------------------------------
import unittest
class TestLimitedExec(unittest.TestCase):
def test_builtin(self):
# attempt to access a unsafe builtin
self.assertRaises(LimitedExecException,
limited_exec, "open('test.txt', 'w')")
def test_getattr(self):
# attempt to get arround direct attr access
self.assertRaises(LimitedExecException,
limited_exec, "getattr(int, '__abs__')")
def test_func_globals(self):
# attempt to access global enviroment where fun was defined
self.assertRaises(LimitedExecException,
limited_exec, "def x(): pass; print x.func_globals")
def test_lowlevel(self):
# lowlevel tricks to access 'object'
self.assertRaises(LimitedExecException,
limited_exec, "().__class__.mro()[1].__subclasses__()")
def test_timeout_ok(self):
# attempt to exectute 'slow' code which finishes within timelimit
def test(): time.sleep(2)
env = {'test': test}
limited_exec("test()", env, timeout_secs=5)
def test_timeout_exceed(self):
# attempt to exectute code which never teminates
self.assertRaises(LimitedExecException,
limited_exec, "while 1: pass")
def test_invalid_context(self):
# can't pass an enviroment with modules or builtins
env = {'f': __builtins__.open, 'g': time}
self.assertRaises(LimitedExecException,
limited_exec, "print 1", env)
def test_callback(self):
# modify local variable via callback
self.value = 0
def test(): self.value = 1
env = {'test': test}
limited_exec("test()", env)
self.assertEqual(self.value, 1)
if __name__ == "__main__":
unittest.main()

View file

@ -1,73 +0,0 @@
"""
Evlang - usage examples
Craftable object with matching command
Evennia contribution - Griatch 2012
"""
from ev import create_object
from ev import default_cmds
from contrib.evlang.objects import ScriptableObject
#------------------------------------------------------------
# Example for creating a scriptable object with a custom
# "crafting" command that sets coding restrictions on the
# object.
#------------------------------------------------------------
class CmdCraftScriptable(default_cmds.MuxCommand):
"""
craft a scriptable object
Usage:
@craftscriptable <name>
"""
key = "@craftscriptable"
locks = "cmd:perm(Builder)"
help_category = "Building"
def func(self):
"Implements the command"
caller = self.caller
if not self.args:
caller.msg("Usage: @craftscriptable <name>")
return
objname = self.args.strip()
obj = create_object(CraftedScriptableObject, key=objname, location=caller.location)
if not obj:
caller.msg("There was an error creating %s!" % objname)
return
# set locks on the object restrictive coding only to us, the creator.
obj.db.evlang_locks = {"get":"code:id(%s) or perm(Wizards)" % caller.dbref,
"drop":"code:id(%s) or perm(Wizards)" % caller.dbref,
"look": "code:id(%s) or perm(Wizards)" % caller.dbref}
caller.msg("Crafted %s. Use @desc and @code to customize it." % objname)
class CraftedScriptableObject(ScriptableObject):
"""
An object which allows customization of what happens when it is
dropped, taken or examined. It is meant to be created with the
special command CmdCraftScriptable above, for example as part of
an in-game "crafting" operation. It can henceforth be expanded
with custom scripting with the @code command (and only the crafter
(and Wizards) will be able to do so).
Allowed Evlang scripts:
"get"
"drop"
"look"
"""
def at_get(self, getter):
"called when object is picked up"
self.ndb.evlang.run_by_name("get", getter)
def at_drop(self, dropper):
"called when object is dropped"
self.ndb.evlang.run_by_name("drop", dropper)
def at_desc(self, looker):
"called when object is being looked at."
self.ndb.evlang.run_by_name("look", looker)

View file

@ -1,97 +0,0 @@
"""
Evlang usage examples
scriptable Evennia base typeclass and @code command
Evennia contribution - Griatch 2012
The ScriptableObject typeclass initiates the Evlang handler on
itself as well as sets up a range of commands to
allow for scripting its functionality. It sets up an access
control system using the 'code' locktype to limit access to
these codes.
The @code command allows to add scripted evlang code to
a ScriptableObject. It will handle access checks.
There are also a few examples of usage - a simple Room
object that has scriptable behaviour when it is being entered
as well as a more generic template for a Craftable object along
with a base crafting command to create it and set it up with
access restrictions making it only scriptable by the original
creator.
"""
from contrib.evlang import evlang
from src.locks.lockhandler import LockHandler
from ev import Object
#------------------------------------------------------------
# Typeclass bases
#------------------------------------------------------------
class ScriptableObject(Object):
"""
Base class for an object possible to script. By default it defines
no scriptable types.
"""
def init_evlang(self):
"""
Initialize an Evlang handler with access control. Requires
the evlang_locks attribute to be set to a dictionary with
{name:lockstring, ...}.
"""
evl = evlang.Evlang(self)
evl.lock_storage = ""
evl.lockhandler = LockHandler(evl)
for lockstring in self.db.evlang_locks.values():
evl.lockhandler.add(lockstring)
return evl
def at_object_creation(self):
"""
We add the Evlang handler and sets up
the needed properties.
"""
# this defines the available types along with the lockstring
# restricting access to them. Anything not defined in this
# dictionary is forbidden to script at all. Just because
# a script type is -available- does not mean there is any
# code yet in that slot!
self.db.evlang_locks = {}
# This stores actual code snippets. Only code with codetypes
# matching the keys in db.evlang_locks will work.
self.db.evlang_scripts = {}
# store Evlang handler non-persistently
self.ndb.evlang = self.init_evlang()
def at_init(self):
"We must also re-add the handler at server reboots"
self.ndb.evlang = self.init_evlang()
# Example object types
from ev import Room
class ScriptableRoom(Room, ScriptableObject):
"""
A room that is scriptable as well as allows users
to script what happens when users enter it.
Allowed scripts:
"enter" (allowed to be modified by all builders)
"""
def at_object_creation(self):
"initialize the scriptable object"
self.db.evlang_locks = {"enter": "code:perm(Builders)"}
self.db.evlang_scripts = {}
self.ndb.evlang = self.init_evlang()
def at_object_receive(self, obj, source_location):
"fires a script of type 'enter' (no error if it's not defined)"
self.ndb.evlang.run_by_name("enter", obj)