Source code for codetransformer.code

from collections import OrderedDict
from dis import Bytecode, dis, findlinestarts
from enum import IntEnum, unique
from functools import reduce
from itertools import repeat
import operator as op
import sys
from types import CodeType

from .instructions import (
    Instruction,
    LOAD_CONST,
    YIELD_FROM,
    YIELD_VALUE,
    _RawArg,
)
from .utils.functional import scanl, reverse_dict, ffill
from .utils.immutable import lazyval
from .utils.instance import instance


# From Python 3.6 on the interpreter uses "wordcode": every instruction is
# padded to two bytes, i.e. an opcode byte followed by one argument byte.
WORDCODE = sys.version_info >= (3, 6)

if not WORDCODE:
    # Older layout: a two byte argument follows the opcode, but only for
    # instructions that actually take an argument.
    argsize = 2
    max_lnotab_increment = 255

    def _sparse_args(instrs):
        for instr in instrs:
            yield instr
            if instr.have_arg:
                yield from (None, None)

else:
    # Wordcode layout: exactly one argument byte after every opcode.
    argsize = 1
    max_lnotab_increment = 127

    def _sparse_args(instrs):
        for instr in instrs:
            yield from (instr, None)


# Both variants share one docstring, attached after the version dispatch.
_sparse_args.__doc__ = """\
Makes the arguments sparse so that instructions live at the correct index for
the jump resolution step.

This pads the instruction set with None to mark the bytes occupied by
arguments.

Parameters
----------
instrs : iterable of Instruction
    The dense instruction set.

Yields
------
sparse : Instruction or None
    Yields the instructions, with objects marking the bytes that are used for
    arguments.
"""


@unique
class Flag(IntEnum):
    """Enumeration of the individual bits that make up a code object's
    flags bitmask.
    """
    # The values (and the remarks next to them) mirror CPython's definitions.
    CO_OPTIMIZED = 0x0001
    CO_NEWLOCALS = 0x0002
    CO_VARARGS = 0x0004
    CO_VARKEYWORDS = 0x0008
    CO_NESTED = 0x0010
    CO_GENERATOR = 0x0020

    # CO_NOFREE: set when the code has neither free nor cell variables.
    # Redundant information, but it lets a single flag test decide whether
    # any extra work is needed when the call frame is set up.
    CO_NOFREE = 0x0040

    # Coroutine flags (Python 3.5). Per CPython's notes, the types.coroutine
    # decorator uses these to convert old-style coroutines into 3.5 style
    # coroutines.
    CO_COROUTINE = 0x0080
    CO_ITERABLE_COROUTINE = 0x0100

    # __future__ feature flags ("old values" in CPython's header).
    CO_FUTURE_DIVISION = 0x2000
    CO_FUTURE_ABSOLUTE_IMPORT = 0x4000  # Do absolute imports by default.
    CO_FUTURE_WITH_STATEMENT = 0x8000
    CO_FUTURE_PRINT_FUNCTION = 0x10000
    CO_FUTURE_UNICODE_LITERALS = 0x20000
    CO_FUTURE_BARRY_AS_BDFL = 0x40000
    CO_FUTURE_GENERATOR_STOP = 0x80000

    @instance
    class max:
        """The largest bitmask that represents a valid flag.

        Implemented as a descriptor: reading it packs every member as set.
        """
        def __get__(self, instance, owner):
            # Switch every known flag on and let ``pack`` combine them.
            return owner.pack(**dict.fromkeys(owner.__members__, True))

        def __set__(self, instance, value):
            raise AttributeError("can't set 'max' attribute")

    @classmethod
    def pack(cls,
             *,
             CO_OPTIMIZED,
             CO_NEWLOCALS,
             CO_VARARGS,
             CO_VARKEYWORDS,
             CO_NESTED,
             CO_GENERATOR,
             CO_NOFREE,
             CO_COROUTINE,
             CO_ITERABLE_COROUTINE,
             CO_FUTURE_DIVISION,
             CO_FUTURE_ABSOLUTE_IMPORT,
             CO_FUTURE_WITH_STATEMENT,
             CO_FUTURE_PRINT_FUNCTION,
             CO_FUTURE_UNICODE_LITERALS,
             CO_FUTURE_BARRY_AS_BDFL,
             CO_FUTURE_GENERATOR_STOP):
        """Combine per-flag booleans into a single bitmask.

        Every flag must be given explicitly, by keyword.

        Parameters
        ----------
        CO_OPTIMIZED : bool
        CO_NEWLOCALS : bool
        CO_VARARGS : bool
        CO_VARKEYWORDS : bool
        CO_NESTED : bool
        CO_GENERATOR : bool
        CO_NOFREE : bool
        CO_COROUTINE : bool
        CO_ITERABLE_COROUTINE : bool
        CO_FUTURE_DIVISION : bool
        CO_FUTURE_ABSOLUTE_IMPORT : bool
        CO_FUTURE_WITH_STATEMENT : bool
        CO_FUTURE_PRINT_FUNCTION : bool
        CO_FUTURE_UNICODE_LITERALS : bool
        CO_FUTURE_BARRY_AS_BDFL : bool
        CO_FUTURE_GENERATOR_STOP : bool

        Returns
        -------
        mask : int
            The bitwise or of every flag whose argument is truthy.

        See Also
        --------
        codetransformer.code.Flag.unpack
        """
        # Snapshot the keyword arguments by name before creating any other
        # local so that each member can be looked up by its own name.
        requested = locals()
        mask = 0
        for flag_name, member in cls.__members__.items():
            if requested[flag_name]:
                mask |= member
        return mask

    @classmethod
    def unpack(cls, mask):
        """Split a bitmask into a mapping of flag name to on/off status.

        Parameters
        ----------
        mask : int
            A bitmask

        Returns
        -------
        mapping : OrderedDict[str -> bool]
            The mapping of flag name to flag status, in member definition
            order.

        Raises
        ------
        ValueError
            If ``mask`` is greater than ``Flag.max``.

        See Also
        --------
        codetransformer.code.Flag.pack
        """
        if mask > cls.max:
            raise ValueError('Invalid mask, too large: %d' % mask)

        mapping = OrderedDict()
        for flag_name in cls.__members__:
            mapping[flag_name] = bool(mask & getattr(cls, flag_name))
        return mapping
