Source code for codemach.machine

import io
import sys
import dis
import types
import operator
import builtins
import pprint

import async_patterns

from .assembler import *

__all__ = ['Machine']

def function_wrapper(m, f):
    """
    Wrap a function so that when called, its code is executed by **machine**.

    :param machine: a :py:class:`Machine` object
    :param f: a python function object
    """
    def wrapped(*args):
        c = f.__code__

        l = {}

        varnames = list(c.co_varnames)
        
        m._print('wrapped')
        m._print(f)
        m._print(args)
        #self._print('callable = {}'.format(callable_))
        #self._print('closure  = {}'.format(callable_.function.__closure__))
        #self._print('args     = {}'.format(args))
        #self._print('varnames = {}'.format(varnames))

        args = list(args)
        for _ in range(c.co_argcount):
            l[varnames.pop(0)] = args.pop(0)
        
        if varnames:
            l[varnames.pop(0)] = tuple(args)

        return m.exec(f.__code__, f.__globals__, l)
    
    return wrapped

class FunctionType(object):
    def __init__(self, machine, code, globals_, name):
        self._machine = machine
        self.func_raw = types.FunctionType(code, globals_, name)
        self.function = function_wrapper(
                machine,
                self.func_raw)
        self.wrapped = self.function

    def get_code(self):
        """
        return the code object to be used by Machine
        """
        return self.function.__closure__[0].cell_contents.__code__
    
    def __repr__(self):
        return '<{} object, function={}>'.format(self.__class__.__module__+'.'+self.__class__.__name__, self.func_raw.__name__)

    def __call__(self, args):
        return self.wrapped(args)

class InstructionIterator:
    def __init__(self, inst):
        self._inst = list(inst)

        self._tab = dict((i.offset, (i, a)) for i, a in zip(self._inst, range(len(self._inst))))
    
    def __iter__(self):
        self._iter = iter(self._inst)
        return self

    def __next__(self):
        return next(self._iter)

    def jump(self, offset):
        i, a = self._tab[offset]

        self._iter = iter(self._inst)

        for _ in range(a):
            next(self._iter)

