test_pythonlib: Fix bug in traversing directores

walker.py: imports; Add test Python2.5 bytecode - it works!
Makefile: remove temporary directories and _dis files which were added by mistake
This commit is contained in:
rocky 2015-12-13 17:55:05 -05:00
parent 501060f87f
commit 14815b3d8c
45 changed files with 28 additions and 2136 deletions

View File

@ -12,10 +12,17 @@ check: check-short check-2.7-ok
#: Run quick tests
check-short:
$(PYTHON) test_pythonlib.py --base-2.7 --verify $(COMPILE)
$(PYTHON) test_pythonlib.py --bytecode-2.5
#: Run longer Python 2.7's lib files known to be okay
check-2.7-ok:
$(PYTHON) test_pythonlib.py --ok-2.7 --verify $(COMPILE)
clean:
clean: clean-py-dis clean-dis
clean-dis:
find . -name '*_dis' -exec rm -v '{}' ';'
#: Clean temporary compile/decompile/verify direcotries in /tmp
clean-py-dis:
rm -fr /tmp/py-dis-* || true

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,932 +0,0 @@
# Embedded file name: /src/external-vcs/github/rocky/uncompyle6/test/ok_2.7/aifc.py
"""Stuff to parse AIFF-C and AIFF files.
Unless explicitly stated otherwise, the description below is true
both for AIFF-C files and AIFF files.
An AIFF-C file has the following structure.
+-----------------+
| FORM |
+-----------------+
| <size> |
+----+------------+
| | AIFC |
| +------------+
| | <chunks> |
| | . |
| | . |
| | . |
+----+------------+
An AIFF file has the string "AIFF" instead of "AIFC".
A chunk consists of an identifier (4 bytes) followed by a size (4 bytes,
big endian order), followed by the data. The size field does not include
the size of the 8 byte header.
The following chunk types are recognized.
FVER
<version number of AIFF-C defining document> (AIFF-C only).
MARK
<# of markers> (2 bytes)
list of markers:
<marker ID> (2 bytes, must be > 0)
<position> (4 bytes)
<marker name> ("pstring")
COMM
<# of channels> (2 bytes)
<# of sound frames> (4 bytes)
<size of the samples> (2 bytes)
<sampling frequency> (10 bytes, IEEE 80-bit extended
floating point)
in AIFF-C files only:
<compression type> (4 bytes)
<human-readable version of compression type> ("pstring")
SSND
<offset> (4 bytes, not used by this program)
<blocksize> (4 bytes, not used by this program)
<sound data>
A pstring consists of 1 byte length, a string of characters, and 0 or 1
byte pad to make the total length even.
Usage.
Reading AIFF files:
f = aifc.open(file, 'r')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods read(), seek(), and close().
In some types of audio files, if the setpos() method is not used,
the seek() method is not necessary.
This returns an instance of a class with the following public methods:
getnchannels() -- returns number of audio channels (1 for
mono, 2 for stereo)
getsampwidth() -- returns sample width in bytes
getframerate() -- returns sampling frequency
getnframes() -- returns number of audio frames
getcomptype() -- returns compression type ('NONE' for AIFF files)
getcompname() -- returns human-readable version of
compression type ('not compressed' for AIFF files)
getparams() -- returns a tuple consisting of all of the
above in the above order
getmarkers() -- get the list of marks in the audio file or None
if there are no marks
getmark(id) -- get mark with the specified id (raises an error
if the mark does not exist)
readframes(n) -- returns at most n frames of audio
rewind() -- rewind to the beginning of the audio stream
setpos(pos) -- seek to the specified position
tell() -- return the current position
close() -- close the instance (make it unusable)
The position returned by tell(), the position given to setpos() and
the position of marks are all compatible and have nothing to do with
the actual position in the file.
The close() method is called automatically when the class instance
is destroyed.
Writing AIFF files:
f = aifc.open(file, 'w')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods write(), tell(), seek(), and
close().
This returns an instance of a class with the following public methods:
aiff() -- create an AIFF file (AIFF-C default)
aifc() -- create an AIFF-C file
setnchannels(n) -- set the number of channels
setsampwidth(n) -- set the sample width
setframerate(n) -- set the frame rate
setnframes(n) -- set the number of frames
setcomptype(type, name)
-- set the compression type and the
human-readable compression type
setparams(tuple)
-- set all parameters at once
setmark(id, pos, name)
-- add specified mark to the list of marks
tell() -- return current position in output file (useful
in combination with setmark())
writeframesraw(data)
-- write audio frames without pathing up the
file header
writeframes(data)
-- write audio frames and patch up the file header
close() -- patch up the file header and close the
output file
You should set the parameters before the first writeframesraw or
writeframes. The total number of frames does not need to be set,
but when it is set to the correct value, the header does not have to
be patched up.
It is best to first set all parameters, perhaps possibly the
compression type, and then write audio frames using writeframesraw.
When all frames have been written, either call writeframes('') or
close() to patch up the sizes in the header.
Marks can be added anytime. If there are any marks, you must call
close() after all frames have been written.
The close() method is called automatically when the class instance
is destroyed.
When a file is opened with the extension '.aiff', an AIFF file is
written, otherwise an AIFF-C file is written. This default can be
changed by calling aiff() or aifc() before the first writeframes or
writeframesraw.
"""
import struct
import __builtin__
__all__ = ['Error', 'open', 'openfp']
class Error(Exception):
pass
_AIFC_version = 2726318400L
def _read_long(file):
try:
return struct.unpack('>l', file.read(4))[0]
except struct.error:
raise EOFError
def _read_ulong(file):
try:
return struct.unpack('>L', file.read(4))[0]
except struct.error:
raise EOFError
def _read_short(file):
try:
return struct.unpack('>h', file.read(2))[0]
except struct.error:
raise EOFError
def _read_ushort(file):
try:
return struct.unpack('>H', file.read(2))[0]
except struct.error:
raise EOFError
def _read_string(file):
length = ord(file.read(1))
if length == 0:
data = ''
else:
data = file.read(length)
if length & 1 == 0:
dummy = file.read(1)
return data
_HUGE_VAL = 1.79769313486231e+308
def _read_float(f):
expon = _read_short(f)
sign = 1
if expon < 0:
sign = -1
expon = expon + 32768
himant = _read_ulong(f)
lomant = _read_ulong(f)
if expon == himant == lomant == 0:
f = 0.0
elif expon == 32767:
f = _HUGE_VAL
else:
expon = expon - 16383
f = (himant * 4294967296L + lomant) * pow(2.0, expon - 63)
return sign * f
def _write_short(f, x):
f.write(struct.pack('>h', x))
def _write_ushort(f, x):
f.write(struct.pack('>H', x))
def _write_long(f, x):
f.write(struct.pack('>l', x))
def _write_ulong(f, x):
f.write(struct.pack('>L', x))
def _write_string(f, s):
if len(s) > 255:
raise ValueError('string exceeds maximum pstring length')
f.write(struct.pack('B', len(s)))
f.write(s)
if len(s) & 1 == 0:
f.write(chr(0))
def _write_float(f, x):
import math
if x < 0:
sign = 32768
x = x * -1
else:
sign = 0
if x == 0:
expon = 0
himant = 0
lomant = 0
else:
fmant, expon = math.frexp(x)
if expon > 16384 or fmant >= 1 or fmant != fmant:
expon = sign | 32767
himant = 0
lomant = 0
else:
expon = expon + 16382
if expon < 0:
fmant = math.ldexp(fmant, expon)
expon = 0
expon = expon | sign
fmant = math.ldexp(fmant, 32)
fsmant = math.floor(fmant)
himant = long(fsmant)
fmant = math.ldexp(fmant - fsmant, 32)
fsmant = math.floor(fmant)
lomant = long(fsmant)
_write_ushort(f, expon)
_write_ulong(f, himant)
_write_ulong(f, lomant)
from chunk import Chunk
class Aifc_read():
def initfp(self, file):
self._version = 0
self._decomp = None
self._convert = None
self._markers = []
self._soundpos = 0
self._file = file
chunk = Chunk(file)
if chunk.getname() != 'FORM':
raise Error, 'file does not start with FORM id'
formdata = chunk.read(4)
if formdata == 'AIFF':
self._aifc = 0
elif formdata == 'AIFC':
self._aifc = 1
else:
raise Error, 'not an AIFF or AIFF-C file'
self._comm_chunk_read = 0
while 1:
self._ssnd_seek_needed = 1
try:
chunk = Chunk(self._file)
except EOFError:
break
chunkname = chunk.getname()
if chunkname == 'COMM':
self._read_comm_chunk(chunk)
self._comm_chunk_read = 1
elif chunkname == 'SSND':
self._ssnd_chunk = chunk
dummy = chunk.read(8)
self._ssnd_seek_needed = 0
elif chunkname == 'FVER':
self._version = _read_ulong(chunk)
elif chunkname == 'MARK':
self._readmark(chunk)
chunk.skip()
if not self._comm_chunk_read or not self._ssnd_chunk:
raise Error, 'COMM chunk and/or SSND chunk missing'
if self._aifc and self._decomp:
import cl
params = [cl.ORIGINAL_FORMAT,
0,
cl.BITS_PER_COMPONENT,
self._sampwidth * 8,
cl.FRAME_RATE,
self._framerate]
if self._nchannels == 1:
params[1] = cl.MONO
elif self._nchannels == 2:
params[1] = cl.STEREO_INTERLEAVED
else:
raise Error, 'cannot compress more than 2 channels'
self._decomp.SetParams(params)
return
def __init__(self, f):
if type(f) == type(''):
f = __builtin__.open(f, 'rb')
self.initfp(f)
def getfp(self):
return self._file
def rewind(self):
self._ssnd_seek_needed = 1
self._soundpos = 0
def close(self):
if self._decomp:
self._decomp.CloseDecompressor()
self._decomp = None
self._file.close()
return
def tell(self):
return self._soundpos
def getnchannels(self):
return self._nchannels
def getnframes(self):
return self._nframes
def getsampwidth(self):
return self._sampwidth
def getframerate(self):
return self._framerate
def getcomptype(self):
return self._comptype
def getcompname(self):
return self._compname
def getparams(self):
return (self.getnchannels(),
self.getsampwidth(),
self.getframerate(),
self.getnframes(),
self.getcomptype(),
self.getcompname())
def getmarkers(self):
if len(self._markers) == 0:
return None
else:
return self._markers
def getmark(self, id):
for marker in self._markers:
if id == marker[0]:
return marker
raise Error, 'marker %r does not exist' % (id,)
def setpos(self, pos):
if pos < 0 or pos > self._nframes:
raise Error, 'position not in range'
self._soundpos = pos
self._ssnd_seek_needed = 1
def readframes(self, nframes):
if self._ssnd_seek_needed:
self._ssnd_chunk.seek(0)
dummy = self._ssnd_chunk.read(8)
pos = self._soundpos * self._framesize
if pos:
self._ssnd_chunk.seek(pos + 8)
self._ssnd_seek_needed = 0
if nframes == 0:
return ''
data = self._ssnd_chunk.read(nframes * self._framesize)
if self._convert and data:
data = self._convert(data)
self._soundpos = self._soundpos + len(data) // (self._nchannels * self._sampwidth)
return data
def _decomp_data(self, data):
import cl
dummy = self._decomp.SetParam(cl.FRAME_BUFFER_SIZE, len(data) * 2)
return self._decomp.Decompress(len(data) // self._nchannels, data)
def _ulaw2lin(self, data):
import audioop
return audioop.ulaw2lin(data, 2)
def _adpcm2lin(self, data):
import audioop
if not hasattr(self, '_adpcmstate'):
self._adpcmstate = None
data, self._adpcmstate = audioop.adpcm2lin(data, 2, self._adpcmstate)
return data
def _read_comm_chunk(self, chunk):
self._nchannels = _read_short(chunk)
self._nframes = _read_long(chunk)
self._sampwidth = (_read_short(chunk) + 7) // 8
self._framerate = int(_read_float(chunk))
self._framesize = self._nchannels * self._sampwidth
if self._aifc:
kludge = 0
if chunk.chunksize == 18:
kludge = 1
print 'Warning: bad COMM chunk size'
chunk.chunksize = 23
self._comptype = chunk.read(4)
if kludge:
length = ord(chunk.file.read(1))
if length & 1 == 0:
length = length + 1
chunk.chunksize = chunk.chunksize + length
chunk.file.seek(-1, 1)
self._compname = _read_string(chunk)
if self._comptype != 'NONE':
if self._comptype == 'G722':
try:
import audioop
except ImportError:
pass
else:
self._convert = self._adpcm2lin
self._sampwidth = 2
return
try:
import cl
except ImportError:
if self._comptype in ('ULAW', 'ulaw'):
try:
import audioop
self._convert = self._ulaw2lin
self._sampwidth = 2
return
except ImportError:
pass
raise Error, 'cannot read compressed AIFF-C files'
if self._comptype in ('ULAW', 'ulaw'):
scheme = cl.G711_ULAW
elif self._comptype in ('ALAW', 'alaw'):
scheme = cl.G711_ALAW
else:
raise Error, 'unsupported compression type'
self._decomp = cl.OpenDecompressor(scheme)
self._convert = self._decomp_data
self._sampwidth = 2
else:
self._comptype = 'NONE'
self._compname = 'not compressed'
def _readmark(self, chunk):
nmarkers = _read_short(chunk)
try:
for i in range(nmarkers):
id = _read_short(chunk)
pos = _read_long(chunk)
name = _read_string(chunk)
if pos or name:
self._markers.append((id, pos, name))
except EOFError:
print 'Warning: MARK chunk contains only',
print len(self._markers),
if len(self._markers) == 1:
print 'marker',
else:
print 'markers',
print 'instead of', nmarkers
class Aifc_write():
def __init__(self, f):
if type(f) == type(''):
filename = f
f = __builtin__.open(f, 'wb')
else:
filename = '???'
self.initfp(f)
if filename[-5:] == '.aiff':
self._aifc = 0
else:
self._aifc = 1
def initfp(self, file):
self._file = file
self._version = _AIFC_version
self._comptype = 'NONE'
self._compname = 'not compressed'
self._comp = None
self._convert = None
self._nchannels = 0
self._sampwidth = 0
self._framerate = 0
self._nframes = 0
self._nframeswritten = 0
self._datawritten = 0
self._datalength = 0
self._markers = []
self._marklength = 0
self._aifc = 1
return
def __del__(self):
if self._file:
self.close()
def aiff(self):
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
self._aifc = 0
def aifc(self):
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
self._aifc = 1
def setnchannels(self, nchannels):
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
if nchannels < 1:
raise Error, 'bad # of channels'
self._nchannels = nchannels
def getnchannels(self):
if not self._nchannels:
raise Error, 'number of channels not set'
return self._nchannels
def setsampwidth(self, sampwidth):
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
if sampwidth < 1 or sampwidth > 4:
raise Error, 'bad sample width'
self._sampwidth = sampwidth
def getsampwidth(self):
if not self._sampwidth:
raise Error, 'sample width not set'
return self._sampwidth
def setframerate(self, framerate):
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
if framerate <= 0:
raise Error, 'bad frame rate'
self._framerate = framerate
def getframerate(self):
if not self._framerate:
raise Error, 'frame rate not set'
return self._framerate
def setnframes(self, nframes):
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
self._nframes = nframes
def getnframes(self):
return self._nframeswritten
def setcomptype(self, comptype, compname):
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
if comptype not in ('NONE', 'ULAW', 'ulaw', 'ALAW', 'alaw', 'G722'):
raise Error, 'unsupported compression type'
self._comptype = comptype
self._compname = compname
def getcomptype(self):
return self._comptype
def getcompname(self):
return self._compname
def setparams(self, info):
nchannels, sampwidth, framerate, nframes, comptype, compname = info
if self._nframeswritten:
raise Error, 'cannot change parameters after starting to write'
if comptype not in ('NONE', 'ULAW', 'ulaw', 'ALAW', 'alaw', 'G722'):
raise Error, 'unsupported compression type'
self.setnchannels(nchannels)
self.setsampwidth(sampwidth)
self.setframerate(framerate)
self.setnframes(nframes)
self.setcomptype(comptype, compname)
def getparams(self):
if not self._nchannels or not self._sampwidth or not self._framerate:
raise Error, 'not all parameters set'
return (self._nchannels,
self._sampwidth,
self._framerate,
self._nframes,
self._comptype,
self._compname)
def setmark(self, id, pos, name):
if id <= 0:
raise Error, 'marker ID must be > 0'
if pos < 0:
raise Error, 'marker position must be >= 0'
if type(name) != type(''):
raise Error, 'marker name must be a string'
for i in range(len(self._markers)):
if id == self._markers[i][0]:
self._markers[i] = (id, pos, name)
return
self._markers.append((id, pos, name))
def getmark(self, id):
for marker in self._markers:
if id == marker[0]:
return marker
raise Error, 'marker %r does not exist' % (id,)
def getmarkers(self):
if len(self._markers) == 0:
return None
else:
return self._markers
def tell(self):
return self._nframeswritten
def writeframesraw(self, data):
self._ensure_header_written(len(data))
nframes = len(data) // (self._sampwidth * self._nchannels)
if self._convert:
data = self._convert(data)
self._file.write(data)
self._nframeswritten = self._nframeswritten + nframes
self._datawritten = self._datawritten + len(data)
def writeframes(self, data):
self.writeframesraw(data)
if self._nframeswritten != self._nframes or self._datalength != self._datawritten:
self._patchheader()
def close(self):
if self._file is None:
return
else:
try:
self._ensure_header_written(0)
if self._datawritten & 1:
self._file.write(chr(0))
self._datawritten = self._datawritten + 1
self._writemarkers()
if self._nframeswritten != self._nframes or self._datalength != self._datawritten or self._marklength:
self._patchheader()
if self._comp:
self._comp.CloseCompressor()
self._comp = None
finally:
self._convert = None
f = self._file
self._file = None
f.close()
return
def _comp_data(self, data):
import cl
dummy = self._comp.SetParam(cl.FRAME_BUFFER_SIZE, len(data))
dummy = self._comp.SetParam(cl.COMPRESSED_BUFFER_SIZE, len(data))
return self._comp.Compress(self._nframes, data)
def _lin2ulaw(self, data):
import audioop
return audioop.lin2ulaw(data, 2)
def _lin2adpcm(self, data):
import audioop
if not hasattr(self, '_adpcmstate'):
self._adpcmstate = None
data, self._adpcmstate = audioop.lin2adpcm(data, 2, self._adpcmstate)
return data
def _ensure_header_written(self, datasize):
if not self._nframeswritten:
if self._comptype in ('ULAW', 'ulaw', 'ALAW', 'alaw'):
if not self._sampwidth:
self._sampwidth = 2
if self._sampwidth != 2:
raise Error, 'sample width must be 2 when compressing with ULAW or ALAW'
if self._comptype == 'G722':
if not self._sampwidth:
self._sampwidth = 2
if self._sampwidth != 2:
raise Error, 'sample width must be 2 when compressing with G7.22 (ADPCM)'
if not self._nchannels:
raise Error, '# channels not specified'
if not self._sampwidth:
raise Error, 'sample width not specified'
if not self._framerate:
raise Error, 'sampling rate not specified'
self._write_header(datasize)
def _init_compression(self):
if self._comptype == 'G722':
self._convert = self._lin2adpcm
return
try:
import cl
except ImportError:
if self._comptype in ('ULAW', 'ulaw'):
try:
import audioop
self._convert = self._lin2ulaw
return
except ImportError:
pass
raise Error, 'cannot write compressed AIFF-C files'
if self._comptype in ('ULAW', 'ulaw'):
scheme = cl.G711_ULAW
elif self._comptype in ('ALAW', 'alaw'):
scheme = cl.G711_ALAW
else:
raise Error, 'unsupported compression type'
self._comp = cl.OpenCompressor(scheme)
params = [cl.ORIGINAL_FORMAT,
0,
cl.BITS_PER_COMPONENT,
self._sampwidth * 8,
cl.FRAME_RATE,
self._framerate,
cl.FRAME_BUFFER_SIZE,
100,
cl.COMPRESSED_BUFFER_SIZE,
100]
if self._nchannels == 1:
params[1] = cl.MONO
elif self._nchannels == 2:
params[1] = cl.STEREO_INTERLEAVED
else:
raise Error, 'cannot compress more than 2 channels'
self._comp.SetParams(params)
dummy = self._comp.Compress(0, '')
self._convert = self._comp_data
def _write_header(self, initlength):
if self._aifc and self._comptype != 'NONE':
self._init_compression()
self._file.write('FORM')
if not self._nframes:
self._nframes = initlength // (self._nchannels * self._sampwidth)
self._datalength = self._nframes * self._nchannels * self._sampwidth
if self._datalength & 1:
self._datalength = self._datalength + 1
if self._aifc:
if self._comptype in ('ULAW', 'ulaw', 'ALAW', 'alaw'):
self._datalength = self._datalength // 2
if self._datalength & 1:
self._datalength = self._datalength + 1
elif self._comptype == 'G722':
self._datalength = (self._datalength + 3) // 4
if self._datalength & 1:
self._datalength = self._datalength + 1
try:
self._form_length_pos = self._file.tell()
except (AttributeError, IOError):
self._form_length_pos = None
commlength = self._write_form_length(self._datalength)
if self._aifc:
self._file.write('AIFC')
self._file.write('FVER')
_write_ulong(self._file, 4)
_write_ulong(self._file, self._version)
else:
self._file.write('AIFF')
self._file.write('COMM')
_write_ulong(self._file, commlength)
_write_short(self._file, self._nchannels)
if self._form_length_pos is not None:
self._nframes_pos = self._file.tell()
_write_ulong(self._file, self._nframes)
if self._comptype in ('ULAW', 'ulaw', 'ALAW', 'alaw', 'G722'):
_write_short(self._file, 8)
else:
_write_short(self._file, self._sampwidth * 8)
_write_float(self._file, self._framerate)
if self._aifc:
self._file.write(self._comptype)
_write_string(self._file, self._compname)
self._file.write('SSND')
if self._form_length_pos is not None:
self._ssnd_length_pos = self._file.tell()
_write_ulong(self._file, self._datalength + 8)
_write_ulong(self._file, 0)
_write_ulong(self._file, 0)
return
def _write_form_length(self, datalength):
if self._aifc:
commlength = 23 + len(self._compname)
if commlength & 1:
commlength = commlength + 1
verslength = 12
else:
commlength = 18
verslength = 0
_write_ulong(self._file, 4 + verslength + self._marklength + 8 + commlength + 16 + datalength)
return commlength
def _patchheader(self):
curpos = self._file.tell()
if self._datawritten & 1:
datalength = self._datawritten + 1
self._file.write(chr(0))
else:
datalength = self._datawritten
if datalength == self._datalength and self._nframes == self._nframeswritten and self._marklength == 0:
self._file.seek(curpos, 0)
return
self._file.seek(self._form_length_pos, 0)
dummy = self._write_form_length(datalength)
self._file.seek(self._nframes_pos, 0)
_write_ulong(self._file, self._nframeswritten)
self._file.seek(self._ssnd_length_pos, 0)
_write_ulong(self._file, datalength + 8)
self._file.seek(curpos, 0)
self._nframes = self._nframeswritten
self._datalength = datalength
def _writemarkers(self):
if len(self._markers) == 0:
return
self._file.write('MARK')
length = 2
for marker in self._markers:
id, pos, name = marker
length = length + len(name) + 1 + 6
if len(name) & 1 == 0:
length = length + 1
_write_ulong(self._file, length)
self._marklength = length + 8
_write_short(self._file, len(self._markers))
for marker in self._markers:
id, pos, name = marker
_write_short(self._file, id)
_write_ulong(self._file, pos)
_write_string(self._file, name)
def open(f, mode = None):
if mode is None:
if hasattr(f, 'mode'):
mode = f.mode
else:
mode = 'rb'
if mode in ('r', 'rb'):
return Aifc_read(f)
elif mode in ('w', 'wb'):
return Aifc_write(f)
else:
raise Error, "mode must be 'r', 'rb', 'w', or 'wb'"
return
openfp = open
if __name__ == '__main__':
import sys
if not sys.argv[1:]:
sys.argv.append('/usr/demos/data/audio/bach.aiff')
fn = sys.argv[1]
f = open(fn, 'r')
try:
print 'Reading', fn
print 'nchannels =', f.getnchannels()
print 'nframes =', f.getnframes()
print 'sampwidth =', f.getsampwidth()
print 'framerate =', f.getframerate()
print 'comptype =', f.getcomptype()
print 'compname =', f.getcompname()
if sys.argv[2:]:
gn = sys.argv[2]
print 'Writing', gn
g = open(gn, 'w')
try:
g.setparams(f.getparams())
while 1:
data = f.readframes(1024)
if not data:
break
g.writeframes(data)
finally:
g.close()
print 'Done.'
finally:
f.close()

View File

@ -1,3 +0,0 @@
# Embedded file name: /src/external-vcs/github/rocky/uncompyle6/test/ok_2.7/antigravity.py
import webbrowser
webbrowser.open('http://xkcd.com/353/')

View File

@ -1,83 +0,0 @@
# Embedded file name: /src/external-vcs/github/rocky/uncompyle6/test/ok_2.7/anydbm.py
"""Generic interface to all dbm clones.
Instead of
import dbm
d = dbm.open(file, 'w', 0666)
use
import anydbm
d = anydbm.open(file, 'w')
The returned object is a dbhash, gdbm, dbm or dumbdbm object,
dependent on the type of database being opened (determined by whichdb
module) in the case of an existing dbm. If the dbm does not exist and
the create or new flag ('c' or 'n') was specified, the dbm type will
be determined by the availability of the modules (tested in the above
order).
It has the following interface (key and data are strings):
d[key] = data # store data at key (may override data at
# existing key)
data = d[key] # retrieve data at key (raise KeyError if no
# such key)
del d[key] # delete data stored at key (raises KeyError
# if no such key)
flag = key in d # true if the key exists
list = d.keys() # return a list of all existing keys (slow!)
Future versions may change the order in which implementations are
tested for existence, and add interfaces to other dbm-like
implementations.
"""
class error(Exception):
pass
_names = ['dbhash',
'gdbm',
'dbm',
'dumbdbm']
_errors = [error]
_defaultmod = None
for _name in _names:
try:
_mod = __import__(_name)
except ImportError:
continue
if not _defaultmod:
_defaultmod = _mod
_errors.append(_mod.error)
if not _defaultmod:
raise ImportError, 'no dbm clone found; tried %s' % _names
error = tuple(_errors)
def open(file, flag = 'r', mode = 438):
"""Open or create database at path given by *file*.
Optional argument *flag* can be 'r' (default) for read-only access, 'w'
for read-write access of an existing database, 'c' for read-write access
to a new or existing database, and 'n' for read-write access to a new
database.
Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
only if it doesn't exist; and 'n' always creates a new database.
"""
from whichdb import whichdb
result = whichdb(file)
if result is None:
if 'c' in flag or 'n' in flag:
mod = _defaultmod
else:
raise error, "need 'c' or 'n' flag to open new db"
elif result == '':
raise error, 'db type could not be determined'
else:
mod = __import__(result)
return mod.open(file, flag, mode)

View File

@ -1,231 +0,0 @@
# Embedded file name: /src/external-vcs/github/rocky/uncompyle6/test/ok_2.7/asynchat.py
r"""A class supporting chat-style (command/response) protocols.
This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc..).
The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.
for example:
Say you build an async nntp client using this class. At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting. Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
import socket
import asyncore
from collections import deque
from sys import py3kwarning
from warnings import filterwarnings, catch_warnings
class async_chat(asyncore.dispatcher):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
ac_in_buffer_size = 4096
ac_out_buffer_size = 4096
def __init__(self, sock = None, map = None):
self.ac_in_buffer = ''
self.incoming = []
self.producer_fifo = deque()
asyncore.dispatcher.__init__(self, sock, map)
def collect_incoming_data(self, data):
raise NotImplementedError('must be implemented in subclass')
def _collect_incoming_data(self, data):
self.incoming.append(data)
def _get_data(self):
d = ''.join(self.incoming)
del self.incoming[:]
return d
def found_terminator(self):
raise NotImplementedError('must be implemented in subclass')
def set_terminator(self, term):
"""Set the input delimiter. Can be a fixed string of any length, an integer, or None"""
self.terminator = term
def get_terminator(self):
return self.terminator
def handle_read(self):
try:
data = self.recv(self.ac_in_buffer_size)
except socket.error as why:
self.handle_error()
return
self.ac_in_buffer = self.ac_in_buffer + data
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
terminator = self.get_terminator()
if not terminator:
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = ''
elif isinstance(terminator, int) or isinstance(terminator, long):
n = terminator
if lb < n:
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = ''
self.terminator = self.terminator - lb
else:
self.collect_incoming_data(self.ac_in_buffer[:n])
self.ac_in_buffer = self.ac_in_buffer[n:]
self.terminator = 0
self.found_terminator()
else:
terminator_len = len(terminator)
index = self.ac_in_buffer.find(terminator)
if index != -1:
if index > 0:
self.collect_incoming_data(self.ac_in_buffer[:index])
self.ac_in_buffer = self.ac_in_buffer[index + terminator_len:]
self.found_terminator()
else:
index = find_prefix_at_end(self.ac_in_buffer, terminator)
if index:
if index != lb:
self.collect_incoming_data(self.ac_in_buffer[:-index])
self.ac_in_buffer = self.ac_in_buffer[-index:]
break
else:
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = ''
def handle_write(self):
self.initiate_send()
def handle_close(self):
self.close()
def push(self, data):
sabs = self.ac_out_buffer_size
if len(data) > sabs:
for i in xrange(0, len(data), sabs):
self.producer_fifo.append(data[i:i + sabs])
else:
self.producer_fifo.append(data)
self.initiate_send()
def push_with_producer(self, producer):
self.producer_fifo.append(producer)
self.initiate_send()
def readable(self):
"""predicate for inclusion in the readable for select()"""
return 1
def writable(self):
"""predicate for inclusion in the writable for select()"""
return self.producer_fifo or not self.connected
def close_when_done(self):
"""automatically close this channel once the outgoing queue is empty"""
self.producer_fifo.append(None)
return
def initiate_send(self):
while self.producer_fifo and self.connected:
first = self.producer_fifo[0]
if not first:
del self.producer_fifo[0]
if first is None:
self.handle_close()
return
obs = self.ac_out_buffer_size
try:
with catch_warnings():
if py3kwarning:
filterwarnings('ignore', '.*buffer', DeprecationWarning)
data = buffer(first, 0, obs)
except TypeError:
data = first.more()
if data:
self.producer_fifo.appendleft(data)
else:
del self.producer_fifo[0]
continue
try:
num_sent = self.send(data)
except socket.error:
self.handle_error()
return
if num_sent:
if num_sent < len(data) or obs < len(first):
self.producer_fifo[0] = first[num_sent:]
else:
del self.producer_fifo[0]
return
return
def discard_buffers(self):
self.ac_in_buffer = ''
del self.incoming[:]
self.producer_fifo.clear()
class simple_producer:
def __init__(self, data, buffer_size = 512):
self.data = data
self.buffer_size = buffer_size
def more(self):
if len(self.data) > self.buffer_size:
result = self.data[:self.buffer_size]
self.data = self.data[self.buffer_size:]
return result
else:
result = self.data
self.data = ''
return result
class fifo:
def __init__(self, list = None):
if not list:
self.list = deque()
else:
self.list = deque(list)
def __len__(self):
return len(self.list)
def is_empty(self):
return not self.list
def first(self):
return self.list[0]
def push(self, data):
self.list.append(data)
def pop(self):
if self.list:
return (1, self.list.popleft())
else:
return (0, None)
return None
def find_prefix_at_end(haystack, needle):
l = len(needle) - 1
while l and not haystack.endswith(needle[:l]):
l -= 1
return l

View File

@ -1,560 +0,0 @@
# Embedded file name: /src/external-vcs/github/rocky/uncompyle6/test/ok_2.7/asyncore.py
"""Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.
If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background." Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
import select
import socket
import sys
import time
import warnings
import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, errorcode
_DISCONNECTED = frozenset((ECONNRESET,
ENOTCONN,
ESHUTDOWN,
ECONNABORTED,
EPIPE,
EBADF))
try:
socket_map
except NameError:
socket_map = {}
def _strerror(err):
try:
return os.strerror(err)
except (ValueError, OverflowError, NameError):
if err in errorcode:
return errorcode[err]
return 'Unknown error %s' % err
class ExitNow(Exception):
pass
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
def read(obj):
try:
obj.handle_read_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def write(obj):
try:
obj.handle_write_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def _exception(obj):
try:
obj.handle_expt_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def readwrite(obj, flags):
try:
if flags & select.POLLIN:
obj.handle_read_event()
if flags & select.POLLOUT:
obj.handle_write_event()
if flags & select.POLLPRI:
obj.handle_expt_event()
if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
obj.handle_close()
except socket.error as e:
if e.args[0] not in _DISCONNECTED:
obj.handle_error()
else:
obj.handle_close()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def poll(timeout = 0.0, map = None):
if map is None:
map = socket_map
if map:
r = []
w = []
e = []
for fd, obj in map.items():
is_r = obj.readable()
is_w = obj.writable()
if is_r:
r.append(fd)
if is_w and not obj.accepting:
w.append(fd)
if is_r or is_w:
e.append(fd)
if [] == r == w == e:
time.sleep(timeout)
return
try:
r, w, e = select.select(r, w, e, timeout)
except select.error as err:
if err.args[0] != EINTR:
raise
else:
return
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
return
def poll2(timeout = 0.0, map = None):
if map is None:
map = socket_map
if timeout is not None:
timeout = int(timeout * 1000)
pollster = select.poll()
if map:
for fd, obj in map.items():
flags = 0
if obj.readable():
flags |= select.POLLIN | select.POLLPRI
if obj.writable() and not obj.accepting:
flags |= select.POLLOUT
if flags:
flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
pollster.register(fd, flags)
try:
r = pollster.poll(timeout)
except select.error as err:
if err.args[0] != EINTR:
raise
r = []
for fd, flags in r:
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags)
return
poll3 = poll2
def loop(timeout = 30.0, use_poll = False, map = None, count = None):
if map is None:
map = socket_map
if use_poll and hasattr(select, 'poll'):
poll_fun = poll2
else:
poll_fun = poll
if count is None:
while map:
poll_fun(timeout, map)
else:
while map and count > 0:
poll_fun(timeout, map)
count = count - 1
return
class dispatcher():
debug = False
connected = False
accepting = False
connecting = False
closing = False
addr = None
ignore_log_types = frozenset(['warning'])
def __init__(self, sock = None, map = None):
if map is None:
self._map = socket_map
else:
self._map = map
self._fileno = None
if sock:
sock.setblocking(0)
self.set_socket(sock, map)
self.connected = True
try:
self.addr = sock.getpeername()
except socket.error as err:
if err.args[0] in (ENOTCONN, EINVAL):
self.connected = False
else:
self.del_channel(map)
raise
else:
self.socket = None
return
def __repr__(self):
status = [self.__class__.__module__ + '.' + self.__class__.__name__]
if self.accepting and self.addr:
status.append('listening')
elif self.connected:
status.append('connected')
if self.addr is not None:
try:
status.append('%s:%d' % self.addr)
except TypeError:
status.append(repr(self.addr))
return '<%s at %#x>' % (' '.join(status), id(self))
__str__ = __repr__
def add_channel(self, map = None):
if map is None:
map = self._map
map[self._fileno] = self
return
def del_channel(self, map = None):
fd = self._fileno
if map is None:
map = self._map
if fd in map:
del map[fd]
self._fileno = None
return
def create_socket(self, family, type):
self.family_and_type = (family, type)
sock = socket.socket(family, type)
sock.setblocking(0)
self.set_socket(sock)
def set_socket(self, sock, map = None):
self.socket = sock
self._fileno = sock.fileno()
self.add_channel(map)
def set_reuse_addr(self):
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1)
except socket.error:
pass
def readable(self):
return True
def writable(self):
return True
def listen(self, num):
self.accepting = True
if os.name == 'nt' and num > 5:
num = 5
return self.socket.listen(num)
def bind(self, addr):
self.addr = addr
return self.socket.bind(addr)
def connect(self, address):
self.connected = False
self.connecting = True
err = self.socket.connect_ex(address)
if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) or err == EINVAL and os.name in ('nt', 'ce'):
self.addr = address
return
if err in (0, EISCONN):
self.addr = address
self.handle_connect_event()
else:
raise socket.error(err, errorcode[err])
def accept(self):
try:
conn, addr = self.socket.accept()
except TypeError:
return None
except socket.error as why:
if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
return None
raise
else:
return (conn, addr)
return None
def send(self, data):
try:
result = self.socket.send(data)
return result
except socket.error as why:
if why.args[0] == EWOULDBLOCK:
return 0
if why.args[0] in _DISCONNECTED:
self.handle_close()
return 0
raise
def recv(self, buffer_size):
try:
data = self.socket.recv(buffer_size)
if not data:
self.handle_close()
return ''
return data
except socket.error as why:
if why.args[0] in _DISCONNECTED:
self.handle_close()
return ''
raise
def close(self):
self.connected = False
self.accepting = False
self.connecting = False
self.del_channel()
try:
self.socket.close()
except socket.error as why:
if why.args[0] not in (ENOTCONN, EBADF):
raise
def __getattr__(self, attr):
try:
retattr = getattr(self.socket, attr)
except AttributeError:
raise AttributeError("%s instance has no attribute '%s'" % (self.__class__.__name__, attr))
else:
msg = '%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s instead.' % {'me': self.__class__.__name__,
'attr': attr}
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return retattr
def log(self, message):
sys.stderr.write('log: %s\n' % str(message))
def log_info(self, message, type = 'info'):
if type not in self.ignore_log_types:
print '%s: %s' % (type, message)
def handle_read_event(self):
if self.accepting:
self.handle_accept()
elif not self.connected:
if self.connecting:
self.handle_connect_event()
self.handle_read()
else:
self.handle_read()
def handle_connect_event(self):
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
raise socket.error(err, _strerror(err))
self.handle_connect()
self.connected = True
self.connecting = False
def handle_write_event(self):
if self.accepting:
return
if not self.connected:
if self.connecting:
self.handle_connect_event()
self.handle_write()
def handle_expt_event(self):
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
self.handle_close()
else:
self.handle_expt()
def handle_error(self):
nil, t, v, tbinfo = compact_traceback()
try:
self_repr = repr(self)
except:
self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
self.log_info('uncaptured python exception, closing channel %s (%s:%s %s)' % (self_repr,
t,
v,
tbinfo), 'error')
self.handle_close()
def handle_expt(self):
self.log_info('unhandled incoming priority event', 'warning')
def handle_read(self):
self.log_info('unhandled read event', 'warning')
def handle_write(self):
self.log_info('unhandled write event', 'warning')
def handle_connect(self):
self.log_info('unhandled connect event', 'warning')
def handle_accept(self):
self.log_info('unhandled accept event', 'warning')
def handle_close(self):
self.log_info('unhandled close event', 'warning')
self.close()
class dispatcher_with_send(dispatcher):
def __init__(self, sock = None, map = None):
dispatcher.__init__(self, sock, map)
self.out_buffer = ''
def initiate_send(self):
num_sent = 0
num_sent = dispatcher.send(self, self.out_buffer[:512])
self.out_buffer = self.out_buffer[num_sent:]
def handle_write(self):
self.initiate_send()
def writable(self):
return not self.connected or len(self.out_buffer)
def send(self, data):
if self.debug:
self.log_info('sending %s' % repr(data))
self.out_buffer = self.out_buffer + data
self.initiate_send()
def compact_traceback():
t, v, tb = sys.exc_info()
tbinfo = []
if not tb:
raise AssertionError('traceback does not exist')
while tb:
tbinfo.append((tb.tb_frame.f_code.co_filename, tb.tb_frame.f_code.co_name, str(tb.tb_lineno)))
tb = tb.tb_next
del tb
file, function, line = tbinfo[-1]
info = ' '.join([ '[%s|%s|%s]' % x for x in tbinfo ])
return ((file, function, line),
t,
v,
info)
def close_all(map = None, ignore_all = False):
if map is None:
map = socket_map
for x in map.values():
try:
x.close()
except OSError as x:
if x.args[0] == EBADF:
pass
elif not ignore_all:
raise
except _reraised_exceptions:
raise
except:
if not ignore_all:
raise
map.clear()
return
if os.name == 'posix':
import fcntl
class file_wrapper():
def __init__(self, fd):
self.fd = os.dup(fd)
def recv(self, *args):
return os.read(self.fd, *args)
def send(self, *args):
return os.write(self.fd, *args)
def getsockopt(self, level, optname, buflen = None):
if level == socket.SOL_SOCKET and optname == socket.SO_ERROR and not buflen:
return 0
raise NotImplementedError('Only asyncore specific behaviour implemented.')
read = recv
write = send
def close(self):
os.close(self.fd)
def fileno(self):
return self.fd
class file_dispatcher(dispatcher):
def __init__(self, fd, map = None):
dispatcher.__init__(self, None, map)
self.connected = True
try:
fd = fd.fileno()
except AttributeError:
pass
self.set_file(fd)
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
return
def set_file(self, fd):
self.socket = file_wrapper(fd)
self._fileno = self.socket.fileno()
self.add_channel()

