Source code for codemach

__version__ = '0.3'

import sys
import dis
import types
import operator
import builtins
import logging
import logging.config

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

class Signal(object):
    def __init__(self):
        self.__watch = None

    def emmit(self, *args):
        if self.__watch is not None:
            self.__watch(*args)

    def subscribe(self, callback):
        self.__watch = callback

    def unsubscribe(self):
        self.__watch = None

class CodeType(object):
    def __init__(self, code):
        self.code = code

class FunctionCall(object):
    pass

[docs]def function_wrapper(machine, f): """ Wrap a function so that when called, its code is executed by **machine**. :param machine: a :py:class:`Machine` object :param f: a python function object """ def wrapper(*args): logger.debug('wrapper {}'.format(f)) logger.debug('called with {}'.format(args)) return machine.exec(f.__code__, f.__globals__) return wrapper
class FunctionType(object): def __init__(self, machine, code, globals_, name): self.func_raw = types.FunctionType(code, globals_, name) self.function = function_wrapper( machine, self.func_raw) def get_code(self): """ return the code object to be used by Machine """ logger.debug('closures') logger.debug(self.function.__closure__) return self.function.__closure__[0].cell_contents.__code__ def function_wrapped(self, machine): """ return the function object to be passed to builtin.__build_class__ """ #return self.function return function_wrapper( machine, self.func_raw) def __repr__(self): return '<{} object, function={}>'.format(self.__class__.__module__+'.'+self.__class__.__name__, self.func_raw.__name__) class SignalThing(object): def __init__(self): self.__watch = {} def emmit(self, thing, *args): try: if thing in self.__watch: self.__watch[thing](thing, *args) except TypeError: pass def subscribe(self, thing, callback): self.__watch[thing] = callback def unsubscribe(self, thing): del self.__watch[thing]
[docs]class Machine(object): """ Class that executes python code objects. :param verbose: verbosity level """ def __init__(self, verbose=False): self.__stack = [] self.verbose = verbose self.signal = { 'IMPORT_NAME': Signal(), 'CALL_FUNCTION': SignalThing(), 'LOAD_ATTR': SignalThing(), } @staticmethod def cmp_op(i): def not_in(a, b): return not (a in b) return ( operator.lt, operator.le, operator.eq, operator.ne, operator.gt, operator.ge, operator.contains, not_in, operator.is_, operator.is_not, 'exception match', 'BAD')[i]
[docs] def exec(self, code, _globals=None, _locals=None): """ Execute a code object The inputs and behavior of this function should match those of eval_ and exec_. .. _eval: https://docs.python.org/3/library/functions.html?highlight=eval#eval .. _exec: https://docs.python.org/3/library/functions.html?highlight=exec#exec .. note:: Need to figure out how the internals of this function must change for ``eval`` or ``exec``. :param code: a python code object :param _globals: optional globals dictionary :param _locals: optional locals dictionary """ if _globals is None: _globals = globals() if _locals is None: self._locals = _globals else: self._locals = _locals self.__globals = _globals return self.exec_instructions(code)
[docs] def load_name(self, name): """ Implementation of the LOAD_NAME operation """ if name in self.__globals: return self.__globals[name] b = self.__globals['__builtins__'] if isinstance(b, dict): return b[name] else: return getattr(b, name)
[docs] def store_name(self, name, val): """ Implementation of the STORE_NAME operation """ logging.debug('{:20} {} -> {}'.format('STORE_NAME', val, name)) self._locals[name] = val
#self.__globals[name] = val
[docs] def pop(self, n): """ Pop the **n** topmost items from the stack and return them as a ``list``. """ poped = self.__stack[len(self.__stack) - n:] del self.__stack[len(self.__stack) - n:] return poped
[docs] def build_class(self, callable_, args): """ Implement ``builtins.__build_class__``. We must wrap all class member functions using :py:func:`function_wrapper`. This requires using a :py:class:`Machine` to execute the class source code and then recreating the class source code using an :py:class:`Assembler`. .. note: We might be able to bypass the call to ``builtins.__build_class__`` entirely and manually construct a class object. """ machine = Machine(self.verbose) l = dict() machine.exec(args[0].get_code(), self.__globals, l) # construct code for class source a = Assembler() for name, value in l.items(): a.load_const(value) a.store_name(name) a.load_const(None) a.return_value() #machine = Machine(self.verbose) machine = MachineClassSource(self.verbose) f = types.FunctionType(a.code(), self.__globals, args[1]) args = (f, *args[1:]) self.signal['CALL_FUNCTION'].emmit(callable_, *args) return callable_(*args)
[docs] def call_function(self, i): """ Implement the CALL_FUNCTION_ operation. .. _CALL_FUNCTION: https://docs.python.org/3/library/dis.html#opcode-CALL_FUNCTION """ callable_ = self.__stack[-1-i.arg] args = tuple(self.__stack[len(self.__stack) - i.arg:]) if isinstance(callable_, types.CodeType): _c = callable_ e = Machine(self.verbose) l = dict((name, arg) for name, arg in zip( _c.co_varnames[:_c.co_argcount], args)) ret = e.exec(_c, self.__globals, l) elif isinstance(callable_, FunctionType): _c = callable_.get_code() e = Machine(self.verbose) e._locals = dict((name, arg) for name, arg in zip( _c.co_varnames[:_c.co_argcount], args)) l = dict((name, arg) for name, arg in zip( _c.co_varnames[:_c.co_argcount], args)) ret = e.exec(_c, self.__globals, l) elif (callable_ is builtins.__build_class__) and isinstance(args[0], FunctionType): ret = self.build_class(callable_, args) else: self.signal['CALL_FUNCTION'].emmit(callable_, *args) ret = callable_(*args) self.pop(1 + i.arg) self.__stack.append(ret)
def exec_instructions(self, c): if self.verbose: print('------------- begin exec') inst = dis.Bytecode(c) return_value_set = False for i in inst: if return_value_set: raise RuntimeError('RETURN_VALUE is not last opcode') if i.opcode == 1: self.__stack.pop() elif i.opcode == 10: # UNARY_POSITIVE TOS = self.__stack.pop() self.__stack.append(+TOS) elif i.opcode == 11: # UNARY_NEGATIVE TOS = self.__stack.pop() self.__stack.append(-TOS) elif i.opcode == 19: # BINARY_POWER TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 ** TOS) elif i.opcode == 22: # BINARY_MODULO TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 % TOS) elif i.opcode == 23: # BINARY_ADD TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 + TOS) elif i.opcode == 24: # BINARY_SUBTRACT TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 - TOS) elif i.opcode == 25: # BINARY_SUBSCR TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1[TOS]) elif i.opcode == 26: # BINARY_FLOOR_DIVIDE TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 // TOS) elif i.opcode == 27: # BINARY_TRUE_DIVIDE TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 / TOS) elif i.opcode == 71: # LOAD_BUILD_CLASS self.__stack.append(builtins.__build_class__) elif i.opcode == 83: # RETURN_VALUE return_value = self.__stack.pop() return_value_set = True elif i.opcode == 90: # STORE_NAME name = c.co_names[i.arg] TOS = self.__stack.pop() self.store_name(name, TOS) elif i.opcode == 100: # LOAD_CONST self.__stack.append(c.co_consts[i.arg]) elif i.opcode == 101: # LOAD_NAME name = c.co_names[i.arg] self.__stack.append(self.load_name(name)) elif i.opcode == 102: # BUILD_TUPLE self.__stack.append(tuple(self.pop(i.arg))) elif i.opcode == 106: # LOAD_ATTR name = c.co_names[i.arg] o = self.__stack.pop() self.signal['LOAD_ATTR'].emmit(o, name) self.__stack.append(getattr(o, name)) elif i.opcode == 107: # COMPARE_OP TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(Machine.cmp_op(i.arg)(TOS1, TOS)) elif i.opcode == 108: # IMPORT_NAME TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.signal['IMPORT_NAME'].emmit(c.co_names[i.arg], TOS, TOS1) self.__stack.append(__import__(c.co_names[i.arg], fromlist=TOS, level=TOS1)) elif i.opcode == 116: # LOAD_GLOBAL name = c.co_names[i.arg] self.__stack.append(self.load_name(name)) elif i.opcode == 124: # LOAD_FAST: name = c.co_varnames[i.arg] self.__stack.append(self._locals[name]) elif i.opcode == 125: # STORE_FAST TOS = self.__stack.pop() name = c.co_varnames[i.arg] self._locals[name] = TOS elif i.opcode == 131: # CALL_FUNCTION self.call_function(i) elif i.opcode == 132: # MAKE_FUNCTION if i.arg != 0: raise RuntimeError('not yet supported') n = dis.stack_effect(i.opcode, i.arg) args = self.pop(-n) code = self.__stack.pop() f = FunctionType(Machine(self.verbose), code, self.__globals, args[0]) self.__stack.append(f) elif i.opcode == 133: # BUILD_SLICE TOS = self.__stack.pop() TOS1 = self.__stack.pop() if i.arg == 2: self.__stack.append(slice(TOS1, TOS)) else: TOS2 = self.__stack.pop() self.__stack.append(slice(TOS2, TOS1, TOS)) else: raise RuntimeError('unhandled opcode',i.opcode,i.opname,i.arg,self.__stack) if self.verbose: print('{:20} {}'.format(i.opname, [(repr(s) if not str(hex(id(s))) in repr(s) else s.__class__) for s in self.__stack ])) if self.verbose: print('------------- return') return return_value
class MachineClassSource(Machine): def store_name(self, name, val): Machine.store_name(self, name, val) logger.debug('{} {} {} {}'.format(self.__class__.__name__, 'store_name', name, val)) def inst_to_bytes(inst): if inst.opname in ( 'LOAD_CONST', 'LOAD_NAME', 'STORE_NAME', ): return bytes([ inst.opcode, inst.arg, 0]) elif inst.opname in ( 'BINARY_ADD', 'RETURN_VALUE', ): return bytes([inst.opcode]) else: raise RuntimeError('unsupported op {}'.format(inst.opname)) class Assembler(object): def __init__(self): self.offset = 0 self.insts = list() self.consts = list() self.names = list() self.varnames = list() def get_const_arg(self, v): if not v in self.consts: self.consts.append(v) return self.consts.index(v) def get_name_arg(self, v): if not v in self.names: self.names.append(v) return self.names.index(v) def load_const(self, argval): inst = dis.Instruction( 'LOAD_CONST', dis.opname.index('LOAD_CONST'), self.get_const_arg(argval), argval, repr(argval), self.offset, None, False) self.offset += 3 self.insts.append(inst) def load_name(self, argval): inst = dis.Instruction( 'LOAD_NAME', dis.opname.index('LOAD_NAME'), self.get_name_arg(argval), argval, repr(argval), self.offset, None, False) self.offset += 3 self.insts.append(inst) def store_name(self, argval): inst = dis.Instruction( 'STORE_NAME', dis.opname.index('STORE_NAME'), self.get_name_arg(argval), argval, repr(argval), self.offset, None, False) self.offset += 3 self.insts.append(inst) def binary_add(self): inst = dis.Instruction( 'BINARY_ADD', dis.opname.index('BINARY_ADD'), None, None, '', self.offset, None, False) self.offset += 1 self.insts.append(inst) def return_value(self): inst = dis.Instruction( 'RETURN_VALUE', dis.opname.index('RETURN_VALUE'), None, None, '', self.offset, None, False) self.offset += 1 self.insts.append(inst) def code(self): b = b''.join(inst_to_bytes(i) for i in self.insts) logger.debug('bytes={}'.format(b)) c = types.CodeType( 0, 0, 0, 2, 64, b, tuple(self.consts), tuple(self.names), tuple(self.varnames), '<constructed code>', '<module>', 0, b'') return c