#!/usr/bin/env python3
#####
#
# aspy.py
#
# Andy Hammerlindl 2011/09/03
#
# Uses ctypes to interface with the shared library version of Asymptote.
# Asymptote can be run and its datatypes inspected via Python.
#
#
# To use the module:
# 1. make asymptote.so
# 2. Ensure that asymptote.so is visible to python, say by adding its
#    directory to LD_LIBRARY_PATH
# 3. Run this module.  (See runExample for an example.)
#
#####

from ctypes import *

# Aliases for the C-level types exchanged with the shared library.
asyInt = c_longlong
handle_typ = c_void_p
arguments_typ = c_void_p
state_typ = c_void_p
function_typ = CFUNCTYPE(None, state_typ, c_void_p)


class string_typ(Structure):
    """A (buffer, length) pair used to pass strings across the C boundary."""
    _fields_ = [
        ("buf", c_char_p),  # Should be NUL-terminated?  Maybe replace with
                            # POINTER(c_char).
        ("length", asyInt),
    ]


ErrorCallbackFUNC = CFUNCTYPE(None, string_typ)

# Argument kinds accepted as the last parameter of Policy.addArgument.
NORMAL_ARG = 45000
REST_ARG = 45001


class Policy(Structure):
    """Table of function pointers returned by the library's _asy_getPolicy.

    The order and types of the fields define the binary layout of the
    structure, so they must not be reordered or changed independently of the
    shared library.
    """
    _fields_ = [
        ("version", asyInt),
        ("copyHandle", CFUNCTYPE(handle_typ, handle_typ)),
        ("releaseHandle", CFUNCTYPE(None, handle_typ)),
        ("handleFromInt", CFUNCTYPE(handle_typ, asyInt)),
        ("handleFromBool", CFUNCTYPE(handle_typ, asyInt)),
        ("handleFromDouble", CFUNCTYPE(handle_typ, c_double)),
        ("handleFromString", CFUNCTYPE(handle_typ, string_typ)),
        ("handleFromFunction", CFUNCTYPE(handle_typ,
                                         c_char_p,
                                         function_typ,
                                         c_void_p)),
        ("IntFromHandle", CFUNCTYPE(asyInt, handle_typ)),
        ("boolFromHandle", CFUNCTYPE(asyInt, handle_typ)),
        ("doubleFromHandle", CFUNCTYPE(c_double, handle_typ)),
        ("stringFromHandle", CFUNCTYPE(string_typ, handle_typ)),
        ("getField", CFUNCTYPE(handle_typ, handle_typ, c_char_p)),
        ("getCell", CFUNCTYPE(handle_typ, handle_typ, asyInt)),
        ("addField", CFUNCTYPE(None, handle_typ, c_char_p, handle_typ)),
        ("newArguments", CFUNCTYPE(arguments_typ)),
        ("releaseArguments", CFUNCTYPE(None, arguments_typ)),
        ("addArgument", CFUNCTYPE(None,
                                  arguments_typ,
                                  c_char_p,
                                  handle_typ,
                                  asyInt)),
        ("call", CFUNCTYPE(handle_typ, handle_typ, arguments_typ)),
        ("globals", CFUNCTYPE(handle_typ, state_typ)),
        ("numParams", CFUNCTYPE(asyInt, state_typ)),
        ("getParam", CFUNCTYPE(handle_typ, state_typ, asyInt)),
        ("setReturnValue", CFUNCTYPE(None, state_typ, handle_typ)),
        ("setErrorCallback", CFUNCTYPE(None, ErrorCallbackFUNC)),
    ]


# Filled in by initPolicyAndBaseState() below.
policy = None
baseState = None


def initPolicyAndBaseState():
    """Load asymptote.so and fetch its policy table and base state.

    Sets the module-level ``policy`` (a pointer to a Policy structure) and
    ``baseState`` from the library's _asy_getPolicy and _asy_getState entry
    points.
    """
    global policy, baseState
    lib = CDLL("asymptote.so")

    getPolicy = lib._asy_getPolicy
    getPolicy.restype = POINTER(Policy)
    policy = getPolicy()

    getState = lib._asy_getState
    getState.restype = state_typ
    baseState = getState()


initPolicyAndBaseState()


def pyStringFromAsyString(st):
    """Convert a string_typ structure into a Python string.

    A NULL buffer is returned as the empty string.  Under Python 3 the buffer
    arrives as bytes and is decoded as UTF-8 (undecodable bytes are replaced)
    instead of being rendered as "b'...'".
    """
    # TODO: Handle strings with null-terminators.
    raw = st.buf
    if raw is None:
        return ""
    if not isinstance(raw, str):
        # Python 3: c_char_p fields are read back as bytes.
        return raw.decode("utf-8", "replace")
    return raw


def pyStringFromHandle(h):
    """Return the Python string for the Asymptote string behind handle h.

    Raises whatever checkForErrors() raises if the policy reported an error.
    """
    # TODO: Handle strings with null-terminators.
    st = policy.contents.stringFromHandle(h)
    checkForErrors()
    return pyStringFromAsyString(st)