[docs]class Machine(object): """ Class that executes python code objects. :param verbose: verbosity level """ def __init__(self, verbose=False, callbacks={}): self.__stack = [] self.__blocks = [] self.verbose = verbose self.__callbacks = callbacks if not verbose: self._output = io.StringIO() def add_callback(self, opname, callable_): if opname not in self.__callbacks: self.__callbacks[opname] = async_patterns.Callbacks() self.__callbacks[opname].add_callback(callable_) def call_callbacks(self, opname, *args, **kwargs): if opname not in self.__callbacks: return self.__callbacks[opname](*args, **kwargs) def _print(self, *args): if self.verbose: print(*args) else: print(*args, file=self._output) @staticmethod def cmp_op(i): def not_in(a, b): return not (a in b) return ( operator.lt, operator.le, operator.eq, operator.ne, operator.gt, operator.ge, operator.contains, not_in, operator.is_, operator.is_not, 'exception match', 'BAD')[i]
[docs] def exec(self, code, globals_=None, _locals=None): """ Execute a code object The inputs and behavior of this function should match those of eval_ and exec_. .. _eval: https://docs.python.org/3/library/functions.html?highlight=eval#eval .. _exec: https://docs.python.org/3/library/functions.html?highlight=exec#exec .. note:: Need to figure out how the internals of this function must change for ``eval`` or ``exec``. :param code: a python code object :param globals_: optional globals dictionary :param _locals: optional locals dictionary """ if globals_ is None: globals_ = globals() if _locals is None: self._locals = globals_ else: self._locals = _locals self.__globals = globals_ return self.exec_instructions(code)
[docs] def load_name(self, name): """ Implementation of the LOAD_NAME operation """ if name in self.__globals: return self.__globals[name] b = self.__globals['__builtins__'] if isinstance(b, dict): return b[name] else: return getattr(b, name)
[docs] def store_name(self, name, val): """ Implementation of the STORE_NAME operation """ self._locals[name] = val
#self.__globals[name] = val
[docs] def pop(self, n): """ Pop the **n** topmost items from the stack and return them as a ``list``. """ poped = self.__stack[len(self.__stack) - n:] del self.__stack[len(self.__stack) - n:] return poped
[docs] def build_class(self, callable_, args): """ Implement ``builtins.__build_class__``. We must wrap all class member functions using :py:func:`function_wrapper`. This requires using a :py:class:`Machine` to execute the class source code and then recreating the class source code using an :py:class:`Assembler`. .. note: We might be able to bypass the call to ``builtins.__build_class__`` entirely and manually construct a class object. """ self._print('build_class') self._print(callable_) self._print('args=',args) if isinstance(args[0], FunctionType): c = args[0].get_code() else: c = args[0].__closure__[0].cell_contents.__code__ machine = Machine(self.verbose) l = dict() machine.exec(c, self.__globals, l) # construct code for class source a = Assembler() for name, value in l.items(): a.load_const(value) a.store_name(name) a.load_const(None) a.return_value() machine = Machine(self.verbose) f = types.FunctionType(a.code(), self.__globals, args[1]) args = (f, *args[1:]) self.call_callbacks('CALL_FUNCTION', callable_, *args) return callable_(*args)
[docs] def call_function(self, i): """ Implement the CALL_FUNCTION_ operation. .. _CALL_FUNCTION: https://docs.python.org/3/library/dis.html#opcode-CALL_FUNCTION """ callable_ = self.__stack[-1-i.arg] args = tuple(self.__stack[len(self.__stack) - i.arg:]) self.call_callbacks('CALL_FUNCTION', callable_, *args) if isinstance(callable_, types.CodeType): _c = callable_ e = Machine(self.verbose) l = dict((name, arg) for name, arg in zip( _c.co_varnames[:_c.co_argcount], args)) ret = e.exec(c, self.__globals, l) elif isinstance(callable_, FunctionType): if False: c = callable_.get_code() m = callable_._machine # construct locals l = {} varnames = list(c.co_varnames) self._print('callable = {}'.format(callable_)) self._print('closure = {}'.format(callable_.function.__closure__)) self._print('args = {}'.format(args)) self._print('varnames = {}'.format(varnames)) args = list(args) for _ in range(c.co_argcount): l[varnames.pop(0)] = args.pop(0) if varnames: l[varnames.pop(0)] = tuple(args) ret = m.exec(c, self.__globals, l) else: ret = callable_(args) elif (callable_ is builtins.__build_class__) and isinstance(args[0], FunctionType): ret = self.build_class(callable_, args) elif callable_ is builtins.__build_class__: ret = self.build_class(callable_, args) else: ret = callable_(*args) self.pop(1 + i.arg) self.__stack.append(ret)
def __build_list(self, i): self.__stack.append(list(self.pop(i.arg))) def __inst_setup_loop(self, i): self.__blocks.append(i.arg) def __inst_get_iter(self, i): self.__stack.append(iter(self.__stack.pop())) def __inst_for_iter(self, i): TOS = self.__stack[-1] try: self.__stack.append(TOS.__next__()) except StopIteration: self.__stack.pop() self._ii.jump(i.arg + i.offset + 2) def __inst_jump_absolute(self, i): self._ii.jump(i.arg) def __inst_pop_block(self, i): self.__blocks.pop() def __inst_pop_jump_if_true(self, i): if self.__stack.pop(): self._ii.jump(i.arg) def __inst_unpack_sequence(self, i): TOS = self.__stack.pop() for el in reversed(TOS): self.__stack.append(el) def __inst_raise_varargs(self, i): cls = self.__stack.pop() args = self.pop(i.arg-1) raise cls(*args) def exec_instructions(self, c): self._print('------------- begin exec') inst = dis.Bytecode(c) return_value_set = False self._ii = InstructionIterator(inst) if self.verbose: pprint.pprint(self._ii._tab) ops = { 'BUILD_LIST': self.__build_list, 'CALL_FUNCTION': self.call_function, 'SETUP_LOOP': self.__inst_setup_loop, 'GET_ITER': self.__inst_get_iter, 'FOR_ITER': self.__inst_for_iter, 'JUMP_ABSOLUTE': self.__inst_jump_absolute, 'POP_BLOCK': self.__inst_pop_block, 'POP_JUMP_IF_TRUE': self.__inst_pop_jump_if_true, 'UNPACK_SEQUENCE': self.__inst_unpack_sequence, 'RAISE_VARARGS': self.__inst_raise_varargs, } #for i in inst: for i in self._ii: if return_value_set: raise RuntimeError('RETURN_VALUE is not last opcode') if i.opname in ops: try: ops[i.opname](i) except Exception as e: print('during machine exec {}: {}'.format(i.opname, e)) if not self.verbose: print('printing output') print(self._output.getvalue()) raise elif i.opcode == 1: self.__stack.pop() elif i.opcode == 10: # UNARY_POSITIVE TOS = self.__stack.pop() self.__stack.append(+TOS) elif i.opcode == 11: # UNARY_NEGATIVE TOS = self.__stack.pop() self.__stack.append(-TOS) elif i.opcode == 19: # BINARY_POWER TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 ** TOS) elif i.opcode == 22: # BINARY_MODULO TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 % TOS) elif i.opcode == 23: # BINARY_ADD TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 + TOS) elif i.opcode == 24: # BINARY_SUBTRACT TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 - TOS) elif i.opcode == 25: # BINARY_SUBSCR TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1[TOS]) elif i.opcode == 26: # BINARY_FLOOR_DIVIDE TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 // TOS) elif i.opcode == 27: # BINARY_TRUE_DIVIDE TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(TOS1 / TOS) elif i.opcode == 71: # LOAD_BUILD_CLASS self.__stack.append(builtins.__build_class__) elif i.opcode == 83: # RETURN_VALUE return_value = self.__stack.pop() return_value_set = True elif i.opcode == 90: # STORE_NAME name = c.co_names[i.arg] TOS = self.__stack.pop() self.store_name(name, TOS) elif i.opcode == 100: # LOAD_CONST self.__stack.append(c.co_consts[i.arg]) elif i.opcode == 101: # LOAD_NAME name = c.co_names[i.arg] self.__stack.append(self.load_name(name)) elif i.opcode == 102: # BUILD_TUPLE self.__stack.append(tuple(self.pop(i.arg))) elif i.opcode == 106: # LOAD_ATTR name = c.co_names[i.arg] o = self.__stack.pop() self.call_callbacks('LOAD_ATTR', o, name) self.__stack.append(getattr(o, name)) elif i.opcode == 107: # COMPARE_OP TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.__stack.append(Machine.cmp_op(i.arg)(TOS1, TOS)) elif i.opcode == 108: # IMPORT_NAME TOS = self.__stack.pop() TOS1 = self.__stack.pop() self.call_callbacks('IMPORT_NAME', c.co_names[i.arg], TOS, TOS1) self.__stack.append(__import__(c.co_names[i.arg], fromlist=TOS, level=TOS1)) elif i.opcode == 116: # LOAD_GLOBAL name = c.co_names[i.arg] self.__stack.append(self.load_name(name)) elif i.opcode == 124: # LOAD_FAST: name = c.co_varnames[i.arg] self.__stack.append(self._locals[name]) elif i.opcode == 125: # STORE_FAST TOS = self.__stack.pop() name = c.co_varnames[i.arg] self._locals[name] = TOS elif i.opcode == 132: # MAKE_FUNCTION if i.arg != 0: raise RuntimeError('not yet supported') n = dis.stack_effect(i.opcode, i.arg) args = self.pop(-n) code = self.__stack.pop() f = FunctionType(Machine(self.verbose), code, self.__globals, args[0]) # experimenting f = f.wrapped self.__stack.append(f) elif i.opcode == 133: # BUILD_SLICE TOS = self.__stack.pop() TOS1 = self.__stack.pop() if i.arg == 2: self.__stack.append(slice(TOS1, TOS)) else: TOS2 = self.__stack.pop() self.__stack.append(slice(TOS2, TOS1, TOS)) else: raise RuntimeError('unhandled opcode',i.opcode,i.opname,i.arg,self.__stack) self._print('{:20} {}'.format(i.opname, [(repr(s) if not str(hex(id(s))) in repr(s) else s.__class__) for s in self.__stack ])) self._print('------------- return') return return_value