source: trunk/TouchOSC_LC/OSC.py @ 11

Revision 11, 10.0 KB checked in by st8, 13 months ago (diff)

Added prelim TouchOSC script

Line 
1#!/usr/bin/python
2#
3# Open SoundControl for Python
4# Copyright (C) 2002 Daniel Holth, Clinton McChesney
5#
6# This library is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2.1 of the License, or (at your option) any later version.
10#
11# This library is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this library; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19#
20# For questions regarding this module contact
21# Daniel Holth <dholth@stetson.edu> or visit
22# http://www.stetson.edu/~ProctoLogic/
23#
24# Changelog:
25# 15 Nov. 2001:
26#   Removed dependency on Python 2.0 features.
27#   - dwh
28# 13 Feb. 2002:
29#   Added a generic callback handler.
30#   - dwh
31
32import sys
33import struct
34import math
35import string
36
def hexDump(bytes):
    """Print the string to standard output as a hexadecimal dump.

    The data is shown eight items per row: each item as a two-column
    hex value followed by a space, then the repr() of the characters
    in that row.

    bytes -- the data to dump; a string (or, on Python 3, a bytes
             object).  Empty input prints nothing.

    A short final row is padded so its repr() lines up with the rows
    above, and shows only the characters that belong to that row.
    """
    width = 8
    for start in range(0, len(bytes), width):
        row = bytes[start:start + width]
        cells = []
        for item in row:
            # Iterating a Python 3 bytes object yields ints already;
            # one-character strings still need ord().
            if not isinstance(item, int):
                item = ord(item)
            cells.append("%2x " % item)
        # Each hex cell is three columns wide; fill the missing cells of
        # a short last row so the text column stays aligned.
        padding = "   " * (width - len(row))
        sys.stdout.write("".join(cells) + padding + repr(row) + "\n")
46
47
class OSCMessage:
    """Accumulates the pieces of one typetagged OSC message.

    Three attributes are maintained:
      address  -- the OSC address string
      typetags -- the typetag string, which starts out as ","
      message  -- the already-encoded argument data
    """

    def __init__(self):
        self.address, self.typetags, self.message = "", ",", ""

    def setAddress(self, address):
        """Replace the OSC address."""
        self.address = address

    def setMessage(self, message):
        """Replace the encoded argument data wholesale."""
        self.message = message

    def setTypetags(self, typetags):
        """Replace the typetag string wholesale."""
        self.typetags = typetags

    def clear(self):
        """Reset the address as well as the typetags and data."""
        self.address = ""
        self.clearData()

    def clearData(self):
        """Reset typetags and data, leaving the address alone."""
        self.typetags, self.message = ",", ""

    def append(self, argument, typehint = None):
        """Encode one argument and add it to the message.

        The typetag is chosen from the argument's Python type by
        OSCArgument().  Pass 'b' as typehint to have the argument
        encoded as a blob (counted string) by OSCBlob() instead.
        """
        encoder = OSCArgument
        if typehint == 'b':
            encoder = OSCBlob
        encoded = encoder(argument)

        # encoded is a (typetag, data) pair.
        self.typetags = self.typetags + encoded[0]
        self.rawAppend(encoded[1])

    def rawAppend(self, data):
        """Add already-encoded data to the message without touching
        the typetags.  Normally append() should be used instead."""
        self.message = self.message + data

    def getBinary(self):
        """Return the binary message built so far: the encoded
        address, then the encoded typetags, then the argument data."""
        return (OSCArgument(self.address)[1]
                + OSCArgument(self.typetags)[1]
                + self.message)

    def __repr__(self):
        # The representation is the binary message itself.
        return self.getBinary()