def handleFromPyString(s):
    """Create an Asymptote string handle from a Python str or bytes object.

    Text is encoded as UTF-8 because c_char_p only accepts bytes; the length
    passed to the library is the number of encoded bytes.
    """
    data = s if isinstance(s, bytes) else s.encode("utf-8")
    st = string_typ(data, len(data))
    h = policy.contents.handleFromString(st)
    checkForErrors()
    return h


def ensureDatum(val):
    """Return val unchanged if it is already a Datum, else wrap it in one."""
    return val if isinstance(val, Datum) else Datum(val)


# The error detection scheme.
# policyError collects the error messages reported by the error callback.
policyError = []


def pyErrorCallback(s):
    """Error callback handed to the policy: record the reported message."""
    policyError.append(pyStringFromAsyString(s))


# The ctypes callback object is kept in a module-level name so that it stays
# referenced for as long as the library may invoke it.
cErrorCallback = ErrorCallbackFUNC(pyErrorCallback)
policy.contents.setErrorCallback(cErrorCallback)


class AsyException(Exception):
    """Raised by checkForErrors() when the policy has reported an error."""

    def __init__(self, msg):
        Exception.__init__(self, msg)
        self.msg = msg

    def __str__(self):
        return self.msg


def checkForErrors():
    """Raises an exception if an error occurred.

    The pending error list is cleared before raising.  Only the first message
    is reported; further ones are summarised as ' (and other errors)'.
    """
    global policyError
    if policyError:
        s = policyError[0]
        if len(policyError) > 1:
            s += ' (and other errors)'
        policyError = []
        raise AsyException(s)


def _asBytes(name):
    """Return name as bytes (UTF-8), as required for c_char_p parameters."""
    if isinstance(name, bytes):
        return name
    return name.encode("utf-8")


def _checkKey(name):
    """Raise TypeError unless name is usable as a field name."""
    if not isinstance(name, (str, bytes)):
        raise TypeError("Datum keys must be strings, not '%s'"
                        % type(name).__name__)


def _applyOperator(symbol, *operands):
    """Call the Asymptote global named 'operator <symbol>' on the operands."""
    return state.globals()["operator " + symbol](*operands)


class Datum(object):
    """Python wrapper around a low-level handle obtained from the policy.

    Attribute and item access are forwarded to the policy's getField and
    addField, calling a Datum goes through the policy's call, and the
    arithmetic and comparison operators look up the matching
    'operator ...' entry in the globals of the module-level state.
    """

    def _setHandle(self, handle):
        # Bypass our own __setattr__, which forwards to the policy.
        object.__setattr__(self, 'handle', handle)

    def __init__(self, val):
        """Build a Datum from None, bool, int, float, str, tuple or Datum.

        Raises TypeError for any other type, and whatever checkForErrors()
        raises if the policy reports an error.
        """
        self._setHandle(0)

        if val is None:
            return

        # bool is tested before int because bool is a subclass of int.
        if isinstance(val, bool):
            self._setHandle(policy.contents.handleFromBool(1 if val else 0))
            checkForErrors()
        elif isinstance(val, int):
            self._setHandle(policy.contents.handleFromInt(val))
            checkForErrors()
        elif isinstance(val, float):
            self._setHandle(policy.contents.handleFromDouble(val))
            checkForErrors()
        elif isinstance(val, str):
            self._setHandle(handleFromPyString(val))
            checkForErrors()
        elif isinstance(val, tuple):
            # Could do this more efficiently, and avoid a copyHandle
            ret = _applyOperator("tuple", *val)
            self._setHandle(policy.contents.copyHandle(ret.handle))
            checkForErrors()
        elif isinstance(val, Datum):
            self._setHandle(policy.contents.copyHandle(val.handle))
            checkForErrors()
        else:
            # TODO: check if val has a toDatum field
            raise TypeError("cannot initialize Datum from '%s'"
                            % type(val).__name__)

    def __repr__(self):
        # TODO: Add type-checking to policy.
        h = self.handle
        # A NULL c_void_p comes back from ctypes as None, which hex() rejects.
        return '<Datum with handle %s>' % ('NULL' if h is None else hex(h))

    def __int__(self):
        l = policy.contents.IntFromHandle(self.handle)
        checkForErrors()
        return int(l)

    def __bool__(self):
        # This will throw an exception for anything but an underlying bool
        # type.  Perhaps we should be more pythonic.
        l = policy.contents.boolFromHandle(self.handle)
        checkForErrors()
        assert l in [0, 1]
        return l == 1

    # Python 2 spelling of __bool__.
    __nonzero__ = __bool__

    def __float__(self):
        x = policy.contents.doubleFromHandle(self.handle)
        checkForErrors()
        return float(x)

    def __str__(self):
        return pyStringFromHandle(self.handle)

    def __getattr__(self, name):
        """Fetch the field called name through the policy's getField."""
        field = policy.contents.getField(self.handle, _asBytes(name))
        checkForErrors()
        return DatumFromHandle(field)

    def __getitem__(self, name):
        """Same as attribute access; allows names that are not identifiers."""
        _checkKey(name)
        return self.__getattr__(name)
        #TODO: raise an IndexError when appropriate.
        #TODO: implement array indices

    def __setattr__(self, name, val):
        """Store val under name through the policy's addField."""
        # TODO: Resolve setting versus declaring.
        # One idea: d.x = f or d["x"] = f sets and d["int x()"] = f declares
        # anew.
        policy.contents.addField(self.handle, _asBytes(name),
                                 ensureDatum(val).handle)
        checkForErrors()

    def __setitem__(self, name, val):
        """Same as attribute assignment; allows arbitrary string names."""
        _checkKey(name)
        self.__setattr__(name, val)
        #TODO: raise an IndexError when appropriate.
        #TODO: implement array indices

    def __call__(self, *args, **namedArgs):
        """Call the wrapped value with positional and named arguments.

        Returns a Datum for the result, or None when the call yields no
        handle.  The argument list is released even if an error is raised.
        """
        alist = policy.contents.newArguments()
        checkForErrors()

        try:
            for arg in args:
                d = ensureDatum(arg)
                policy.contents.addArgument(alist, b"", d.handle, NORMAL_ARG)
                checkForErrors()

            for name, arg in namedArgs.items():
                d = ensureDatum(arg)
                policy.contents.addArgument(alist, _asBytes(name), d.handle,
                                            NORMAL_ARG)
                checkForErrors()

            ret = policy.contents.call(self.handle, alist)
            checkForErrors()
        finally:
            policy.contents.releaseArguments(alist)
        checkForErrors()

        if ret is not None:
            return DatumFromHandle(ret)

    def __add__(self, other):
        return _applyOperator("+", self, other)

    def __sub__(self, other):
        return _applyOperator("-", self, other)

    def __mul__(self, other):
        return _applyOperator("*", self, other)

    def __div__(self, other):
        return _applyOperator("/", self, other)

    def __truediv__(self, other):
        return _applyOperator("/", self, other)

    def __mod__(self, other):
        return _applyOperator("%", self, other)

    def __pow__(self, other):
        return _applyOperator("^", self, other)

    def __and__(self, other):
        return _applyOperator("&", self, other)

    def __or__(self, other):
        return _applyOperator("|", self, other)

    def __neg__(self):
        # Unary minus: Python passes no second operand.
        return _applyOperator("-", self)

    def __lt__(self, other):
        return _applyOperator("<", self, other)

    def __le__(self, other):
        return _applyOperator("<=", self, other)

    def __eq__(self, other):
        return _applyOperator("==", self, other)

    def __ne__(self, other):
        return _applyOperator("!=", self, other)

    def __gt__(self, other):
        return _applyOperator(">", self, other)

    def __ge__(self, other):
        return _applyOperator(">=", self, other)


