Source code for codetransformer.core

from collections import OrderedDict
from contextlib import contextmanager
from ctypes import py_object, pythonapi
from itertools import chain
from types import CodeType, FunctionType
from weakref import WeakKeyDictionary

try:
    import threading
except ImportError:
    import dummy_threading as threading

from .code import Code
from .instructions import LOAD_CONST, STORE_FAST, LOAD_FAST
from .patterns import (
    boundpattern,
    patterndispatcher,
    DEFAULT_STARTCODE,
)
from .utils.instance import instance


_cell_new = pythonapi.PyCell_New
_cell_new.argtypes = (py_object,)
_cell_new.restype = py_object


def _a_if_not_none(a, b):
    return a if a is not None else b


def _new_lnotab(instrs, lnotab):
    """The updated lnotab after the instructions have been transformed.

    Parameters
    ----------
    instrs : iterable[Instruction]
        The new instructions.
    lnotab : dict[Instruction -> int]
        The lnotab for the old code object.

    Returns
    -------
    new_lnotab : dict[Instruction -> int]
        The post transform lnotab.
    """
    return {
        lno: _a_if_not_none(instr._stolen_by, instr)
        for lno, instr in lnotab.items()
    }


class NoContext(Exception):
    """Exception raised to indicate that the ``code` or ``startcode``
    attribute was accessed outside of a code context.
    """
    def __init__(self):
        return super().__init__('no active transformation context')


class Context:
    """Empty object for holding the transformation context.
    """
    def __init__(self, code):
        self.code = code
        self.startcode = DEFAULT_STARTCODE

    def __repr__(self):  # pragma: no cover
        return '<%s: %r>' % (type(self).__name__, self.__dict__)


class CodeTransformerMeta(type):
    """Meta class for CodeTransformer to collect all of the patterns
    and ensure the class dict is ordered.

    Patterns are created when a method is decorated with
    ``codetransformer.pattern.pattern``
    """
    def __new__(mcls, name, bases, dict_):
        dict_['patterndispatcher'] = patterndispatcher(*chain(
            (v for v in dict_.values() if isinstance(v, boundpattern)),
            *(
                d and d.patterns for d in (
                    getattr(b, 'patterndispatcher', ()) for b in bases
                )
            )
        ))
        return super().__new__(mcls, name, bases, dict_)

    def __prepare__(self, bases):
        return OrderedDict()


[docs]class CodeTransformer(metaclass=CodeTransformerMeta): """A code object transformer, similar to the NodeTransformer from the ast module. Attributes ---------- code """ __slots__ = '__weakref__',
[docs] def transform_consts(self, consts): """transformer for the co_consts field. Override this method to transform the `co_consts` of the code object. Parameters ---------- consts : tuple The co_consts Returns ------- new_consts : tuple The new constants. """ return tuple( self.transform(Code.from_pycode(const)).to_pycode() if isinstance(const, CodeType) else const for const in consts )
def _id(self, obj): """Identity function. Parameters ---------- obj : any The object to return Returns ------- obj : any The input unchanged """ return obj transform_name = _id transform_names = _id transform_varnames = _id transform_freevars = _id transform_cellvars = _id transform_defaults = _id del _id
[docs] def transform(self, code, *, name=None, filename=None): """Transform a codetransformer.Code object applying the transforms. Parameters ---------- code : Code The code object to transform. name : str, optional The new name for this code object. filename : str, optional The new filename for this code object. Returns ------- new_code : Code The transformed code object. """ # reverse lookups from for constants and names. reversed_consts = {} reversed_names = {} reversed_varnames = {} for instr in code: if isinstance(instr, LOAD_CONST): reversed_consts[instr] = instr.arg if instr.uses_name: reversed_names[instr] = instr.arg if isinstance(instr, (STORE_FAST, LOAD_FAST)): reversed_varnames[instr] = instr.arg instrs, consts = tuple(zip(*reversed_consts.items())) or ((), ()) for instr, const in zip(instrs, self.transform_consts(consts)): instr.arg = const instrs, names = tuple(zip(*reversed_names.items())) or ((), ()) for instr, name_ in zip(instrs, self.transform_names(names)): instr.arg = name_ instrs, varnames = tuple(zip(*reversed_varnames.items())) or ((), ()) for instr, varname in zip(instrs, self.transform_varnames(varnames)): instr.arg = varname with self._new_context(code): post_transform = self.patterndispatcher(code) return Code( post_transform, code.argnames, cellvars=self.transform_cellvars(code.cellvars), freevars=self.transform_freevars(code.freevars), name=name if name is not None else code.name, filename=filename if filename is not None else code.filename, firstlineno=code.firstlineno, lnotab=_new_lnotab(post_transform, code.lnotab), flags=code.flags, )
def __call__(self, f, *, globals_=None, name=None, defaults=None, closure=None): # Callable so that we can use CodeTransformers as decorators. if closure is not None: closure = tuple(map(_cell_new, closure)) else: closure = f.__closure__ return FunctionType( self.transform(Code.from_pycode(f.__code__)).to_pycode(), _a_if_not_none(globals_, f.__globals__), _a_if_not_none(name, f.__name__), _a_if_not_none(defaults, f.__defaults__), closure, ) @instance class _context_stack(threading.local): """Thread safe transformation context stack. Each thread will get it's own ``WeakKeyDictionary`` that maps instances to a stack of ``Context`` objects. When this descriptor is looked up we first try to get the weakkeydict off of the thread local storage. If it doesn't exist we make a new map. Then we lookup our instance in this map. If it doesn't exist yet create a new stack (as an empty list). This allows a single instance of ``CodeTransformer`` to be used recursively to transform code objects in a thread safe way while still being able to use a stateful context. """ def __get__(self, instance, owner): try: stacks = self._context_stacks except AttributeError: stacks = self._context_stacks = WeakKeyDictionary() if instance is None: # when looked up off the class return the current threads # context stacks map return stacks return stacks.setdefault(instance, []) @contextmanager def _new_context(self, code): self._context_stack.append(Context(code)) try: yield finally: self._context_stack.pop() @property def context(self): """Lookup the current transformation context. Raises ------ NoContext Raised when there is no active transformation context. """ try: return self._context_stack[-1] except IndexError: raise NoContext() @property def code(self): """The code object we are currently manipulating. """ return self.context.code @property def startcode(self): """The startcode we are currently in. """ return self.context.startcode
[docs] def begin(self, startcode): """Begin a new startcode. Parameters ---------- startcode : any The startcode to begin. """ self.context.startcode = startcode