99
def readString(data):
    """Split a NUL-terminated OSC string off the front of data.

    Returns a (string, rest) tuple: string is everything before the
    first NUL, rest is what follows the string once the terminator and
    padding up to the next multiple of four have been skipped.

    If data contains no NUL at all (truncated or malformed input) the
    whole of data is returned as the string and rest is empty, so the
    caller always makes progress.
    """
    length = data.find("\0")
    if length < 0:
        # No terminator: find() returned -1, which must not be used as
        # a slice bound (it would chop the last character and consume
        # nothing).
        return (data, data[0:0])
    # Smallest multiple of 4 that is strictly greater than length,
    # i.e. room for the string plus at least one NUL.
    nextData = (length + 4) // 4 * 4
    return (data[0:length], data[nextData:])
104
105
def readBlob(data):
    """Split an OSC blob off the front of data.

    A blob is a big-endian 32-bit size followed by that many bytes of
    payload, padded to a multiple of four.

    Returns a (payload, rest) tuple.  If data is too short to hold the
    size field, or the size is negative, an error is printed and an
    empty payload is returned together with the untouched data -- the
    same way readInt() and readFloat() report short input.
    """
    if len(data) < 4:
        print("Error: too few bytes for blob %r %d" % (data, len(data)))
        return (data[0:0], data)

    length = struct.unpack(">i", data[0:4])[0]
    if length < 0:
        # The size is unpacked as a signed int; a negative value would
        # turn the slices below into from-the-end slices.
        print("Error: negative blob size %d" % length)
        return (data[0:0], data)

    # Payload rounded up to a multiple of 4, plus the 4-byte size field.
    nextData = (length + 3) // 4 * 4 + 4
    return (data[4:length + 4], data[nextData:])
110
111
def readInt(data):
    """Read a big-endian 32-bit signed integer from the front of data.

    Returns an (integer, rest) tuple.  When fewer than four bytes are
    available an error is printed and (0, data) is returned, i.e.
    nothing is consumed.
    """
    if len(data) >= 4:
        return (struct.unpack(">i", data[0:4])[0], data[4:])

    print("Error: too few bytes for int %s %s" % (data, len(data)))
    return (0, data)
122
123
124
def readLong(data):
    """Read a big-endian 64-bit signed integer from the front of data.

    Returns a (value, rest) tuple.  When fewer than eight bytes are
    available an error is printed and (0, data) is returned, i.e.
    nothing is consumed -- matching readInt() and readFloat().
    """
    if len(data) < 8:
        print("Error: too few bytes for long %s %s" % (data, len(data)))
        return (0, data)

    # Unpack all eight bytes as one signed 64-bit value.  Unpacking two
    # signed 32-bit halves and adding them gives a wrong result whenever
    # the low half has its top bit set, because that half is then
    # negative.
    big = struct.unpack(">q", data[0:8])[0]
    return (big, data[8:])
132
133
134
def readFloat(data):
    """Read a big-endian 32-bit float from the front of data.

    Returns a (value, rest) tuple.  When fewer than four bytes are
    available an error is printed and (0, data) is returned, i.e.
    nothing is consumed.
    """
    if len(data) >= 4:
        value = struct.unpack(">f", data[0:4])[0]
        remainder = data[4:]
    else:
        print("Error: too few bytes for float %s %s" % (data, len(data)))
        value = 0
        remainder = data

    return (value, remainder)
145
146
def OSCBlob(next):
    """Encode a string as an OSC blob.

    Returns a (typetag, data) tuple.  For a string the tag is 'b' and
    the data is the big-endian 32-bit length followed by the string,
    NUL-padded to a multiple of four bytes.  Anything that is not
    exactly a plain string yields ('', '').
    """
    if type(next) != type(""):
        return ('', '')

    length = len(next)
    # Round the payload size up to the next multiple of four.
    padded = (length + 3) // 4 * 4
    return ('b', struct.pack(">i%ds" % padded, length, next))
161
162
def OSCArgument(next):
    """Encode a Python value in its OSC binary representation.

    Returns a (typetag, data) tuple:
      plain string -> "s", the string NUL-padded to a multiple of four
                      bytes (always at least one NUL)
      float        -> "f", big-endian 32-bit float
      int          -> "i", big-endian 32-bit signed integer
    The type must match exactly; any other value yields ("", "").
    """
    kind = type(next)

    if kind == type(""):
        # Room for the string plus at least one terminating NUL,
        # rounded up to a multiple of four.
        size = (len(next) + 4) // 4 * 4
        return ("s", struct.pack(">%ds" % size, next))

    if kind == type(42.5):
        return ("f", struct.pack(">f", next))

    if kind == type(13):
        return ("i", struct.pack(">i", next))

    return ("", "")