View File

@ -1,69 +0,0 @@
# Embedded file name: /src/external-vcs/github/rocky/uncompyle6/test/ok_2.7/atexit.py
"""
atexit.py - allow programmer to define multiple exit functions to be executed
upon normal program termination.
One public function, register, is defined.
"""
__all__ = ['register']
import sys
_exithandlers = []
def _run_exitfuncs():
"""run any registered exit functions
_exithandlers is traversed in reverse order so functions are executed
last in, first out.
"""
exc_info = None
while _exithandlers:
func, targs, kargs = _exithandlers.pop()
try:
func(*targs, **kargs)
except SystemExit:
exc_info = sys.exc_info()
except:
import traceback
print >> sys.stderr, 'Error in atexit._run_exitfuncs:'
traceback.print_exc()
exc_info = sys.exc_info()
if exc_info is not None:
raise exc_info[0], exc_info[1], exc_info[2]
return
def register(func, *targs, **kargs):
"""register a function to be executed upon normal program termination
func - function to be called at exit
targs - optional arguments to pass to func
kargs - optional keyword arguments to pass to func
func is returned to facilitate usage as a decorator.
"""
_exithandlers.append((func, targs, kargs))
return func
if hasattr(sys, 'exitfunc'):
register(sys.exitfunc)
sys.exitfunc = _run_exitfuncs
if __name__ == '__main__':
def x1():
print 'running x1'
def x2(n):
print 'running x2(%r)' % (n,)
def x3(n, kwd = None):
print 'running x3(%r, kwd=%r)' % (n, kwd)
register(x1)
register(x2, 12)
register(x3, 5, 'bar')
register(x3, 'no kwd args')

