# Copyright (c) 2016, Andrew Leech <andrew@alelec.net>
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# The full license is also available in the file LICENSE.apache-2.0.txt
''' A collection of convenience classes and functions for CFFI wrappers. '''
__version__ = '0.4'
import collections
import six
import types
from functools import wraps
from collections import namedtuple
try:
import cffi
except ImportError:
cffi = None
try:
try:
import numpypy
except ImportError:
pass
import numpy
except ImportError:
numpy = None
if six.PY3:
long = int
__all__ = [
'CFunction',
'CStruct',
'CUnion',
'CStructType',
'CUnionType',
'CObject',
'NullError',
'cmethod',
'cstaticmethod',
'cproperty',
'wrap',
'wrapall',
'wrapenum',
'carray',
'nparrayptr',
]
if cffi:
_global_ffi = cffi.FFI()
else:
_global_ffi = None
_endian = None
_ntoh, _hton = None, None
def load_endian_translate():
if not cffi:
raise NotImplementedError("cffi module required")
import platform
_global_ffi.cdef("""
uint32_t htonl(uint32_t hostlong);
uint16_t htons(uint16_t hostshort);
uint32_t ntohl(uint32_t netlong);
uint16_t ntohs(uint16_t netshort);
""")
if platform.system() == 'Windows':
_global_ffi.cdef("""
uint64_t htonll(uint64_t hostlong);
uint64_t ntohll(uint64_t netlong);
uint32_t htonf(float hostfloat);
float ntohf(uint32_t netfloat);
uint64_t htond(double hostdouble);
double ntohd(uint64_t netdouble);
""")
global _endian
_endian = _global_ffi.dlopen("Ws2_32")
else:
raise NotImplementedError()
global _ntoh
global _hton
_ntoh = dict(
int16_t=_endian.ntohs,
uint16_t=_endian.ntohs,
int32_t=_endian.ntohl,
uint32_t=_endian.ntohl,
int64_t=lambda x: (_endian.ntohl(x & 0xFFFFFFFF) << 32) | _endian.ntohl(x >> 32),
uint64_t=lambda x: (_endian.ntohl(x & 0xFFFFFFFF) << 32) | _endian.ntohl(x >> 32),
# float = _endian.ntohf,
# double = _endian.ntohd,
)
_hton = dict(
int16_t=_endian.htons,
uint16_t=_endian.htons,
int32_t=_endian.htonl,
uint32_t=_endian.htonl,
int64_t=lambda x: (_endian.htonl(x & 0xFFFFFFFF) << 32) | _endian.htonl(x >> 32),
uint64_t=lambda x: (_endian.htonl(x & 0xFFFFFFFF) << 32) | _endian.htonl(x >> 32),
# float = _endian.htonf,
# double = _endian.htond,
)
[docs]class NullError(Exception):
pass
class dotdict(dict):
"""dot.notation access to dictionary attributes"""
def __getattr__(self, attr):
return self.get(attr)
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
[docs]class CFunction(object):
''' Adds some low-ish-level introspection to CFFI C functions.
Most other wrapper classes and fuctions expect API functions
to be wrapped in a CFunction. See ``wrapall()`` below.
* ``ffi``: The FFI object the C function is from.
* ``cfunc``: The C function object from CFFI.
Attributes added to instances:
* ``cfunc``: The C function object.
* ``ffi``: The FFI object the C function is from.
* ``typeof``: ffi.typeof(cfunc)
* ``cname``: From typeof.
* ``args``: From typeof.
* ``kind``: From typeof.
* ``result``: From typeof.
Callable: when called, the cfunc is called directly and it's result
is returned. See ``cmethod`` for more uses.
'''
def __init__(self, ffi, cfunc):
# This is basically a hack to work around the lack of introspection
# built-in to CFFI CData function objects. The overhead should be
# negligable since the CFFI function is directly assigned to __call__
# (this also prevents it from being called like a bound method - we do
# that later with the cmethod module function).
self.cfunc = cfunc
self.ffi = ffi
self.typeof = ffi.typeof(cfunc)
self.args = self.typeof.args
self.cname = self.typeof.cname
self.kind = self.typeof.kind
self.result = self.typeof.result
# TODO Profile to see if this is really much faster...
#self.__call__ = func
def __call__(self, *args, **kwargs):
#outargs=() retargs=None):
# Most of this code has been heavily profiled with several different
# approaches and algorithms. However, if you think of a faster/better
# way to do this, I'm open to ideas. This code should be fairly fast
# because it will be the primary interface to the underlying C library,
# potentially having wrapper functions called in tight loops.
# pypy: 1000000 loops, best of 3: 229 ns per loop
# Actually, looking at the profiler output, by far the biggest cost is
# in CFFI itself (specifically calls to the _optimize_charset function
# in the compile_sre.py module) so I don't think it's worth it to
# squeeze much more performance out of this code...
# Update: This seems to no longer be the case in newer pypy/cffi?
# TODO IDEA: Consider using some kind of format string(s) to specify
# outargs, arrays, retargs, etc? This is getting complicated enough
# that it might make things simpler for the user?
# Maybe something like "iioxiai" where 'i' is for "in" arg, 'o' for out
# 'x' for in/out. Could then maybe do computed args, like array lengths
# with something like "iiox{l5}iai" where "{l5}i" means the length of
# the 6th (0-indexed) argument. Just something to think about...
# TODO: Also, maybe this should support some way to change the position
# of the 'self' argument to allow for libraries which have inconsistent
# function signatures...
outargs = kwargs.get('outargs')
retargs = kwargs.get('retargs')
cargs = self.args
# This guard is semantically useless, but is substantially faster in
# cpython than trying to iterate over enumerate([]). (No diff in pypy)
if args:
for argi, arg in enumerate(args):
if hasattr(arg, '_cdata') and arg._cdata is not None:
args = args[:argi] + (arg._cdata,) + args[argi+1:]
elif arg is None:
args = args[:argi] + (self.ffi.NULL,) + args[argi + 1:]
elif isinstance(arg, six.text_type):
if 'wchar' in self.args[argi].cname:
arg = self.ffi.new('wchar[]', arg)
elif 'char' in self.args[argi].cname:
arg = self.ffi.new('char[]', arg.encode())
args = args[:argi] + (arg,) + args[argi + 1:]
elif isinstance(arg, six.binary_type):
if 'wchar' in self.args[argi].cname:
arg = self.ffi.new('wchar[]', arg.decode())
elif 'char' in self.args[argi].cname:
arg = self.ffi.new('char[]', arg)
args = args[:argi] + (arg,) + args[argi + 1:]
elif isinstance(arg, self.ffi.CData) and self.ffi.typeof(arg) != cargs[argi]:
if cargs[argi].kind == 'pointer' and cargs[argi].item == self.ffi.typeof(arg):
arg = self.ffi.addressof(arg)
args = args[:argi] + (arg,) + args[argi+1:]
# If this function has out or in-out pointer args, create the pointers
# for each, and insert/replace them in the argument list before passing
# to the underlying C function.
retvals = False
if outargs:
# TODO: use retargs to determine which args should be in the
# return and use -1 to indicate the actual return code. Also test
# if len(retval) == 1 and return retval_t[0].
retvals = []
# A few optimizations because looking up local variables is much
# faster than looking up object attributes.
retvals_append = retvals.append
cfunc = self.cfunc
ffi = self.ffi
for argi, inout in outargs:
argtype = cargs[argi]
if inout == 'o':
inptr = ffi.new(argtype.cname)
args = args[:argi] + (inptr,) + args[argi:]
elif inout == 'x':
if ffi.typeof(args[argi]) == argtype:
inptr = args[argi]
else:
inptr = ffi.new(argtype.cname, args[argi])
args = args[:argi] + (inptr,) + args[argi+1:]
elif inout == 'a':
inptr = self.get_arrayptr(args[argi], ctype=argtype)
args = args[:argi] + (inptr,) + args[argi+1:]
retvals_append((inptr, inout))
retval = self.cfunc(*args)
if self.result.kind == 'enum':
retval = wrapenum(retval, self.result)
elif self.result.cname == 'char *':
retval = self.ffi.string(retval)
# This is a tad slower in pypy but substantially faster in cpython than
# checkerr = kwargs.get('checkerr'); if checkerr is not None: ...
if 'checkerr' in kwargs and kwargs['checkerr'] is not None:
retval = kwargs['checkerr'](self, args, retval)
else:
retval = self.checkerr(self, args, retval)
if retvals:
retval = (retval,) # Return tuples, because it's prettier :)
for retarg, inout in retvals:
if inout == 'a':
retval += (retarg,) # Return arrays as-is
else:
# TODO: In some cases we don't want them unboxed... need a
# good way to specify when not to...
retval += (retarg[0],) # Unbox other pointers
return retval
[docs] def get_arrayptr(self, array, ctype=None):
''' Get a CFFI compatible pointer object for an array.
Supported ``array`` types are:
* numpy ndarrays: The pointer to the underlying array buffer is cast
to a CFFI pointer. Value returned from __call__ will be a pointer,
but the numpy C buffer is updated in place, so continue to use the
numpy ndarray object.
* CFFI CData pointers: If the user is already working with C arrays
(i.e., ``ffi.new("int[10]"))`` these will be returned as given.
* Python ints and longs: These will be interpretted as the length of a
newly allocated C array. The pointer to this array will be
returned. ``ctype`` must be provided (CFunction's __call__ method
does this automatically).
* Python collections: A new C array will be allocated with a length
equal to the length of the iterable (``len()`` is called and the
iterable is iterated over, so don't use exhaustable generators, etc).
``ctype`` must be provided (CFunction's __call__ method does this
automatically).
'''
if numpy and isinstance(array, numpy.ndarray):
return self.ffi.cast('void *',
array.__array_interface__['data'][0])
elif isinstance(array, self.ffi.CData):
return array
else:
# Assume it's an iterable or int/long. CFFI will handle the rest.
return self.ffi.new(self.ffi.getctype(ctype.item.cname, '[]'),
array)
[docs] def checkerr(self, cfunc, args, retval):
''' Default error checker. Checks for NULL return values and raises
NullError.
Can be overridden by subclasses. If ``_checkerr`` returns anything
other than ``None``, that value will be returned by the property or
method, otherwise original return value of the C call will be returned.
Also useful for massaging returned values.
'''
#TODO: Maybe should generalize to "returnhandler" or something?
#if self._checkerr is not None:
# self._checkerr(cfunc, args, retval)
if retval == self.ffi.NULL:
raise NullError('NULL returned by {0} with args {1}. '
.format(cfunc.cname, args))
else:
return retval
[docs]def wrap(ffi, cobj):
'''
Convenience function to wrap CFFI functions structs and unions.
'''
if (isinstance(cobj, collections.Callable)
and ffi.typeof(cobj).kind == 'function'):
cobj = CFunction(ffi, cobj)
elif isinstance(cobj, ffi.CData):
kind = ffi.typeof(cobj).kind
if kind == 'pointer':
kind = ffi.typeof(cobj).item.kind
if kind == 'struct':
cobj = CStruct(ffi, cobj)
elif kind == 'union':
cobj = CUnion(ffi, cobj)
elif isinstance(cobj, (int, long)):
pass
else:
print("Unknown: %s" % cobj)
return cobj
[docs]def wrapall(ffi, api):
'''
Convenience function to wrap CFFI functions structs and unions.
Reads functions, structs and unions from an API/Verifier object and wrap
them with the respective wrapper functions.
* ``ffi``: The FFI object (needed for it's ``typeof()`` method)
* ``api``: As returned by ``ffi.verify()``
Returns a dict mapping object names to wrapper instances. Hint: in
a python module that only does CFFI boilerplate and verification, etc, try
something like this to make the C values available directly from the module
itself::
globals().update(wrapall(myffi, myapi))
'''
# TODO: Support passing in a checkerr function to be called on the
# return value for all wrapped functions.
global _global_ffi
if _global_ffi is None:
_global_ffi = ffi
cobjs = dotdict()
for attr in dir(api):
if not attr.startswith('_'):
cobj = getattr(api, attr)
cobj = wrap(ffi, cobj)
cobjs[attr] = cobj
# The things I go through for a little bit of introspection.
# Just hope this doesn't change too much in CFFI's internals...
try:
typedef_names, names_of_structs, names_of_unions = ffi.list_types()
for ctypename in names_of_structs:
try:
cobjs[ctypename] = CStructType(ffi, ctypename)
except ffi.error as ex:
pass
for ctypename in names_of_unions:
try:
cobjs[ctypename] = CUnionType(ffi, ctypename)
except ffi.error as ex:
pass
for ctypename in typedef_names:
try:
cobjs[ctypename] = CType(ffi, ctypename)
try:
enumTypeDesc = ffi.typeof(ctypename)
enumTypeDesc = enumTypeDesc if enumTypeDesc.kind == 'enum' else enumTypeDesc.args[0] # This will only succeed for enums
for val, name in six.iteritems(enumTypeDesc.elements):
cobjs[name] = wrapenum(val, enumTypeDesc)
except AttributeError:
pass
except ffi.error as ex:
pass
except AttributeError:
try:
decls = ffi._parser._declarations
except AttributeError:
decls = {}
for _, ctype in decls.items():
if isinstance(ctype, cffi.model.StructType):
cobjs[ctype.get_c_name()] = CStructType(ffi, ctype)
elif isinstance(ctype, cffi.model.UnionType):
cobjs[ctype.get_c_name()] = CUnionType(ffi, ctype)
return cobjs
def function_skeleton(cmodule=None, outargs=(), inoutargs=(), arrays=(), retargs=None,
checkerr=None, noret=False, doc=None):
"""
This can be used as a decorator on a function stub to declare a python skeleton for a c function
eg:
@function_skeleton(cmodule=_built_cmodule, checkerr=_checkerr, noret=True, outargs=[])
def c_functtion_name(args1, arg2):
\"""
c function docstring/description.
:param type_of_arg1 arg1: arg1 does this
:param type_of_arg2 arg2: arg2 does this
:return something useful
\"""
pass
:param cmodule: api/module to get c method from
:param outargs: as per cmethod below
:param inoutargs: as per cmethod below
:param arrays: as per cmethod below
:param retargs: as per cmethod below
:param checkerr: as per cmethod below
:param noret: as per cmethod below
:param doc: as per cmethod below
"""
@wraps(cmethod)
def cmethod_wrap(func):
cfunc = getattr(cmodule, func.__name__)
return cmethod(cfunc=cfunc, outargs=outargs, inoutargs=inoutargs, arrays=arrays,
retargs=retargs, checkerr=checkerr, noret=noret, doc=doc)
return cmethod_wrap
[docs]def cmethod(cfunc, outargs=(), inoutargs=(), arrays=(), retargs=None,
checkerr=None, noret=False, doc=None):
''' Wrap cfunc to simplify handling outargs, etc.
This feature helps to simplify dealing with pointer parameters which
are meant to be "return" parameters. If any of these are specified, the
return value from the wrapper function will be a tuple containing the
actual return value from the C function followed by the values of the
pointers which were passed in. Each list should be a list of parameter
position numbers (0 for the first parameter, etc)..
* ``outargs``: These will be omitted from the cmethod-wrapped function
parameter list, and fresh pointers will be allocated (with types
derived from the C function signature) and inserted in to the
arguments list to be passed in to the C function. The pointers will
then be dereferenced and the value included in the return tuple.
* ``inoutargs``: Arguments passed to the wrapper function for these
parameters will be cast to pointers before being passed in to the C
function. Pointers will be unboxed in the return tuple.
* ``arrays``: Arguments to these parameters can be python lists or
tuples, numpy arrays or integers.
* Python lists/tuples will be copied in to newly allocated CFFI
arrays and the pointer passed in. The generated CFFI array will be
in the return tuple.
* Numpy arrays will have their data buffer pointer cast to a CFFI
pointer and passed in directly (no copying is done). The CFFI
pointer to the raw buffer will be returned, but any updates to the
array data will also be reflected in the original numpy array, so
it's recommended to just keep using that. (TODO: This behavior may
change to remove these CFFI pointers from the return tuple or maybe
replace the C array with the original numpy object.)
* Integers will indicate that a fresh CFFI array should be allocated
with a length equal to the int and initialized to zeros. The generated
CFFI array will be included in the return tuple.
* ``retargs``: (Not implemented yet.) A list of values to be returned from
the cmethod-wrapped function. Normally the returned value will be a tuple
containing the actual return value of the C function, followed by the
final value of each of the ``outargs``, ``inoutargs``, and ``arrays`` in
the order they appear in the C function's paramater list.
* ``noret``: don't return cfunc's ret. Useful when checkerr is handling this instead
* ``doc``: Optional string/object to attach to the returned function's docstring
As an example of using ``outargs`` and ``inoutargs``, a C function with
this signature::
``int cfunc(int inarg, int *outarg, float *inoutarg);``
with an ``outargs`` of ``[1]`` and ``inoutargs`` set to ``[2]`` can be
called from python as::
>>> wrapped_cfunc = cmethod(cfunc, outargs=[1], inoutargs=[2])
>>> ret, ret_outarg, ret_inoutarg = wrapped_cfunc(inarg, inoutarg)
Returned values will be unboxed python values unless otherwise documented
(i.e., arrays).
'''
# TODO: retargs...
if cfunc is None:
# TODO: There's probably something interesting to do in this case...
# maybe work like a decorator if cfunc isn't given?
return None
if not isinstance(cfunc, CFunction):
# Can't do argument introspection... TODO: raise an exception?
return cfunc
numargs = len(cfunc.args) - len(outargs)
outargs = [(i, 'o') for i in outargs]
outargs += ((i, 'x') for i in inoutargs)
outargs += ((i, 'a') for i in arrays)
outargs.sort()
@wraps(cfunc.cfunc)
def wrapper(*args):
if len(args) != numargs:
raise TypeError('wrapped Function {0} requires exactly {1} '
'arguments ({2} given)'
.format(cfunc.cname, numargs, len(args)))
if checkerr is None and hasattr(args[0], '_checkerr'):
_checkerr = args[0]._checkerr
else:
_checkerr = checkerr
retvals = cfunc(*args, outargs=outargs, retargs=retargs, checkerr=_checkerr)
if noret:
if isinstance(retvals, tuple) and len(retvals) > 1: # strip off the first return value
retvals = retvals[1:]
if len(retvals) == 1:
retvals = retvals[0]
else:
retvals = None
return retvals
if doc:
wrapper.__doc__ = doc
return wrapper
[docs]def cstaticmethod(cfunc, **kwargs):
''' Shortcut for staticmethod(cmethod(cfunc, [kwargs ...])) '''
return staticmethod(cmethod(cfunc, **kwargs))
[docs]def cproperty(fget=None, fset=None, fdel=None, doc=None, checkerr=None):
''' Shortcut to create ``cmethod`` wrapped ``property``\ s.
E.g., this:
>>> class MyCObj(CObject):
... x = property(cmethod(get_x_cfunc), cmethod(set_x_cfunc))
becomes:
>>> class MyCObj(CObject):
... x = cproperty(get_x_cfunc, set_x_cfunc)
If you need more control of the outargs/etc of the cmethods, stick to the
first form, or create and assign individual cmethods and put them in a
normal property.
'''
return property(fget=cmethod(fget, checkerr=checkerr),
fset=cmethod(fset, checkerr=checkerr),
fdel=cmethod(fdel, checkerr=checkerr),
doc=doc)
[docs]class CStruct(object):
''' Provides introspection to an instantiation of a CFFI ``StructType``s and ``UnionType``s.
Instances of this class are essentially struct/union wrappers.
Field names are easily inspected and transparent conversion of data types is done
where possible.
Struct fields can be passed in as positional arguments or keyword
arguments. ``TypeError`` is raised if positional arguments overlap with
given keyword arguments.
The module convenience function ``wrapall`` creates ``CStruct``\ s
for each instantiated struct and union imported from the FFI.
'''
def __init__(self, ffi, struct):
'''
* ``ffi``: The FFI object.
* ``structtype``: a CFFI StructType or a string for the type name
(wihtout any trailing '*' or '[]').
'''
self.__fldnames = {}
self.__pfields = {} # This is used to hold python wrappers that are linked to the underlying fields cdata
self._endian_translate = False
assert isinstance(struct, ffi.CData)
self._cdata = struct
self.__struct_type = ffi.typeof(struct)
if self.__struct_type.kind == 'pointer':
self.__struct_type = self.__struct_type.item
self._ffi = ffi
# Sometimes structtype.name starts with a '$'...?
try:
self._cname = self.__struct_type.cname
except AttributeError:
self._cname = self.__struct_type.get_c_name()
self.__fldnames = {} if self.__struct_type.fields is None else {detail[0]: detail[1].type for detail in self.__struct_type.fields}
# default formatters
# these can be overridden or removed later with set_py_converter()
for key, fieldtype in six.iteritems(self.__fldnames):
cname = fieldtype.cname
if cname.startswith('char') and ('[' in cname or '*' in cname):
self.__pfields[key] = self._ffi.string # add string output formatter
def __dir__(self):
"""
List the struct fields as well
"""
return dir(type(self)) + ([key for key in self.__fldnames.keys() if not key.startswith('_')])
def __getattr__(self, item):
attr = None
if item != '_CStruct__fldnames' and self.__fldnames and item in self.__fldnames:
attr = self.__pfields.get(item, self._cdata.__getattribute__(item))
attr = self._ntoh(item, attr)
if not isinstance(attr, self._ffi.CData) and callable(attr):
attr = attr(self._cdata.__getattribute__(item))
if isinstance(attr, self._ffi.CData):
pattr = wrap(self._ffi, attr)
if pattr is not attr:
self.__pfields[item] = pattr
attr = pattr
else:
attr = super(CStruct, self).__getattribute__(item)
attr = self._ntoh(item, attr)
return attr
def __setattr__(self, key, value):
if key != '_CStruct__fldnames' and self.__fldnames and key in self.__fldnames:
value = self._hton(key, value)
cname = self.__fldnames[key].cname
if 'char' in cname and ('[' in cname or '*' in cname):
if numpy and isinstance(value, (numpy.ndarray, nparray)):
self.__pfields[key] = value
value = nparrayptr(value)
elif isinstance(value, (bytes, str)):
self.__pfields[key] = self._ffi.string # add string output formatter
# Don't change value, setting from bytes or string are fine
elif hasattr(value, '_cdata') and value._cdata is not None:
value = value._cdata
return setattr(self._cdata, key, value)
else:
return super(CStruct, self).__setattr__(key, value)
[docs] def set_py_converter(self, key, fn=None): # TODO have converters for set as well as get?
if fn is None and key in self.__pfields:
del self.__pfields['key']
else:
self.__pfields[key] = fn
[docs] def enable_network_endian_translation(self):
global _endian
if not _endian:
load_endian_translate()
self._endian_translate = True
def _hton(self, key, val):
if self._endian_translate:
fieldtype = self.__fldnames[key].cname
val = _hton[fieldtype](val) if fieldtype in _hton else val
return val
def _ntoh(self, key, val):
if self._endian_translate:
fieldtype = self.__fldnames[key].cname
val = _ntoh[fieldtype](val) if fieldtype in _hton else val
return val
def __str__(self):
return "CStruct %s" % self._cname
def __len__(self):
return self._ffi.sizeof(self.__struct_type)
def __eq__(self, other):
return self is other or \
self._cdata == other or \
(hasattr(other, '_cdata') and self._cdata == getattr(other, '_cdata', object()))
[docs] def get_named_tuple(self):
vals = [getattr(self, field) for field in self.__fldnames]
recurse = [f.get_named_tuple() if isinstance(f, CStruct) else f for f in vals]
return namedtuple(self._cname, self.__fldnames)(*recurse)
[docs]class CStructType(object):
''' Provides introspection to CFFI ``StructType``s and ``UnionType``s.
Instances have the following attributes:
* ``ffi``: The FFI object this struct is pulled from.
* ``cname``: The C name of the struct.
* ``ptrname``: The C pointer type signature for this struct.
* ``fldnames``: A list of fields this struct has.
Instances of this class are essentially struct/union generators.
Calling an instance of ``CStructType`` will produce a newly allocated
struct or union.
Struct fields can be passed in as positional arguments or keyword
arguments. ``TypeError`` is raised if positional arguments overlap with
given keyword arguments.
Arrays of structs can be created with the ``array`` method.
The module convenience function ``wrapall`` creates ``CStructType``\ s
for each struct and union imported from the FFI.
'''
def __init__(self, ffi, structtype):
'''
* ``ffi``: The FFI object.
* ``structtype``: a CFFI StructType or a string for the type name
(wihtout any trailing '*' or '[]').
'''
self.fldnames = None
self._cdata = None
if isinstance(structtype, str):
try:
self.__struct_type = ffi.typeof(structtype.lstrip('_'))
except AttributeError:
self.__struct_type = ffi._parser.parse_type(structtype)
elif isinstance(structtype, ffi.CType):
self.__struct_type = structtype
else:
raise NotImplementedError("Don't know how to handle structtype of %s" % type(structtype))
if self.__struct_type.kind == 'pointer':
self.__struct_type = self.__struct_type.item
self.ffi = ffi
# Sometimes structtype.name starts with a '$'...?
try:
self.cname = self.__struct_type.cname
except AttributeError:
self.cname = self.__struct_type.get_c_name()
self.ptrname = ffi.getctype(self.cname, '*')
try:
self.fldnames = None if self.__struct_type.fields is None else [detail[0] for detail in self.__struct_type.fields]
except AttributeError:
self.fldnames = self.__struct_type.fldnames
def __call__(self, *args, **kwargs):
if self.fldnames is None:
if args or kwargs:
raise TypeError('CStructType call with arguments on opaque '
'CFFI struct {0}.'.format(self.cname))
return self.ffi.new(self.ptrname)
else:
if len(args) > len(self.fldnames):
raise TypeError('CStructType got more arguments than struct '
'has fields. {0} > {1}'
.format(len(args), len(self.fldnames)))
retval = self.ffi.new(self.ptrname)
for fld, val in zip(self.fldnames, args):
if fld in kwargs:
raise TypeError('CStructType call got multiple values for '
'field name {0}'.format(fld))
setattr(retval, fld, val)
for fld, val in kwargs.items():
setattr(retval, fld, val)
return wrap(self.ffi, retval)
[docs] def array(self, shape):
''' Constructs a C array of the struct type with the given length.
* ``shape``: Either an int for the length of a 1-D array, or a tuple
for the length of each of len dimensions. I.e., [2,2] for a 2-D array
with length 2 in each dimension. Hint: If you want an array of
pointers just add an extra demension with length 1. I.e., [2,2,1] is
a 2x2 array of pointers to structs.
No explicit initialization of the elements is performed, however CFFI
itself automatically initializes newly allocated memory to zeros.
'''
# TODO: Factor out and integrate with carray function below?
if isinstance(shape, collections.Iterable):
suffix = ('[%i]' * len(shape)) % tuple(shape)
else:
suffix = '[%i]' % (shape,)
# TODO Allow passing initialization args? Maybe factor out some of the
# code in CStructType.__call__?
return self.ffi.new(self.ffi.getctype(self.cname + suffix))
[docs]class CUnion(CStruct):
def __init__(self, ffi, uniontype):
super(CUnion, self).__init__(ffi, uniontype)
[docs]class CUnionType(CStructType):
def __init__(self, ffi, uniontype):
super(CUnionType, self).__init__(ffi, uniontype)
class CType(object):
def __init__(self, ffi, typedef):
self.typedef = typedef
self.ffi = ffi
self.ctype = None
self._cdata = None
try:
desc = ffi.typeof(typedef + '*').item
if desc.kind == 'struct':
self.ctype = CStructType(ffi, desc)
elif desc.kind == 'union':
self.ctype = CUnionType(ffi, desc)
except Exception as ex:
print(ex)
def __repr__(self):
return ("type: %s" % self.typedef) if not self._cdata else \
("%s <%s" % (self.typedef, repr(self._cdata).split(' ')[-1]))
def __call__(self, *args, **kwargs):
if self.ctype is None:
raise TypeError("'%s' object is not callable", self.typedef)
return self.ctype(*args, **kwargs)
def cast(self, cobj):
if self.ffi.typeof(cobj) != self.ffi.typeof(self.typedef):
cobj = self.ffi.cast(self.typedef, cobj)
wrapped = CType(self.ffi, self.typedef)
setattr(wrapped, '_cdata', cobj)
return wrapped
class Enum(long if six.PY2 else int):
"""
This is a base class for wrapping enum ints
wrapenum() below will subtype it for a particular enum
and return a wrapped result which will still work as an int
but display/print as the string representation from the enum
"""
_names = {}
def __new__(cls, *args, **kwargs):
return super(Enum, cls).__new__(cls, *args, **kwargs)
def __str__(self):
return self._names.get(int(self), str(int(self)))
# Cache generated enum types
_enumTypes = {}
[docs]def wrapenum(retval, enumTypeDescr):
"""
Wraps enum int in an auto-generated wrapper class. This is used automatically when
cmethod() returns an enum type
:param retval: integer
:param enumTypeDescr or CType: the cTypeDescr for the enum
:return: subclass of Enum
"""
def _newEnumType(enumTypeDescr):
_enumTypes[enumTypeDescr.cname] = type(enumTypeDescr.cname, (Enum, ), {"_names": enumTypeDescr.elements})
return _enumTypes[enumTypeDescr.cname]
if isinstance(enumTypeDescr, CType):
enumTypeDescr = enumTypeDescr.ffi.typeof(enumTypeDescr.typedef)
enum = _enumTypes.get(enumTypeDescr.cname, _newEnumType(enumTypeDescr))
return enum(retval)
[docs]class CObject(object):
''' A pythonic representation of a C "object"
Usually representing a set of C functions that operate over a common peice
of data. Many C APIs have lots of functions which accept some common struct
pointer or identifier int as the first argument being manipulated. CObject
provides a convenient abstrtaction to making this convention more "object
oriented". See the example below. More examples can be found in the
cfficloak unit tests.
Use ``cproperty`` and ``cmethod`` to wrap CFFI C functions to behave like
instance methods, passing the instance in as the first argument. See the
doc strings for each above.
For C types which are not automatically coerced/converted by CFFI (such as
C functions accepting struct pointers, etc) the subclass can set a class-
or instance-attribute named ``_cdata`` which will be passed to the CFFI
functions instead of ``self``. The CObject can also have a ``_cnew`` static
method (see ``cstaticmethod``) which will be called by the base class's
``__init__`` and the returned value assigned to the instance's ``_cdata``.
For example:
libexample.h::
typedef int point_t;
point_t make_point(int x, int y);
int point_x(point_t p);
int point_y(point_t p);
int point_setx(point_t p, int x);
int point_sety(point_t p, int y);
int point_move(point_t p, int x, int y);
int point_x_abs(point_t p);
int point_movex(point_t p, int x);
Python usage (where ``libexample`` is an API object from
``ffi.verify()``)::
>>> from cfficloak import CObject, cproperty, cmethod, cstaticmethod
>>> class Point(CObject):
... x = cproperty(libexample.point_x, libexample.point_setx)
... y = cproperty(libexample.point_y, libexample.point_sety)
... _cnew = cstaticmethod(libexample.make_point)
...
>>> p = Point(4, 2)
>>> p.x
4
>>> p.x = 8
>>> p.x
8
>>> p.y
2
You can also specify a destructor with a ``_cdel`` method in the same way
as ``_cnew``.
Alternatively you can assign a CFFI compatible object (either an actual
CFFI CData object, or something CFFI automatically converts like and int)
to the instance's _cdata attribute.
``cmethod`` wraps a CFunction to provide an easy way to handle 'output'
pointer arguments, arrays, etc. (See the ``cmethod`` documentation.)::
>>> class Point2(Point):
... move = cmethod(libexample.point_move)
...
>>> p2 = Point2(8, 2)
>>> p2.move(2, 2)
0
>>> p2.x
10
>>> p2.y
4
If _cdata is set, attributes of the cdata object can also be retrieved from
the CObject instance, e.g., for struct fields, etc.
libexample cdef::
typedef struct { int x; int y; ...; } mystruct;
mystruct* make_mystruct(int x, int y);
int mystruct_x(mystruct* ms);
python::
>>> class MyStruct(CObject):
... x = cproperty(libexample.mystruct_x)
... _cnew = cstaticmethod(libexample.make_mystruct)
...
>>> ms = MyStruct(4, 2)
>>> ms.x # Call to mystruct_x via cproperty
4
>>> ms.y # direct struct field access
2
Note: stack-passed structs are not supported yet* but pointers to
structs work as expected if you set the ``_cdata`` attribute to the
pointer.
* https://bitbucket.org/cffi/cffi/issue/102
'''
def __init__(self, *args, **kwargs):
if not hasattr(self, '_cdata') or self._cdata is None:
if hasattr(self, '_cnew'):
# C functions don't accept kwargs, so we just ignore them.
self._cdata = self._cnew(*args)
else:
self._cdata = None
def __getattr__(self, attr):
if self._cdata is not None and hasattr(self._cdata, attr):
return getattr(self._cdata, attr)
else:
raise AttributeError("{0} object has no attribute {1}"
.format(repr(self.__class__), repr(attr)))
def __del__(self):
if hasattr(self, '_cdel'):
self._cdel()
class nparray(object):
"""
For use with cffi arrays, return a numpy reference to them that also holds
a reference to the c data to ensure it stays alive
:param cffi.CData cdata: array object, expected to be uint8_t or equivalent
:return: wrapped numpy array object
"""
def __init__(self, _cdata, size=-1, dtype=None):
if not numpy:
raise NotImplementedError("numpy needs to be installed to use nparray()")
self.__cdata = _cdata
self.__buff = _global_ffi.buffer(_cdata, size=size)
self.__nparray = None
dtype = dtype or numpy.uint8
self.__nparray = numpy.frombuffer(self.__buff, dtype=dtype)
def __getattr__(self, item):
return getattr(self.__nparray, item)
def __getitem__(self, item):
return self.__nparray[item]
def __repr__(self):
return repr(self.__nparray)
def __len__(self):
return len(self.__nparray)
@property
def ndarray(self):
return self.__nparray
[docs]def nparrayptr(nparr, offset=0):
''' Convenience function for getting the CFFI-compatible pointer to a numpy
array object. '''
if _global_ffi:
return _global_ffi.cast('void *', nparr.__array_interface__['data'][0]+offset)
[docs]def carray(items_or_size=None, size=None, ctype='int'):
''' Convenience function for creating C arrays. '''
# TODO: Support multi-dimensional arrays? Maybe it's just easier to stick
# with numpy...
if _global_ffi:
if isinstance(items_or_size, int) and size is None:
size = items_or_size
items = None
else:
items = items_or_size
if items and size > len(items):
size = max(len(items), size or 0)
arr = _global_ffi.new(_global_ffi.getctype(ctype, '[]'), size)
for i, elem in enumerate(items):
arr[i] = elem
return arr
else:
return _global_ffi.new(_global_ffi.getctype(ctype, '[]'), items or size)