Files
DepotDownloaderMod/Scripts/pygob/types.py
2025-01-06 18:47:36 +08:00

682 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""The predefined gob types.
These are the well-known gob types that both senders and receivers
already agree on to bootstrap the protocol.
"""
import struct
import collections
import itertools
# We do not use an Enum for this since this set isn't the full set of
# all type IDs -- the protocol allows a sender to define custom IDs in
# terms of the IDs below.
BOOL = 1
INT = 2
UINT = 3
FLOAT = 4
BYTE_SLICE = 5
STRING = 6
COMPLEX = 7
INTERFACE = 8
# gap for reserved ids.
WIRE_TYPE = 16
ARRAY_TYPE = 17
COMMON_TYPE = 18
SLICE_TYPE = 19
STRUCT_TYPE = 20
FIELD_TYPE = 21
FIELD_TYPE_SLICE = 22
MAP_TYPE = 23
# These don't have a defined typeid
# so these ids are for consistancy
GOB_ENCODER_TYPE = -1
BINARY_MARSHALER_TYPE = -2
TEXT_MARSHALER_TYPE = -3
class classproperty(object):
def __init__(self, fget):
self.fget = fget
def __get__(self, owner_self, owner_cls):
return self.fget(owner_cls)
class GoType:
"""Represents a Go type.
Go types know how to decode a gob stream to their corresponding
Python type.
"""
pass
class GoBool(GoType):
"""An Go Boolean.
Go Booleans are mapped to Python Booleans. This class is meant to
be used statically.
"""
typeid = BOOL
zero = False
@staticmethod
def decode(buf):
"""Decode a Boolean from buf. Returns the Boolean and the remainder of
the buffer:
>>> GoBool.decode(bytes([0]))
(False, b'')
>>> GoBool.decode(bytes([1]))
(True, b'')
"""
n, buf = GoUint.decode(buf)
return n == 1, buf
@staticmethod
def encode(b):
"""Encode a Python Boolean as a Go bool:
>>> list(GoBool.encode(False))
[0]
>>> list(GoBool.encode(True))
[1]
"""
return GoUint.encode(int(b))
class GoUint(GoType):
"""An unsigned Go integer.
Go unsigned integers are mapped to Python integers. This class is
meant to be used statically.
"""
typeid = UINT
zero = 0
@staticmethod
def decode(buf):
"""Decode an unsigned integer from buf. Returns the integer and the
remainder of the buffer:
>>> GoUint.decode(bytes([56]))
(56, b'')
>>> GoUint.decode(bytes([254, 1, 0]))
(256, b'')
"""
if buf[0] < 128: # small uint in a single byte
return buf[0], buf[1:]
# larger uint split over multiple bytes
length = 256 - buf[0]
n = 0
for b in buf[1:length]:
n = (n + b) << 8
n += buf[length]
return n, buf[length + 1:]
@staticmethod
def encode(n):
"""Encode a Python integer as an unsigned Go int:
>>> list(GoUint.encode(56))
[56]
>>> list(GoUint.encode(256))
[254, 1, 0]
"""
if n < 0:
raise ValueError('negative number for GoUint.encode: %s' % n)
if n < 128:
return bytes([n])
else:
encoded = []
while n:
encoded.append(n & 0xFF)
n = n >> 8
encoded.append(256 - len(encoded))
return bytes(reversed(encoded))
class GoInt(GoType):
"""A signed Go integer.
Go signed integers are mapped to Python integers. This class is
meant to be used statically.
"""
typeid = INT
zero = 0
@staticmethod
def decode(buf):
"""Decode a signed integer from buf. Returns the integer and the
remainder of the buffer:
>>> GoInt.decode(bytes([5]))
(-3, b'')
>>> GoInt.decode(bytes([6]))
(3, b'')
"""
uint, buf = GoUint.decode(buf)
if uint & 1:
uint = ~uint
return uint >> 1, buf
@staticmethod
def encode(n):
"""Encode a Python integer as a signed Go int:
>>> list(GoInt.encode(-3))
[5]
>>> list(GoInt.encode(3))
[6]
"""
if n < 0:
uint = (~n << 1) | 1
else:
uint = n << 1
return GoUint.encode(uint)
class GoFloat(GoType):
"""A Go 64-bit float.
Go floats are mapped to Python floats. This class is meant to be
used statically.
"""
typeid = FLOAT
zero = 0.0
@staticmethod
def decode(buf):
"""Decode a 64-bit floating point number from buf. Returns the float
and the remainder of the buffer:
>>> GoFloat.decode(bytes([0]))
(0.0, b'')
>>> GoFloat.decode(bytes([254, 244, 63]))
(1.25, b'')
"""
n, buf = GoUint.decode(buf)
rev = struct.pack('>Q', n)
(f, ) = struct.unpack('<d', rev)
return f, buf
@staticmethod
def encode(f):
"""Encode a Python floating point number as a Go float64:
>>> list(GoFloat.encode(0.0))
[0]
>>> list(GoFloat.encode(1.25))
[254, 244, 63]
Interestingly, Python's representation of NaN (not a number)
is different from Go's. Python uses:
>>> list(struct.pack('d', float('nan')))
[0, 0, 0, 0, 0, 0, 248, 127]
whereas Go uses `[1, 0, 0, 0, 0, 0, 248, 127]`. However, both
are valid ways of representing a IEEE 754 NaN value:
>>> GoFloat.decode(bytes([248, 1, 0, 0, 0, 0, 0, 248, 127]))
(nan, b'')
>>> GoFloat.decode(bytes([254, 248, 127]))
(nan, b'')
They only differ in the so-called "payload" of the value,
which is ignored in most applications.
"""
rev = struct.pack('<d', f)
(n, ) = struct.unpack('>Q', rev)
return GoUint.encode(n)
class GoByteSlice(GoType):
"""A Go byte slice.
Go byte slices are mapped to Python bytearrays.
This class is meant to be used statically.
"""
typeid = BYTE_SLICE
@classproperty
def zero(cls):
return bytearray()
@staticmethod
def decode(buf):
"""Decode a byte slice from buf. Returns the slice and the remainder
of the buffer:
>>> GoByteSlice.decode(bytes([5, 104, 101, 108, 108, 111]))
(bytearray(b'hello'), b'')
"""
count, buf = GoUint.decode(buf)
return bytearray(buf[:count]), buf[count:]
@staticmethod
def encode(buf):
"""Encode a Python bytes value as a Go byte slice:
>>> list(GoByteSlice.encode(b'hello'))
[5, 104, 101, 108, 108, 111]
"""
return GoUint.encode(len(buf)) + buf
class GoString(GoType):
"""A Go string.
Go strings are mapped to Python bytes since Go strings do not
guarantee any particular encoding. This class is meant to be used
statically.
"""
typeid = STRING
zero = b''
@staticmethod
def decode(buf):
"""Decode a string from buf. Since Go strings do not guarantee any
particular encoding, the data is returned as bytes:
>>> GoString.decode(bytes([5, 104, 101, 108, 108, 111]))
(b'hello', b'')
"""
count, buf = GoUint.decode(buf)
# TODO: Go strings do not guarantee any particular encoding.
# Add support for trying to decode the bytes using, say,
# UTF-8, so we can return a real Python string.
return buf[:count], buf[count:]
@staticmethod
def encode(s):
"""Encode a Python string as a Go string. The string will be UTF-8
encoded before being turned into bytes since most Go programs
will expect that encoding:
>>> GoString.encode('alpha: α')
b'\\talpha: \\xce\\xb1'
"""
return GoByteSlice.encode(s.encode('utf-8'))
class GoComplex(GoType):
"""A Go complex number.
Go complex numbers are mapped to Python complex numbers. This
class is meant to be used statically.
"""
typeid = COMPLEX
zero = 0 + 0j
@staticmethod
def decode(buf):
"""Decode a complex number from `buf`. Returns the number and the
remainder of the buffer:
>>> GoComplex.decode(bytes([0, 254, 244, 63]))
(1.25j, b'')
"""
re, buf = GoFloat.decode(buf)
im, buf = GoFloat.decode(buf)
return complex(re, im), buf
@staticmethod
def encode(z):
"""Encode a complex number:
>>> list(GoComplex.encode(1.25j))
[0, 254, 244, 63]
"""
return GoFloat.encode(z.real) + GoFloat.encode(z.imag)
class GoStruct(GoType):
"""A Go struct.
Go structs are mapped to Python named tuples.
"""
@property
def zero(self):
values = []
for _, t in self._fields:
type_ = self._loader.types[t]
if type_ == self:
# avoid infinite recursion with recursive types
values.append(None)
else:
values.append(type_.zero)
return self._class._make(values)
def __init__(self, typeid, name, loader, fields):
"""A Go struct with a certain set of fields.
The zero value of a GoStruct is based on the zero values of
each field:
>>> from pygob import Loader
>>> person = GoStruct(142, 'Person', Loader(), [
... ('Name', STRING),
... ('Age', INT),
... ])
>>> person.zero
Person(Name=b'', Age=0)
"""
self.typeid = typeid
self._name = name
if name == '':
name = type(self).__name__
# also needed for recursive types
if hasattr(loader, "types") and typeid not in loader.types:
loader.types[typeid] = self
self._loader = loader
self._fields = fields
if name.__contains__(' '):
name = type(self).__name__
self._class = collections.namedtuple(name, [n for (n, t) in fields], rename=True)
def decode(self, buf):
"""Decode data from buf and return a namedtuple."""
values = {}
field_id = -1
while True:
delta, buf = GoUint.decode(buf)
if delta == 0:
break
field_id += delta
name, typeid = self._fields[field_id]
value, buf = self._loader.types[typeid].decode(buf)
values[name] = value
return self.zero._replace(**values), buf
def encode(self, values):
buf = bytearray()
delta = 1
for field, value in itertools.zip_longest(self._fields, values, fillvalue=None):
if value == None:
delta += 1
continue
buf.extend(GoUint.encode(delta))
field_type = self._loader.types[field[1]]
buf.extend(field_type.encode(value))
delta = 1
buf.extend(GoUint.encode(0))
return buf
def __repr__(self):
"""GoStruct representation.
>>> GoStruct(142, 'Person', None, [('Name', STRING), ('Age', INT)])
<GoStruct Person Name=6, Age=2>
"""
fields = ['%s=%s' % f for f in self._fields]
return '<GoStruct %s %s>' % (self._name, ', '.join(fields))
class GoWireType(GoStruct):
"""A Go wire type.
This type is used in the gob stream to describe custom types.
Decoding a WIRE_TYPE value yields another GoType subclass which
can be used later to decode actual values of the custom type.
"""
def decode(self, buf):
"""Decode data from buf and return a GoType."""
wire_type, buf = super().decode(buf)
if wire_type.ArrayT != self._loader.types[ARRAY_TYPE].zero:
typeid = wire_type.ArrayT.CommonType.Id
elem = wire_type.ArrayT.Elem
length = wire_type.ArrayT.Len
return GoArray(typeid, self._loader, elem, length), buf
if wire_type.SliceT != self._loader.types[SLICE_TYPE].zero:
typeid = wire_type.SliceT.CommonType.Id
elem = wire_type.SliceT.Elem
return GoSlice(typeid, self._loader, elem), buf
if wire_type.StructT != self._loader.types[STRUCT_TYPE].zero:
typeid = wire_type.StructT.CommonType.Id
# Named tuples must be constructed using strings, not
# bytes, so we need to decode the names here. Go source
# files are defined to be UTF-8 encoded.
name = wire_type.StructT.CommonType.Name.decode('utf-8')
fields = [(f.Name.decode('utf-8'), f.Id)
for f in wire_type.StructT.Field]
return GoStruct(typeid, name, self._loader, fields), buf
if wire_type.MapT != self._loader.types[MAP_TYPE].zero:
typeid = wire_type.MapT.CommonType.Id
key_typeid = wire_type.MapT.Key
elem_typeid = wire_type.MapT.Elem
return GoMap(typeid, self._loader, key_typeid, elem_typeid), buf
if wire_type.GobEncoderT != self._loader.types[GOB_ENCODER_TYPE].zero:
typeid = wire_type.GobEncoderT.CommonType.Id
name = wire_type.GobEncoderT.CommonType.Name.decode('utf-8')
return GoGobEncoder(typeid, self._loader), buf
if wire_type.BinaryMarshalerT != self._loader.types[BINARY_MARSHALER_TYPE].zero:
typeid = wire_type.BinaryMarshalerT.GobEncoderT.CommonType.Id
name = wire_type.BinaryMarshalerT.GobEncoderT.CommonType.Name.decode('utf-8')
return GoBinaryMarshaler(typeid, self._loader), buf
if wire_type.TextMarshalerT != self._loader.types[TEXT_MARSHALER_TYPE].zero:
typeid = wire_type.TextMarshalerT.GobEncoderT.CommonType.Id
name = wire_type.TextMarshalerT.GobEncoderT.CommonType.Name.decode('utf-8')
return GoTextMarshaler(typeid, self._loader), buf
raise NotImplementedError("cannot handle %s" % wire_type)
class GoArray(GoType):
"""A Go array.
Go arrays are mapped to Python tuples.
"""
@property
def zero(self):
return (self._loader.types[self._elem].zero, ) * self._length
def __init__(self, typeid, loader, elem, length):
"""A Go array of a certain type and length.
>>> from pygob import Loader
>>> int3 = GoArray(142, Loader(), INT, 3)
>>> int3.zero
(0, 0, 0)
"""
self.typeid = typeid
self._loader = loader
self._elem = elem
self._length = length
def decode(self, buf):
"""Decode data from buf and return a tuple.
Go arrays have a fixed size and cannot be resized. This makes
them more like Python tuples than Python lists.
"""
count, buf = GoUint.decode(buf)
assert count == self._length, \
"expected %d elements, found %d" % (self._length, count)
result = []
for i in range(count):
value, buf = self._loader.decode_value(self._elem, buf)
result.append(value)
return tuple(result), buf
def encode(self, values):
buf = bytearray()
buf.extend(GoUint.encode(len(value)))
for value in values:
go_type = self._loader.python_types.get(type(value))
buf.extend(go_type.encode(value))
return buf
class GoSlice(GoType):
"""A Go slice.
Go slices are mapped to Python lists.
"""
@property
def zero(cls):
return []
def __init__(self, typeid, loader, elem):
"""A Go slice of a certain type.
>>> from pygob import Loader
>>> int_slice = GoSlice(142, Loader(), INT)
>>> int_slice.zero
[]
"""
self.typeid = typeid
self._loader = loader
self._elem = elem
def decode(self, buf):
"""Decode data from buf and return a list.
Go slices can extended later (with a possible reallocation of
the underlying array) and are thus similar to Python lists.
"""
count, buf = GoUint.decode(buf)
result = []
for i in range(count):
value, buf = self._loader.decode_value(self._elem, buf)
result.append(value)
return result, buf
def encode(self, value):
buf = bytearray()
buf.extend(GoUint.encode(len(value)))
for i in value:
go_type = self._loader.python_types.get(type(value))
buf.extend(go_type.encode(value))
return buf
class GoMap(GoType):
"""A Go map.
Go maps are mapped to Python dictionaries.
"""
@property
def zero(cls):
return {}
def __init__(self, typeid, loader, key_typeid, elem_typeid):
"""A Go map with a certain key and element type.
>>> from pygob import Loader
>>> int_string_map = GoMap(142, Loader(), INT, STRING)
>>> int_string_map.zero
{}
"""
self.typeid = typeid
self._loader = loader
self._key_typeid = key_typeid
self._elem_typeid = elem_typeid
def decode(self, buf):
"""Decode data from buf and return a dict."""
count, buf = GoUint.decode(buf)
result = {}
for i in range(count):
key, buf = self._loader.decode_value(self._key_typeid, buf)
value, buf = self._loader.decode_value(self._elem_typeid, buf)
result[key] = value
return result, buf
def encode(self, value):
buf = bytearray()
buf.extend(GoUint.encode(len(value)))
for i in value:
key_type = self._loader.python_types.get(type(i[0]))
value_type = self._loader.python_types.get(type(i[1]))
buf.extend(key_type.encode(i[0]))
buf.extend(value_type.encode([1]))
return buf
class GoGobEncoder(GoType):
"""A Go Gob Encoder.
"""
@property
def zero(self):
return b''
def __init__(self, typeid, loader):
self.typeid = typeid
self._loader = loader
def decode(self, buf):
count, buf = GoUint.decode(buf)
return buf[:count], buf[count:]
def encode(self, value):
buf = bytearray()
buf.extend(GoUint.encode(len(value)))
buf.extend(value)
return buf
class GoBinaryMarshaler(GoType):
"""A Go Binary Marshaler
"""
@property
def zero(self):
return b''
def __init__(self, typeid, loader):
self.typeid = typeid
self._loader = loader
def decode(self, buf):
count, buf = GoUint.decode(buf)
return buf[:count], buf[count:]
def encode(self, value):
buf = bytearray()
buf.extend(GoUint.encode(len(value)))
buf.extend(value)
return buf
class GoTextMarshaler(GoType):
"""A Go Text Marshaler
"""
@property
def zero(self):
return b''
def __init__(self, typeid, loader):
self.typeid = typeid
self._loader = loader
def decode(self, buf):
count, buf = GoUint.decode(buf)
return buf[:count], buf[count:]
def encode(self, value):
buf = bytearray()
buf.extend(GoUint.encode(len(value)))
buf.extend(value)
return buf