View File

@ -1,245 +0,0 @@
# Embedded file name: /src/external-vcs/github/rocky/uncompyle6/test/ok_2.7/audiodev.py
"""Classes for manipulating audio devices (currently only for Sun and SGI)"""
from warnings import warnpy3k
warnpy3k('the audiodev module has been removed in Python 3.0', stacklevel=2)
del warnpy3k
__all__ = ['error', 'AudioDev']
class error(Exception):
pass
class Play_Audio_sgi:
classinited = 0
frameratelist = nchannelslist = sampwidthlist = None
def initclass(self):
import AL
self.frameratelist = [(48000, AL.RATE_48000),
(44100, AL.RATE_44100),
(32000, AL.RATE_32000),
(22050, AL.RATE_22050),
(16000, AL.RATE_16000),
(11025, AL.RATE_11025),
(8000, AL.RATE_8000)]
self.nchannelslist = [(1, AL.MONO), (2, AL.STEREO), (4, AL.QUADRO)]
self.sampwidthlist = [(1, AL.SAMPLE_8), (2, AL.SAMPLE_16), (3, AL.SAMPLE_24)]
self.classinited = 1
def __init__(self):
import al, AL
if not self.classinited:
self.initclass()
self.oldparams = []
self.params = [AL.OUTPUT_RATE, 0]
self.config = al.newconfig()
self.inited_outrate = 0
self.inited_width = 0
self.inited_nchannels = 0
self.converter = None
self.port = None
return
def __del__(self):
if self.port:
self.stop()
if self.oldparams:
import al, AL
al.setparams(AL.DEFAULT_DEVICE, self.oldparams)
self.oldparams = []
def wait(self):
if not self.port:
return
import time
while self.port.getfilled() > 0:
time.sleep(0.1)
self.stop()
def stop(self):
if self.port:
self.port.closeport()
self.port = None
if self.oldparams:
import al, AL
al.setparams(AL.DEFAULT_DEVICE, self.oldparams)
self.oldparams = []
return
def setoutrate(self, rate):
for raw, cooked in self.frameratelist:
if rate == raw:
self.params[1] = cooked
self.inited_outrate = 1
break
else:
raise error, 'bad output rate'
def setsampwidth(self, width):
for raw, cooked in self.sampwidthlist:
if width == raw:
self.config.setwidth(cooked)
self.inited_width = 1
break
else:
if width == 0:
import AL
self.inited_width = 0
self.config.setwidth(AL.SAMPLE_16)
self.converter = self.ulaw2lin
else:
raise error, 'bad sample width'
def setnchannels(self, nchannels):
for raw, cooked in self.nchannelslist:
if nchannels == raw:
self.config.setchannels(cooked)
self.inited_nchannels = 1
break
else:
raise error, 'bad # of channels'
def writeframes(self, data):
if not (self.inited_outrate and self.inited_nchannels):
raise error, 'params not specified'
if not self.port:
import al, AL
self.port = al.openport('Python', 'w', self.config)
self.oldparams = self.params[:]
al.getparams(AL.DEFAULT_DEVICE, self.oldparams)
al.setparams(AL.DEFAULT_DEVICE, self.params)
if self.converter:
data = self.converter(data)
self.port.writesamps(data)
def getfilled(self):
if self.port:
return self.port.getfilled()
else:
return 0
def getfillable(self):
if self.port:
return self.port.getfillable()
else:
return self.config.getqueuesize()
def ulaw2lin(self, data):
import audioop
return audioop.ulaw2lin(data, 2)
class Play_Audio_sun:
def __init__(self):
self.outrate = 0
self.sampwidth = 0
self.nchannels = 0
self.inited_outrate = 0
self.inited_width = 0
self.inited_nchannels = 0
self.converter = None
self.port = None
return
def __del__(self):
self.stop()
def setoutrate(self, rate):
self.outrate = rate
self.inited_outrate = 1
def setsampwidth(self, width):
self.sampwidth = width
self.inited_width = 1
def setnchannels(self, nchannels):
self.nchannels = nchannels
self.inited_nchannels = 1
def writeframes(self, data):
if not (self.inited_outrate and self.inited_width and self.inited_nchannels):
raise error, 'params not specified'
if not self.port:
import sunaudiodev, SUNAUDIODEV
self.port = sunaudiodev.open('w')
info = self.port.getinfo()
info.o_sample_rate = self.outrate
info.o_channels = self.nchannels
if self.sampwidth == 0:
info.o_precision = 8
self.o_encoding = SUNAUDIODEV.ENCODING_ULAW
else:
info.o_precision = 8 * self.sampwidth
info.o_encoding = SUNAUDIODEV.ENCODING_LINEAR
self.port.setinfo(info)
if self.converter:
data = self.converter(data)
self.port.write(data)
def wait(self):
if not self.port:
return
self.port.drain()
self.stop()
def stop(self):
if self.port:
self.port.flush()
self.port.close()
self.port = None
return
def getfilled(self):
if self.port:
return self.port.obufcount()
else:
return 0
def AudioDev():
try:
import al
except ImportError:
try:
import sunaudiodev
return Play_Audio_sun()
except ImportError:
try:
import Audio_mac
except ImportError:
raise error, 'no audio device'
else:
return Audio_mac.Play_Audio_mac()
else:
return Play_Audio_sgi()
def test(fn = None):
import sys
if sys.argv[1:]:
fn = sys.argv[1]
else:
fn = 'f:just samples:just.aif'
import aifc
af = aifc.open(fn, 'r')
print fn, af.getparams()
p = AudioDev()
p.setoutrate(af.getframerate())
p.setsampwidth(af.getsampwidth())
p.setnchannels(af.getnchannels())
BUFSIZ = af.getframerate() / af.getsampwidth() / af.getnchannels()
while 1:
data = af.readframes(BUFSIZ)
if not data:
break
print len(data)
p.writeframes(data)
p.wait()
if __name__ == '__main__':
test()