def DatumFromHandle(handle):
    """Initializes a Datum from a given low-level handle.  Does not invoke
    copyHandle."""
    d = Datum(None)
    d._setHandle(handle)
    return d


class State(object):
    """Wrapper around a low-level state pointer."""

    def __init__(self, base):
        self.base = base

    def globals(self):
        """Return a Datum for the handle the policy's globals() gives."""
        handle = policy.contents.globals(self.base)
        checkForErrors()
        return DatumFromHandle(handle)

    def params(self):
        """Return the list of parameters of this state, each as a Datum."""
        p = []

        numParams = policy.contents.numParams(self.base)
        checkForErrors()

        for i in range(numParams):
            h = policy.contents.getParam(self.base, i)
            checkForErrors()
            p.append(DatumFromHandle(h))

        return p

    def setReturnValue(self, val):
        """Set val (converted to a Datum if needed) as the return value."""
        policy.contents.setReturnValue(self.base, ensureDatum(val).handle)
        checkForErrors()


# Keep a link to all of the callbacks created, so they aren't garbage
# collected.  TODO: See if this is necessary.
storedCallbacks = []


def DatumFromCallable(f):
    """Wrap the Python callable f as a Datum that Asymptote can call.

    When invoked, the wrapper passes the parameters of the calling state to
    f as Datum objects and, if f returns something other than None, sets it
    as the return value of that state.
    """
    def wrapped(s, d):
        callState = State(s)
        params = callState.params()
        r = f(*params)
        # Identity test: `r != None` would dispatch to Datum.__ne__ and
        # trigger an Asymptote operator call instead of a plain None check.
        if r is not None:
            callState.setReturnValue(r)

    cf = function_typ(wrapped)
    storedCallbacks.append(cf)

    # handleFromFunction takes a c_char_p, which requires bytes.
    name = f.__name__
    if not isinstance(name, bytes):
        name = name.encode("utf-8")

    h = policy.contents.handleFromFunction(name, cf, None)
    checkForErrors()

    return DatumFromHandle(h)


print("version", policy.contents.version)

state = State(baseState)


# An example
def runExample():
    """Draw a path and a circle through the Asymptote globals."""
    g = state.globals()

    g.eval("path p = (0,0) -- (100,100) -- (200,0)", embedded=True)
    g.draw(g.p)
    g.shipout("frompython")

    g.draw(g.circle(100), g.red)