183
184
def parseArgs(args):
    """Parse a list of strings into numbers where possible.

    Each string is stripped of surrounding whitespace and then
    interpreted as:
      - a float, if it is numeric and contains a ".";
      - an integer, if it is numeric and contains no "." (forms such
        as "1e3" are accepted and truncated to an integer);
      - the stripped string itself otherwise.

    Returns a new list with one entry per input string.  Nothing is
    printed.
    """
    parsed = []
    for arg in args:
        arg = arg.strip()
        try:
            if "." in arg:
                interpretation = float(arg)
            else:
                try:
                    # Parse integers directly so large values are not
                    # rounded by a detour through float.
                    interpretation = int(arg)
                except ValueError:
                    # e.g. "1e3": numeric, but not an integer literal.
                    interpretation = int(float(arg))
        except (ValueError, OverflowError):
            # Not a number (or "inf"/"nan", which have no integer
            # value): keep it as a string.
            interpretation = arg
        parsed.append(interpretation)
    return parsed
204
205
206
def decodeOSC(data):
    """Convert a typetagged OSC message or bundle to a Python list.

    For a message the result is [address, typetags, arg1, arg2, ...].
    For a bundle (address "#bundle") the result is
    ["#bundle", timetag, element1, element2, ...] where every element
    is itself the decoded list of the contained message or bundle.
    Data that holds nothing after the address decodes to [].

    Malformed input is reported with a printed message and decoding
    stops at that point, returning what was decoded so far:
      - a bundle whose remaining bytes cannot hold an element size,
        or whose element size is negative;
      - a typetag string that does not start with ",";
      - a typetag that is not one of i, f, s, b.
    """
    table = {"i":readInt, "f":readFloat, "s":readString, "b":readBlob}
    decoded = []
    address, rest = readString(data)

    if address == "#bundle":
        timetag, rest = readLong(rest)
        decoded.append(address)
        decoded.append(timetag)
        while len(rest) > 0:
            if len(rest) < 4:
                # readInt() consumes nothing from a short buffer, so
                # carrying on here would never terminate.
                print("Error: truncated bundle element size")
                break
            length, rest = readInt(rest)
            if length < 0:
                print("Error: negative bundle element size %d" % length)
                break
            decoded.append(decodeOSC(rest[:length]))
            rest = rest[length:]

    elif len(rest) > 0:
        typetags, rest = readString(rest)
        decoded.append(address)
        decoded.append(typetags)
        # Slice rather than index: typetags may be an empty string.
        if typetags[0:1] == ",":
            for tag in typetags[1:]:
                reader = table.get(tag)
                if reader is None:
                    # The size of an unknown argument is unknown too,
                    # so nothing after it can be decoded reliably.
                    print("Oops, unknown typetag %s" % tag)
                    break
                value, rest = reader(rest)
                decoded.append(value)
        else:
            print("Oops, typetag lacks the magic ,")

    return decoded