def _freevar_argname(arg, cellvars, freevars):
    """
    Resolve the raw argument of a 'uses_free' instruction to a variable name.

    Parameters
    ----------
    arg : int
        The raw argument to a uses_free instruction that we want to resolve to
        a name.
    cellvars : list[str]
        The co_cellvars of the function for which we want to resolve `arg`.
    freevars : list[str]
        The co_freevars of the function for which we want to resolve `arg`.

    Returns
    -------
    name : str
        The entry of `cellvars` or `freevars` selected by `arg`.

    Notes
    -----
    From https://docs.python.org/3.5/library/dis.html#opcode-LOAD_CLOSURE:

        The name of the variable is co_cellvars[i] if i is less than the length
        of co_cellvars. Otherwise it is co_freevars[i - len(co_cellvars)]
    """
    n_cells = len(cellvars)
    # Cell variables occupy the low indices; free variables follow them.
    return cellvars[arg] if arg < n_cells else freevars[arg - n_cells]


def pycode(argcount,
           kwonlyargcount,
           nlocals,
           stacksize,
           flags,
           codestring,
           constants,
           names,
           varnames,
           filename,
           name,
           firstlineno,
           lnotab,
           freevars=(),
           cellvars=()):
    """Build a types.CodeType while allowing the fields to be passed by
    keyword.

    The arguments are forwarded positionally, in the order of this
    signature.

    See Also
    --------
    types.CodeType
    """
    fields = (
        argcount,
        kwonlyargcount,
        nlocals,
        stacksize,
        flags,
        codestring,
        constants,
        names,
        varnames,
        filename,
        name,
        firstlineno,
        lnotab,
        freevars,
        cellvars,
    )
    return CodeType(*fields)