View File

@ -54,12 +54,22 @@ PYOC = ('*.pyc', '*.pyo')
test_options = {
# name: (src_basedir, pattern, output_base_suffix, pythoin_version)
'test': ['test', PYC, 'test'],
'2.7': ['python2.7', PYC, 'python2.7', '2.7'],
'ok-2.6': [os.path.join(src_dir, 'ok_2.6'),
PYC, 'ok-2.6', '2.6'],
'test':
['test', PYC, 'test'],
'bytecode-2.5':
['bytecode_2.5', PYC, 'bytecode_2.5', '2.5'],
'2.7':
['python2.7', PYC, 'python2.7', '2.7'],
'ok-2.6':
[os.path.join(src_dir, 'ok_2.6'),
PYC, 'ok-2.6', '2.6'],
'ok-2.7': [os.path.join(src_dir, 'ok_2.7'),
PYC, 'ok-2.7', '2.7'],
'base-2.7': [os.path.join(src_dir, 'base-tests', 'python2.7'),
PYC, 'base_2.7', '2.7'],
}
@ -113,9 +123,9 @@ def do_tests(src_dir, obj_patterns, target_dir, opts):
pass
pass
for root, dirs, basenames in os.walk(src_dir):
for root, dirs, basenames in os.walk('.'):
# Turn root into a relative path
dirname = root[len(src_dir)+1:]
dirname = root[2:] # 2 = len('.') + 1
file_matches(files, dirname, basenames, obj_patterns)
if not files:

View File

@ -45,19 +45,17 @@ from __future__ import print_function
import sys, re
from uncompyle6.spark import GenericASTTraversal
from uncompyle6.dparser import AST
from uncompyle6.scanner import Token, Code
if (sys.version_info >= (3, 0)):
from io import StringIO
import uncompyle6
from .spark import GenericASTTraversal
from .dparser import AST
from .scanner import Token, Code
minint = -sys.maxsize-1
maxint = sys.maxsize
else:
from StringIO import StringIO
from spark import GenericASTTraversal
from dparser import AST
from scanner import Token, Code
minint = -sys.maxint-1
maxint = sys.maxint