236
237
class CallbackManager:
    """This utility class maps OSC addresses to callables.

    The CallbackManager calls its callbacks with a list
    of decoded OSC arguments, including the address and
    the typetags as the first two arguments.

    A callback for "#bundle" is registered on construction; it
    dispatches every message contained in a decoded bundle."""

    def __init__(self):
        # address -> callable
        self.callbacks = {}
        self.add(self.unbundler, "#bundle")

    def handle(self, data, source = None):
        """Given OSC data, tries to call the callback with the
        right address.

        data   -- the binary OSC message or bundle
        source -- accepted for the caller's convenience; not used here
        """
        decoded = decodeOSC(data)
        self.dispatch(decoded)

    def dispatch(self, message):
        """Sends decoded OSC data to an appropriate callback.

        message -- a decoded message list whose first element is the
                   address.  An empty list is ignored.

        If no callback is registered for the address, "key not found"
        is printed and nothing else happens.  Exceptions raised by the
        callback itself are not caught here; they propagate to the
        caller.
        """
        if not message:
            # Nothing was decoded, so there is no address to look up.
            return

        address = message[0]
        # Look the callback up before calling it, so that a KeyError
        # raised inside the callback is not mistaken for an unknown
        # address.
        callback = self.callbacks.get(address)
        if callback is None:
            print("key not found")
            return

        callback(message)

    def add(self, callback, name):
        """Adds a callback to our set of callbacks,
        or removes the callback with name if callback
        is None.

        Removing a name that is not registered raises KeyError.
        """
        if callback is None:
            del self.callbacks[name]
        else:
            self.callbacks[name] = callback

    def unbundler(self, messages):
        """Dispatch the messages in a decoded bundle."""
        # first two elements are #bundle and the time tag, rest are messages.
        for message in messages[2:]:
            self.dispatch(message)
283
284
if __name__ == "__main__":
    # Self-test: builds a few messages, dumps them, decodes them again
    # and finally pushes messages and a bundle through a
    # CallbackManager.  All results are printed for inspection.
    hexDump("Welcome to the OSC testing program.")
    print("")

    message = OSCMessage()
    message.setAddress("/foo/play")
    for item in (44, 11, 4.5, "the white cliffs of dover"):
        message.append(item)
    hexDump(message.getBinary())

    print("Making and unmaking a message..")

    strings = OSCMessage()
    for item in ("Mary had a little lamb",
                 "its fleece was white as snow",
                 "and everywhere that Mary went,",
                 "the lamb was sure to go.",
                 14.5,
                 14.5,
                 -400):
        strings.append(item)

    raw = strings.getBinary()

    hexDump(raw)

    print("Retrieving arguments...")
    data = raw
    # Six strings in a row: the (empty) address, the typetags and the
    # four string arguments.
    for i in range(6):
        text, data = readString(data)
        print(text)

    # Followed by the two floats and the integer.
    for reader in (readFloat, readFloat, readInt):
        number, data = reader(data)
        print(number)

    hexDump(raw)
    print(decodeOSC(raw))
    print(decodeOSC(message.getBinary()))

    print("Testing Blob types.")

    blob = OSCMessage()
    # Blobs of every length from 0 to 5 exercise the padding.
    for payload in ("", "b", "bl", "blo", "blob", "blobs"):
        blob.append(payload, "b")
    blob.append(42)

    hexDump(blob.getBinary())

    print(decodeOSC(blob.getBinary()))

    def printingCallback(stuff):
        # Writes "Got: " and then every element followed by a space.
        sys.stdout.write("Got: " + "".join([str(i) + " " for i in stuff]) + "\n")

    print("Testing the callback manager.")

    c = CallbackManager()
    c.add(printingCallback, "/print")

    # First with the original address (no callback registered for it),
    # then readdressed to "/print".
    c.handle(message.getBinary())
    message.setAddress("/print")
    c.handle(message.getBinary())

    print1 = OSCMessage()
    print1.setAddress("/print")
    for item in ("Hey man, that's cool.", 42, 3.1415926):
        print1.append(item)

    c.handle(print1.getBinary())

    # Assemble a bundle by hand in the argument data of a message:
    # the "#bundle" string, two zero ints standing in for the 8-byte
    # time tag, then each contained message as a size-prefixed blob.
    bundle = OSCMessage()
    bundle.setAddress("")
    bundle.append("#bundle")
    bundle.append(0)
    bundle.append(0)
    for i in range(2):
        bundle.append(print1.getBinary(), 'b')

    # Only the argument data is the bundle; address and typetags of
    # the carrier message are left out.
    bundlebinary = bundle.message

    print("sending a bundle to the callback manager")
    c.handle(bundlebinary)
Note: See TracBrowser for help on using the repository browser.