class Code:
    """A higher abstraction over python's CodeType.

    See Include/code.h for more information.

    Parameters
    ----------
    instrs : iterable of Instruction
        A sequence of codetransformer Instruction objects.
    argnames : iterable of str, optional
        The names of the arguments to the code object. A name prefixed with
        ``*`` is the varargs name and every plain name after it is keyword
        only; a name prefixed with ``**`` is the varkwargs name.
    cellvars : iterable of str, optional
        The names of the variables closed over by inner code objects.
    freevars : iterable of str, optional
        The names of the variables this code object has closed over.
    name : str, optional
        The name of this code object.
    filename : str, optional
        The file that this code object came from.
    firstlineno : int, optional
        The first line number of the code in this code object.
    lnotab : dict[int -> Instruction], optional
        The mapping from line number to the first instruction on that line.
    flags : dict[str -> bool], optional
        Any flags to set. This updates the default flag set.

    Attributes
    ----------
    argcount
    argnames
    cellvars
    constructs_new_locals
    consts
    filename
    flags
    freevars
    instrs
    is_coroutine
    is_generator
    is_iterable_coroutine
    is_nested
    kwonlyargcount
    lnotab
    name
    names
    py_lnotab
    sparse_instrs
    stacksize
    varnames
    """
    __slots__ = (
        '_instrs',
        '_argnames',
        '_argcount',
        '_kwonlyargcount',
        '_cellvars',
        '_freevars',
        '_name',
        '_filename',
        '_firstlineno',
        '_lnotab',
        '_flags',
        '__weakref__',
    )

    def __init__(self,
                 instrs,
                 argnames=(),
                 *,
                 cellvars=(),
                 freevars=(),
                 name='<code>',
                 filename='<code>',
                 firstlineno=1,
                 lnotab=None,
                 flags=None):

        instrs = tuple(instrs)  # strictly evaluate any generators.

        # The starting varnames (the names of the arguments to the function)
        # One-element lists are used as counters so that ``argcounter`` can
        # be re-pointed from the positional count to the kwonly count.
        argcount = [0]
        kwonlyargcount = [0]
        argcounter = argcount  # Which set of args are we currently counting.
        _argnames = []
        append_argname = _argnames.append
        varg = kwarg = None
        for argname in argnames:
            if argname.startswith('**'):
                if kwarg is not None:
                    raise ValueError('cannot specify **kwargs more than once')
                kwarg = argname[2:]
                continue
            elif argname.startswith('*'):
                if varg is not None:
                    raise ValueError('cannot specify *args more than once')
                varg = argname[1:]
                argcounter = kwonlyargcount  # all following args are kwonly.
                continue
            argcounter[0] += 1
            append_argname(argname)

        # The star names go after all of the plain names, which is the order
        # ``from_pycode`` reads them back out of ``co_varnames``.
        if varg is not None:
            append_argname(varg)
        if kwarg is not None:
            append_argname(kwarg)

        # Tag every closure instruction with the kind of variable it touches.
        cellvar_names = set(cellvars)
        freevar_names = set(freevars)
        for instr in filter(op.attrgetter('uses_free'), instrs):
            if instr.arg in cellvar_names:
                instr._vartype = 'cell'
            elif instr.arg in freevar_names:
                instr._vartype = 'free'
            else:
                raise ValueError(
                    "Argument to %r is not in cellvars or freevars." % instr
                )

        # Register each jump with the instruction it jumps to.
        for instr in filter(op.attrgetter('is_jmp'), instrs):
            instr.arg._target_of.add(instr)

        self._instrs = instrs
        self._argnames = tuple(_argnames)
        self._argcount = argcount[0]
        self._kwonlyargcount = kwonlyargcount[0]
        self._cellvars = cellvars
        self._freevars = freevars
        self._name = name
        self._filename = filename
        self._firstlineno = firstlineno
        self._lnotab = lnotab or {}

        # Flags inferred from the instructions and arguments; anything passed
        # explicitly through ``flags`` overrides the inferred value.
        flag_values = dict(
            CO_OPTIMIZED=True,
            CO_NEWLOCALS=True,
            CO_VARARGS=varg is not None,
            CO_VARKEYWORDS=kwarg is not None,
            CO_NESTED=False,
            CO_GENERATOR=any(
                isinstance(instr, (YIELD_VALUE, YIELD_FROM))
                for instr in instrs
            ),
            CO_NOFREE=not any(map(op.attrgetter('uses_free'), instrs)),
            CO_COROUTINE=False,
            CO_ITERABLE_COROUTINE=False,
            CO_FUTURE_DIVISION=False,
            CO_FUTURE_ABSOLUTE_IMPORT=False,
            CO_FUTURE_WITH_STATEMENT=False,
            CO_FUTURE_PRINT_FUNCTION=False,
            CO_FUTURE_UNICODE_LITERALS=False,
            CO_FUTURE_BARRY_AS_BDFL=False,
            CO_FUTURE_GENERATOR_STOP=False,
        )
        flag_values.update(flags or {})
        self._flags = Flag.pack(**flag_values)

    @classmethod
    def from_pyfunc(cls, f):
        """Create a Code object from a python function object.

        Parameters
        ----------
        f : function
            The function from which to construct a code object.

        Returns
        -------
        code : Code
            A Code object representing f.__code__.
        """
        return cls.from_pycode(f.__code__)

    @classmethod
    def from_pycode(cls, co):
        """Create a Code object from a python code object.

        Parameters
        ----------
        co : CodeType
            The python code object.

        Returns
        -------
        code : Code
            The codetransformer Code object.
        """
        # Make it sparse so instrs[n] is the instruction at bytecode[n]
        sparse_instrs = tuple(
            _sparse_args(
                Instruction.from_opcode(
                    b.opcode,
                    Instruction._no_arg if b.arg is None else _RawArg(b.arg),
                ) for b in Bytecode(co)
            ),
        )
        for idx, instr in enumerate(sparse_instrs):
            if instr is None:
                # The sparse value
                continue
            if instr.absjmp:
                instr.arg = sparse_instrs[instr.arg]
            elif instr.reljmp:
                # Relative jumps are measured from the end of this
                # instruction (opcode byte plus argument bytes).
                instr.arg = sparse_instrs[instr.arg + idx + argsize + 1]
            elif isinstance(instr, LOAD_CONST):
                instr.arg = co.co_consts[instr.arg]
            elif instr.uses_name:
                instr.arg = co.co_names[instr.arg]
            elif instr.uses_varname:
                instr.arg = co.co_varnames[instr.arg]
            elif instr.uses_free:
                # The index space is co_cellvars followed by co_freevars,
                # which is also the layout ``to_pycode`` writes back out.
                instr.arg = _freevar_argname(
                    instr.arg,
                    co.co_cellvars,
                    co.co_freevars,
                )
            elif instr.have_arg and isinstance(instr.arg, _RawArg):
                instr.arg = int(instr.arg)

        flags = Flag.unpack(co.co_flags)
        has_vargs = flags['CO_VARARGS']
        has_kwargs = flags['CO_VARKEYWORDS']

        # Here we convert the varnames format into our argnames format.
        paramnames = co.co_varnames[
            :(co.co_argcount +
              co.co_kwonlyargcount +
              has_vargs +
              has_kwargs)
        ]
        # We start with the positional arguments.
        new_paramnames = list(paramnames[:co.co_argcount])
        # Add *args next.
        if has_vargs:
            new_paramnames.append('*' + paramnames[-1 - has_kwargs])
        # Add keyword only arguments next.
        new_paramnames.extend(paramnames[
            co.co_argcount:co.co_argcount + co.co_kwonlyargcount
        ])
        # Add **kwargs last.
        if has_kwargs:
            new_paramnames.append('**' + paramnames[-1])

        return cls(
            filter(bool, sparse_instrs),  # drop the None padding
            argnames=new_paramnames,
            cellvars=co.co_cellvars,
            freevars=co.co_freevars,
            name=co.co_name,
            filename=co.co_filename,
            firstlineno=co.co_firstlineno,
            lnotab={
                lno: sparse_instrs[off] for off, lno in findlinestarts(co)
            },
            flags=flags,
        )

    def to_pycode(self):
        """Create a python code object from the more abstract
        codetransformer.Code object.

        Returns
        -------
        co : CodeType
            The python code object.
        """
        consts = self.consts
        names = self.names
        varnames = self.varnames
        freevars = self.freevars
        cellvars = self.cellvars
        bytecode_offset = self.bytecode_offset
        bc = bytearray()
        for instr in self.instrs:
            bc.append(instr.opcode)  # Write the opcode byte.
            if isinstance(instr, LOAD_CONST):
                # Resolve the constant index.
                bc.extend(consts.index(instr.arg).to_bytes(argsize, 'little'))
            elif instr.uses_name:
                # Resolve the name index.
                bc.extend(names.index(instr.arg).to_bytes(argsize, 'little'))
            elif instr.uses_varname:
                # Resolve the local variable index.
                bc.extend(
                    varnames.index(instr.arg).to_bytes(argsize, 'little'),
                )
            elif instr.uses_free:
                # uses_free is really "uses freevars **or** cellvars".
                try:
                    # look for the name in cellvars
                    free_idx = cellvars.index(instr.arg)
                except ValueError:
                    # fall back to freevars, offsetting by the length of
                    # cellvars.
                    free_idx = freevars.index(instr.arg) + len(cellvars)
                bc.extend(free_idx.to_bytes(argsize, 'little'))
            elif instr.absjmp:
                # Resolve the absolute jump target.
                bc.extend(
                    bytecode_offset(instr.arg).to_bytes(argsize, 'little'),
                )
            elif instr.reljmp:
                # Resolve the relative jump target.
                # We do this by subtracting the current instruction's
                # sparse index from the sparse index of the argument.
                # We then subtract argsize + 1 to account for the bytes the
                # current instruction takes up.
                bc.extend((
                    bytecode_offset(instr.arg) -
                    bytecode_offset(instr) -
                    argsize -
                    1
                ).to_bytes(argsize, 'little',))
            elif instr.have_arg:
                # Write any other arg here.
                bc.extend(instr.arg.to_bytes(argsize, 'little'))
            elif WORDCODE:
                # with wordcode, all instructions are padded to 2 bytes
                bc.append(0)

        return CodeType(
            self.argcount,
            self.kwonlyargcount,
            len(varnames),
            self.stacksize,
            self.py_flags,
            bytes(bc),
            consts,
            names,
            varnames,
            self.filename,
            self.name,
            self.firstlineno,
            self.py_lnotab,
            freevars,
            cellvars,
        )

    @property
    def instrs(self):
        """The instructions in this code object.
        """
        return self._instrs

    @property
    def sparse_instrs(self):
        """The instructions where the index of an instruction is the bytecode
        offset of that instruction.

        None indicates that no instruction is at that offset.
        """
        return tuple(_sparse_args(self.instrs))

    @property
    def argcount(self):
        r"""The number of arguments this code object accepts.

        This does not include varargs (\*args).
        """
        return self._argcount

    @property
    def kwonlyargcount(self):
        r"""The number of keyword only arguments this code object accepts.

        This does not include varkwargs (\*\*kwargs).
        """
        return self._kwonlyargcount

    @property
    def consts(self):
        """The constants referenced in this code object.
        """
        # We cannot use a set comprehension because consts do not need
        # to be hashable.
        # NOTE(review): the membership test compares with ``==``, so equal
        # constants of different types (e.g. 1 and 1.0) share one slot.
        consts = []
        append_const = consts.append
        for instr in self.instrs:
            if isinstance(instr, LOAD_CONST) and instr.arg not in consts:
                append_const(instr.arg)
        return tuple(consts)

    @property
    def names(self):
        """The names referenced in this code object.

        Names come from instructions like LOAD_GLOBAL or STORE_ATTR
        where the name of the global or attribute is needed at runtime.
        """
        # We must sort to preserve the order between calls.
        # The set comprehension is to drop the duplicates.
        return tuple(sorted({
            instr.arg for instr in self.instrs if instr.uses_name
        }))

    @property
    def argnames(self):
        """The names of the arguments to this code object, with any leading
        stars removed.

        The format is: [args] [kwonlyargs] [vararg] [varkwarg]
        where each group is optional.
        """
        return self._argnames

    @property
    def varnames(self):
        """The names of all of the local variables in this code object.

        The argument names come first, followed by the remaining local names
        in sorted order.
        """
        # We must sort to preserve the order between calls.
        # The set comprehension is to drop the duplicates.
        return self._argnames + tuple(sorted({
            instr.arg
            for instr in self.instrs
            if instr.uses_varname and instr.arg not in self._argnames
        }))

    @property
    def cellvars(self):
        """The names of the variables closed over by inner code objects.
        """
        return self._cellvars

    @property
    def freevars(self):
        """The names of the variables this code object has closed over.
        """
        return self._freevars

    @property
    def flags(self):
        """The flags of this code object represented as a mapping from flag
        name to boolean status.

        Notes
        -----
        This is a copy of the underlying flags. Mutations will not affect
        the code object.
        """
        return Flag.unpack(self._flags)

    @property
    def py_flags(self):
        """The flags of this code object represented as a bitmask.
        """
        return self._flags

    @property
    def is_nested(self):
        """Is this a nested code object?
        """
        return bool(self._flags & Flag.CO_NESTED)

    @property
    def is_generator(self):
        """Is this a generator?
        """
        return bool(self._flags & Flag.CO_GENERATOR)

    @property
    def is_coroutine(self):
        """Is this a coroutine defined with async def?

        This is 3.5 and greater.
        """
        return bool(self._flags & Flag.CO_COROUTINE)

    @property
    def is_iterable_coroutine(self):
        """Is this an async generator defined with types.coroutine?

        This is 3.5 and greater.
        """
        return bool(self._flags & Flag.CO_ITERABLE_COROUTINE)

    @property
    def constructs_new_locals(self):
        """Does this code object construct new locals?

        This is True for things like functions where executing the code
        needs a new locals dict each time; however, something like a module
        does not normally need new locals.
        """
        return bool(self._flags & Flag.CO_NEWLOCALS)

    @property
    def filename(self):
        """The filename of this code object.
        """
        return self._filename

    @property
    def name(self):
        """The name of this code object.
        """
        return self._name

    @property
    def firstlineno(self):
        """The first source line from self.filename that this code object
        represents.
        """
        return self._firstlineno

    @property
    def lnotab(self):
        """The mapping of line number to the first instruction on that line.
        """
        return self._lnotab

    @lazyval
    def lno_of_instr(self):
        """A mapping from each instruction to a line number.

        Instructions that start a line map to that line. The remaining gaps
        are filled by ``ffill`` (presumably a forward fill from the nearest
        earlier instruction that has a line; confirm against
        ``utils.functional``).
        """
        instrs = self.instrs
        reverse_lnotab = reverse_dict(self.lnotab)
        lnos = [reverse_lnotab.get(instr) for instr in instrs]
        return dict(zip(instrs, ffill(lnos)))

    @property
    def py_lnotab(self):
        """The encoded lnotab that python uses to compute when lines start.

        Note
        ----
        See Objects/lnotab_notes.txt in the cpython source for more details.
        """
        reverse_lnotab = reverse_dict(self.lnotab)
        py_lnotab = []
        prev_instr = 0
        prev_lno = self.firstlineno
        for addr, instr in enumerate(_sparse_args(self.instrs)):
            lno = reverse_lnotab.get(instr)
            if lno is None:
                # Not the first instruction of a line (or argument padding).
                continue

            delta = lno - prev_lno
            py_lnotab.append(addr - prev_instr)
            py_lnotab.append(min(delta, max_lnotab_increment))
            delta -= max_lnotab_increment
            # A line delta too big for one entry is continued in extra
            # entries that advance the address by zero.
            while delta > 0:
                py_lnotab.append(0)
                py_lnotab.append(min(delta, max_lnotab_increment))
                delta -= max_lnotab_increment

            prev_lno = lno
            prev_instr = addr

        return bytes(py_lnotab)

    @property
    def stacksize(self):
        """The maximum amount of stack space used by this code object.
        """
        # Running total of the stack effects, in instruction order.
        return max(scanl(
            op.add,
            0,
            map(op.attrgetter('stack_effect'), self.instrs),
        ))

    def index(self, instr):
        """Returns the index of instr.

        Parameters
        ----------
        instr : Instruction
            The instruction to check the index of.

        Returns
        -------
        idx : int
            The index of instr in this code object.
        """
        return self.instrs.index(instr)

    def bytecode_offset(self, instr):
        """Returns the offset of instr in the bytecode representation.

        Parameters
        ----------
        instr : Instruction
            The instruction to check the index of.

        Returns
        -------
        idx : int
            The index of instr in this code object in the sparse instructions.
        """
        return self.sparse_instrs.index(instr)

    def __getitem__(self, key):
        return self.instrs[key]

    def __iter__(self):
        return iter(self.instrs)

    def __len__(self):
        return len(self.instrs)

    def __contains__(self, instr):
        return instr in self.instrs

    def dis(self, file=None):
        """
        Print self via the stdlib ``dis`` module.

        Parameters
        ----------
        file : file-like, optional
            A file-like object into which we should print.
            Defaults to sys.stdout.
        """
        # Inside the method body ``dis`` is the module-level stdlib function,
        # not this method.
        dis(self.to_pycode(), file=file)