bug 1421816 - (1/2) add cbor2 library r=ted

This library will be used to create test COSE signatures for the new COSE add-on
signature verification implementation.

David Keeler 2017-11-29 10:57:37 -08:00
David Keeler 2017-11-29 10:57:37 -08:00
25 changed files with 2065 additions and 0 deletions

@ -54,6 +54,7 @@ mozilla.pth:third_party/python/mock-1.0.0

@ -0,0 +1,13 @@

@ -0,0 +1,28 @@
@ -0,0 +1,19 @@
This is the MIT license: http://www.opensource.org/licenses/mit-license.php
Copyright (c) Alex Grönholm
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.

@ -0,0 +1,45 @@
Metadata-Version: 1.1
Name: cbor2
Version: 4.0.1
Summary: Pure Python CBOR (de)serializer with extensive tag support
Home-page: https://github.com/agronholm/cbor2
Author: Alex Grönholm
Author-email: alex.gronholm@nextday.fi
License: MIT
Description: .. image:: https://travis-ci.org/agronholm/cbor2.svg?branch=master
:target: https://travis-ci.org/agronholm/cbor2
:alt: Build Status
.. image:: https://coveralls.io/repos/github/agronholm/cbor2/badge.svg?branch=master
:target: https://coveralls.io/github/agronholm/cbor2?branch=master
:alt: Code Coverage
This library provides encoding and decoding for the Concise Binary Object Representation (CBOR)
(`RFC 7049`_) serialization format.
There exists another Python CBOR implementation (cbor) which is faster on CPython due to its C
extensions. On PyPy, cbor2 and cbor are almost identical in performance. The other implementation
also lacks documentation and a comprehensive test suite, does not support most standard extension
tags and is known to crash (segfault) when passed a cyclic structure (say, a list containing itself).
.. _RFC 7049: https://tools.ietf.org/html/rfc7049
Project links
* `Documentation <http://cbor2.readthedocs.org/>`_
* `Source code <https://github.com/agronholm/cbor2>`_
* `Issue tracker <https://github.com/agronholm/cbor2/issues>`_
Keywords: serialization cbor
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6

@ -0,0 +1,24 @@
.. image:: https://travis-ci.org/agronholm/cbor2.svg?branch=master
:target: https://travis-ci.org/agronholm/cbor2
:alt: Build Status
.. image:: https://coveralls.io/repos/github/agronholm/cbor2/badge.svg?branch=master
:target: https://coveralls.io/github/agronholm/cbor2?branch=master
:alt: Code Coverage
This library provides encoding and decoding for the Concise Binary Object Representation (CBOR)
(`RFC 7049`_) serialization format.
There exists another Python CBOR implementation (cbor) which is faster on CPython due to its C
extensions. On PyPy, cbor2 and cbor are almost identical in performance. The other implementation
also lacks documentation and a comprehensive test suite, does not support most standard extension
tags and is known to crash (segfault) when passed a cyclic structure (say, a list containing itself).
.. _RFC 7049: https://tools.ietf.org/html/rfc7049
Project links
* `Documentation <http://cbor2.readthedocs.org/>`_
* `Source code <https://github.com/agronholm/cbor2>`_
* `Issue tracker <https://github.com/agronholm/cbor2/issues>`_

@ -0,0 +1,3 @@
from cbor2.decoder import load, loads, CBORDecoder, CBORDecodeError # noqa
from cbor2.encoder import dump, dumps, CBOREncoder, CBOREncodeError, shareable_encoder # noqa
from cbor2.types import CBORTag, CBORSimpleValue, undefined # noqa

@ -0,0 +1,49 @@
import sys
# Compatibility shim: exposes one set of names (timezone, as_unicode, iteritems,
# bytes_from_list, byte_as_integer, xrange, long, unicode) with the same meaning
# on Python 2 and Python 3, so the encoder/decoder never branch on the version.
if sys.version_info.major < 3:
    from datetime import tzinfo, timedelta

    class timezone(tzinfo):
        """Minimal fixed-offset stand-in for Python 3's ``datetime.timezone``."""

        def __init__(self, offset):
            # offset: a ``timedelta`` giving the distance from UTC
            self.offset = offset

        def utcoffset(self, dt):
            return self.offset

        def dst(self, dt):
            # Fixed-offset zones never observe daylight saving time
            return timedelta(0)

        def tzname(self, dt):
            # Derive the name from the stored offset instead of always
            # reporting UTC+00:00; a zero offset still yields 'UTC+00:00'.
            seconds = self.offset.days * 86400 + self.offset.seconds
            sign = '-' if seconds < 0 else '+'
            hours, minutes = divmod(abs(seconds) // 60, 60)
            return 'UTC%s%02d:%02d' % (sign, hours, minutes)

    def as_unicode(string):
        """Decode a UTF-8 byte string into a unicode string."""
        return string.decode('utf-8')

    def iteritems(self):
        """Iterate over the (key, value) pairs of a mapping without copying."""
        return self.iteritems()

    def bytes_from_list(values):
        """Build a byte string from a list of integers in range 0-255."""
        return bytes(bytearray(values))

    byte_as_integer = ord
    timezone.utc = timezone(timedelta(0))
    xrange = xrange  # noqa
    long = long  # noqa
    unicode = unicode  # noqa
else:
    from datetime import timezone

    def byte_as_integer(bytestr):
        """Return the integer value of the first byte of ``bytestr``."""
        return bytestr[0]

    def as_unicode(string):
        """Return ``string`` unchanged (``str`` is already unicode on Python 3)."""
        return string

    def iteritems(self):
        """Iterate over the (key, value) pairs of a mapping."""
        return self.items()

    xrange = range  # noqa
    long = int  # noqa
    unicode = str  # noqa
    bytes_from_list = bytes

@ -0,0 +1,411 @@
import re
import struct
from datetime import datetime, timedelta
from io import BytesIO
from cbor2.compat import timezone, xrange, byte_as_integer
from cbor2.types import CBORTag, undefined, break_marker, CBORSimpleValue
timestamp_re = re.compile(r'^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)'
class CBORDecodeError(Exception):
"""Raised when an error occurs deserializing a CBOR datastream."""
def decode_uint(decoder, subtype, shareable_index=None, allow_infinite=False):
    """
    Decode an unsigned integer argument from the low five bits of an initial byte.

    :param decoder: object whose ``read(n)`` method returns the next ``n`` bytes
    :param int subtype: the low five bits of the initial byte
    :param shareable_index: unused here; accepted so that all major decoders share a signature
    :param bool allow_infinite: if ``True``, subtype 31 is accepted and reported as ``None``
        (callers use this to signal an indefinite length)
    :return: the decoded integer, or ``None`` for subtype 31 when ``allow_infinite`` is set
    :raises CBORDecodeError: if the subtype is not a recognized integer encoding

    """
    # Major tag 0
    if subtype < 24:
        # The value is stored directly in the subtype bits
        return subtype
    elif subtype == 24:
        return struct.unpack('>B', decoder.read(1))[0]
    elif subtype == 25:
        return struct.unpack('>H', decoder.read(2))[0]
    elif subtype == 26:
        return struct.unpack('>L', decoder.read(4))[0]
    elif subtype == 27:
        return struct.unpack('>Q', decoder.read(8))[0]
    elif subtype == 31 and allow_infinite:
        return None
    else:
        raise CBORDecodeError('unknown unsigned integer subtype 0x%x' % subtype)
def decode_negint(decoder, subtype, shareable_index=None):
# Major tag 1
uint = decode_uint(decoder, subtype)
return -uint - 1
def decode_bytestring(decoder, subtype, shareable_index=None):
# Major tag 2
length = decode_uint(decoder, subtype, allow_infinite=True)
if length is None:
# Indefinite length
buf = bytearray()
while True:
initial_byte = byte_as_integer(decoder.read(1))
if initial_byte == 255:
return buf
length = decode_uint(decoder, initial_byte & 31)
value = decoder.read(length)
return decoder.read(length)
def decode_string(decoder, subtype, shareable_index=None):
# Major tag 3
return decode_bytestring(decoder, subtype).decode('utf-8')
def decode_array(decoder, subtype, shareable_index=None):
# Major tag 4
items = []
decoder.set_shareable(shareable_index, items)
length = decode_uint(decoder, subtype, allow_infinite=True)
if length is None:
# Indefinite length
while True:
value = decoder.decode()
if value is break_marker:
for _ in xrange(length):
item = decoder.decode()
return items
def decode_map(decoder, subtype, shareable_index=None):
# Major tag 5
dictionary = {}
decoder.set_shareable(shareable_index, dictionary)
length = decode_uint(decoder, subtype, allow_infinite=True)
if length is None:
# Indefinite length
while True:
key = decoder.decode()
if key is break_marker:
value = decoder.decode()
dictionary[key] = value
for _ in xrange(length):
key = decoder.decode()
value = decoder.decode()
dictionary[key] = value
if decoder.object_hook:
return decoder.object_hook(decoder, dictionary)
return dictionary
def decode_semantic(decoder, subtype, shareable_index=None):
# Major tag 6
tagnum = decode_uint(decoder, subtype)
# Special handling for the "shareable" tag
if tagnum == 28:
shareable_index = decoder._allocate_shareable()
return decoder.decode(shareable_index)
value = decoder.decode()
semantic_decoder = semantic_decoders.get(tagnum)
if semantic_decoder:
return semantic_decoder(decoder, value, shareable_index)
tag = CBORTag(tagnum, value)
if decoder.tag_hook:
return decoder.tag_hook(decoder, tag, shareable_index)
return tag
def decode_special(decoder, subtype, shareable_index=None):
# Simple value
if subtype < 20:
return CBORSimpleValue(subtype)
# Major tag 7
return special_decoders[subtype](decoder)
# Semantic decoders (major tag 6)
def decode_datetime_string(decoder, value, shareable_index=None):
# Semantic tag 0
match = timestamp_re.match(value)
if match:
year, month, day, hour, minute, second, micro, offset_h, offset_m = match.groups()
if offset_h:
tz = timezone(timedelta(hours=int(offset_h), minutes=int(offset_m)))
tz = timezone.utc
return datetime(int(year), int(month), int(day), int(hour), int(minute), int(second),
int(micro or 0), tz)
raise CBORDecodeError('invalid datetime string: {}'.format(value))
def decode_epoch_datetime(decoder, value, shareable_index=None):
# Semantic tag 1
return datetime.fromtimestamp(value, timezone.utc)
def decode_positive_bignum(decoder, value, shareable_index=None):
    """
    Convert a big-endian byte string into a non-negative integer (semantic tag 2).

    :param decoder: unused; accepted so that all semantic decoders share a signature
    :param value: the byte string payload of the tag
    :param shareable_index: unused
    :return: the integer represented by ``value``; an empty byte string yields 0

    """
    # Semantic tag 2
    from binascii import hexlify

    # hexlify(b'') is b'' and int(b'', 16) raises ValueError, so handle the
    # zero-length payload explicitly.
    if value == b'':
        return 0

    return int(hexlify(value), 16)
def decode_negative_bignum(decoder, value, shareable_index=None):
# Semantic tag 3
return -decode_positive_bignum(decoder, value) - 1
def decode_fraction(decoder, value, shareable_index=None):
# Semantic tag 4
from decimal import Decimal
exp = Decimal(value[0])
mantissa = Decimal(value[1])
return mantissa * (10 ** exp)
def decode_bigfloat(decoder, value, shareable_index=None):
# Semantic tag 5
from decimal import Decimal
exp = Decimal(value[0])
mantissa = Decimal(value[1])
return mantissa * (2 ** exp)
def decode_sharedref(decoder, value, shareable_index=None):
    """
    Resolve a reference to a previously marked shared value (semantic tag 29).

    :param decoder: object holding the ``_shareables`` list
    :param value: index into ``decoder._shareables``
    :param shareable_index: unused
    :return: the shared object stored at that index
    :raises CBORDecodeError: if the index does not exist or its slot is still ``None``

    """
    # Semantic tag 29
    try:
        # A negative index would silently wrap around to the end of the list
        # and return an unrelated object, so treat it as "not found".
        if value < 0:
            raise IndexError(value)

        shared = decoder._shareables[value]
    except IndexError:
        raise CBORDecodeError('shared reference %d not found' % value)

    if shared is None:
        raise CBORDecodeError('shared value %d has not been initialized' % value)
    else:
        return shared
def decode_rational(decoder, value, shareable_index=None):
# Semantic tag 30
from fractions import Fraction
return Fraction(*value)
def decode_regexp(decoder, value, shareable_index=None):
# Semantic tag 35
return re.compile(value)
def decode_mime(decoder, value, shareable_index=None):
# Semantic tag 36
from email.parser import Parser
return Parser().parsestr(value)
def decode_uuid(decoder, value, shareable_index=None):
# Semantic tag 37
from uuid import UUID
return UUID(bytes=value)
# Special decoders (major tag 7)
def decode_simple_value(decoder, shareable_index=None):
return CBORSimpleValue(struct.unpack('>B', decoder.read(1))[0])
def decode_float16(decoder, shareable_index=None):
    """
    Read two bytes from the decoder and interpret them as a half-precision float.

    :param decoder: object whose ``read(n)`` method returns the next ``n`` bytes
    :param shareable_index: unused
    :return: the decoded value as a Python float

    """
    # Code adapted from RFC 7049, appendix D
    from math import ldexp

    def decode_single(single):
        # Reinterpret a 32-bit unsigned integer bit pattern as a single-precision float
        return struct.unpack("!f", struct.pack("!I", single))[0]

    payload = struct.unpack('>H', decoder.read(2))[0]
    # Move exponent+mantissa and the sign bit into their single-precision positions
    value = (payload & 0x7fff) << 13 | (payload & 0x8000) << 16
    if payload & 0x7c00 != 0x7c00:
        # Exponent bits not all set: rescale by 2**112
        return ldexp(decode_single(value), 112)

    # Exponent bits all set: force the single-precision exponent to all ones as well
    return decode_single(value | 0x7f800000)
def decode_float32(decoder, shareable_index=None):
return struct.unpack('>f', decoder.read(4))[0]
def decode_float64(decoder, shareable_index=None):
return struct.unpack('>d', decoder.read(8))[0]
major_decoders = {
0: decode_uint,
1: decode_negint,
2: decode_bytestring,
3: decode_string,
4: decode_array,
5: decode_map,
6: decode_semantic,
7: decode_special
special_decoders = {
20: lambda self: False,
21: lambda self: True,
22: lambda self: None,
23: lambda self: undefined,
24: decode_simple_value,
25: decode_float16,
26: decode_float32,
27: decode_float64,
31: lambda self: break_marker
semantic_decoders = {
0: decode_datetime_string,
1: decode_epoch_datetime,
2: decode_positive_bignum,
3: decode_negative_bignum,
4: decode_fraction,
5: decode_bigfloat,
29: decode_sharedref,
30: decode_rational,
35: decode_regexp,
36: decode_mime,
37: decode_uuid
class CBORDecoder(object):
Deserializes a CBOR encoded byte stream.
:param tag_hook: Callable that takes 3 arguments: the decoder instance, the
:class:`~cbor2.types.CBORTag` and the shareable index for the resulting object, if any.
This callback is called for any tags for which there is no built-in decoder.
The return value is substituted for the CBORTag object in the deserialized output.
:param object_hook: Callable that takes 2 arguments: the decoder instance and the dictionary.
This callback is called for each deserialized :class:`dict` object.
The return value is substituted for the dict in the deserialized output.
__slots__ = ('fp', 'tag_hook', 'object_hook', '_shareables')
def __init__(self, fp, tag_hook=None, object_hook=None):
self.fp = fp
self.tag_hook = tag_hook
self.object_hook = object_hook
self._shareables = []
def _allocate_shareable(self):
return len(self._shareables) - 1
def set_shareable(self, index, value):
Set the shareable value for the last encountered shared value marker, if any.
If the given index is ``None``, nothing is done.
:param index: the value of the ``shared_index`` argument to the decoder
:param value: the shared value
if index is not None:
self._shareables[index] = value
def read(self, amount):
Read bytes from the data stream.
:param int amount: the number of bytes to read
data = self.fp.read(amount)
if len(data) < amount:
raise CBORDecodeError('premature end of stream (expected to read {} bytes, got {} '
'instead)'.format(amount, len(data)))
return data
def decode(self, shareable_index=None):
Decode the next value from the stream.
:raises CBORDecodeError: if there is any problem decoding the stream
initial_byte = byte_as_integer(self.fp.read(1))
major_type = initial_byte >> 5
subtype = initial_byte & 31
except Exception as e:
raise CBORDecodeError('error reading major type at index {}: {}'
.format(self.fp.tell(), e))
decoder = major_decoders[major_type]
return decoder(self, subtype, shareable_index)
except CBORDecodeError:
except Exception as e:
raise CBORDecodeError('error decoding value at index {}: {}'.format(self.fp.tell(), e))
def decode_from_bytes(self, buf):
    """
    Wrap the given bytestring as a file and call :meth:`decode` with it as the argument.

    This method was intended to be used from the ``tag_hook`` hook when an object needs to be
    decoded separately from the rest but while still taking advantage of the shared value
    registry.

    :param bytes buf: the bytestring to decode
    :return: the value returned by :meth:`decode`

    """
    old_fp = self.fp
    self.fp = BytesIO(buf)
    try:
        return self.decode()
    finally:
        # Always put the original stream back, even if decoding fails, so the
        # decoder is not left reading from the temporary buffer.
        self.fp = old_fp
def loads(payload, **kwargs):
Deserialize an object from a bytestring.
:param bytes payload: the bytestring to deserialize
:param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
:return: the deserialized object
fp = BytesIO(payload)
return CBORDecoder(fp, **kwargs).decode()
def load(fp, **kwargs):
Deserialize an object from an open file.
:param fp: the input file (any file-like object)
:param kwargs: keyword arguments passed to :class:`~.CBORDecoder`
:return: the deserialized object
return CBORDecoder(fp, **kwargs).decode()

@ -0,0 +1,362 @@
import re
import struct
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from functools import wraps
from datetime import datetime, date, time
from io import BytesIO
from cbor2.compat import iteritems, timezone, long, unicode, as_unicode, bytes_from_list
from cbor2.types import CBORTag, undefined, CBORSimpleValue
class CBOREncodeError(Exception):
"""Raised when an error occurs while serializing an object into a CBOR datastream."""
def shareable_encoder(func):
Wrap the given encoder function to gracefully handle cyclic data structures.
If value sharing is enabled, this marks the given value shared in the datastream on the
first call. If the value has already been passed to this method, a reference marker is
instead written to the data stream and the wrapped function is not called.
If value sharing is disabled, only infinite recursion protection is done.
def wrapper(encoder, value, *args, **kwargs):
value_id = id(value)
container, container_index = encoder._shared_containers.get(value_id, (None, None))
if encoder.value_sharing:
if container is value:
# Generate a reference to the previous index instead of encoding this again
encoder.write(encode_length(0xd8, 0x1d))
encode_int(encoder, container_index)
# Mark the container as shareable
encoder._shared_containers[value_id] = (value, len(encoder._shared_containers))
encoder.write(encode_length(0xd8, 0x1c))
func(encoder, value, *args, **kwargs)
if container is value:
raise CBOREncodeError('cyclic data structure detected but value sharing is '
encoder._shared_containers[value_id] = (value, None)
func(encoder, value, *args, **kwargs)
del encoder._shared_containers[value_id]
return wrapper
def encode_length(major_tag, length):
    """
    Pack an initial byte plus the shortest big-endian encoding of ``length``.

    :param int major_tag: the bits to OR into the initial byte (e.g. ``0x40``, ``0x80``)
    :param int length: the non-negative integer argument to encode
    :return: the packed bytes (1, 2, 3, 5 or 9 bytes long)

    """
    if length < 24:
        # Small values are stored directly in the initial byte
        return struct.pack('>B', major_tag | length)
    elif length < 256:
        return struct.pack('>BB', major_tag | 24, length)
    elif length < 65536:
        return struct.pack('>BH', major_tag | 25, length)
    elif length < 4294967296:
        return struct.pack('>BL', major_tag | 26, length)
    else:
        return struct.pack('>BQ', major_tag | 27, length)
def encode_int(encoder, value):
# Big integers (2 ** 64 and over)
if value >= 18446744073709551616 or value < -18446744073709551616:
if value >= 0:
major_type = 0x02
major_type = 0x03
value = -value - 1
values = []
while value > 0:
value, remainder = divmod(value, 256)
values.insert(0, remainder)
payload = bytes_from_list(values)
encode_semantic(encoder, CBORTag(major_type, payload))
elif value >= 0:
encoder.write(encode_length(0, value))
encoder.write(encode_length(0x20, abs(value) - 1))
def encode_bytestring(encoder, value):
encoder.write(encode_length(0x40, len(value)) + value)
def encode_bytearray(encoder, value):
encode_bytestring(encoder, bytes(value))
def encode_string(encoder, value):
encoded = value.encode('utf-8')
encoder.write(encode_length(0x60, len(encoded)) + encoded)
def encode_array(encoder, value):
encoder.write(encode_length(0x80, len(value)))
for item in value:
def encode_map(encoder, value):
encoder.write(encode_length(0xa0, len(value)))
for key, val in iteritems(value):
def encode_semantic(encoder, value):
encoder.write(encode_length(0xc0, value.tag))
# Semantic decoders (major tag 6)
def encode_datetime(encoder, value):
    """
    Encode a datetime as semantic tag 1 (numeric timestamp) or tag 0 (ISO 8601 string).

    :param encoder: the encoder; its ``timezone`` and ``datetime_as_timestamp`` attributes
        are consulted
    :param value: the datetime to encode
    :raises CBOREncodeError: if ``value`` is naive and the encoder has no default timezone

    """
    # Semantic tag 0
    if not value.tzinfo:
        if encoder.timezone:
            value = value.replace(tzinfo=encoder.timezone)
        else:
            raise CBOREncodeError(
                'naive datetime encountered and no default timezone has been set')

    if encoder.datetime_as_timestamp:
        from calendar import timegm
        timestamp = timegm(value.utctimetuple())
        # ``microsecond // 1000000`` is always 0, which silently discarded the
        # sub-second part. Only switch to a float when there is a fraction, so
        # whole-second datetimes still encode as integers.
        if value.microsecond:
            timestamp += value.microsecond / 1000000.0
        encode_semantic(encoder, CBORTag(1, timestamp))
    else:
        datestring = as_unicode(value.isoformat().replace('+00:00', 'Z'))
        encode_semantic(encoder, CBORTag(0, datestring))
def encode_date(encoder, value):
value = datetime.combine(value, time()).replace(tzinfo=timezone.utc)
encode_datetime(encoder, value)
def encode_decimal(encoder, value):
# Semantic tag 4
if value.is_nan():
elif value.is_infinite():
encoder.write(b'\xf9\x7c\x00' if value > 0 else b'\xf9\xfc\x00')
dt = value.as_tuple()
mantissa = sum(d * 10 ** i for i, d in enumerate(reversed(dt.digits)))
with encoder.disable_value_sharing():
encode_semantic(encoder, CBORTag(4, [dt.exponent, mantissa]))
def encode_rational(encoder, value):
# Semantic tag 30
with encoder.disable_value_sharing():
encode_semantic(encoder, CBORTag(30, [value.numerator, value.denominator]))
def encode_regexp(encoder, value):
# Semantic tag 35
encode_semantic(encoder, CBORTag(35, as_unicode(value.pattern)))
def encode_mime(encoder, value):
# Semantic tag 36
encode_semantic(encoder, CBORTag(36, as_unicode(value.as_string())))
def encode_uuid(encoder, value):
# Semantic tag 37
encode_semantic(encoder, CBORTag(37, value.bytes))
# Special encoders (major tag 7)
def encode_simple_value(encoder, value):
if value.value < 20:
encoder.write(struct.pack('>B', 0xe0 | value.value))
encoder.write(struct.pack('>BB', 0xf8, value.value))
def encode_float(encoder, value):
# Handle special values efficiently
import math
if math.isnan(value):
elif math.isinf(value):
encoder.write(b'\xf9\x7c\x00' if value > 0 else b'\xf9\xfc\x00')
encoder.write(struct.pack('>Bd', 0xfb, value))
def encode_boolean(encoder, value):
encoder.write(b'\xf5' if value else b'\xf4')
def encode_none(encoder, value):
def encode_undefined(encoder, value):
default_encoders = OrderedDict([
(bytes, encode_bytestring),
(bytearray, encode_bytearray),
(unicode, encode_string),
(int, encode_int),
(long, encode_int),
(float, encode_float),
(('decimal', 'Decimal'), encode_decimal),
(bool, encode_boolean),
(type(None), encode_none),
(tuple, encode_array),
(list, encode_array),
(dict, encode_map),
(defaultdict, encode_map),
(OrderedDict, encode_map),
(type(undefined), encode_undefined),
(datetime, encode_datetime),
(date, encode_date),
(type(re.compile('')), encode_regexp),
(('fractions', 'Fraction'), encode_rational),
(('email.message', 'Message'), encode_mime),
(('uuid', 'UUID'), encode_uuid),
(CBORSimpleValue, encode_simple_value),
(CBORTag, encode_semantic)
class CBOREncoder(object):
Serializes objects to a byte stream using Concise Binary Object Representation.
:param datetime_as_timestamp: set to ``True`` to serialize datetimes as UNIX timestamps
(this makes datetimes more concise on the wire but loses the time zone information)
:param datetime.tzinfo timezone: the default timezone to use for serializing naive datetimes
:param value_sharing: if ``True``, allows more efficient serializing of repeated values and,
more importantly, cyclic data structures, at the cost of extra line overhead
:param default: a callable that is called by the encoder with two arguments
(the encoder instance and the value) when no suitable encoder has been found, and should use the
methods on the encoder to encode any objects it wants to add to the data stream
__slots__ = ('fp', 'datetime_as_timestamp', 'timezone', 'default', 'value_sharing',
'json_compatible', '_shared_containers', '_encoders')
def __init__(self, fp, datetime_as_timestamp=False, timezone=None, value_sharing=False,
self.fp = fp
self.datetime_as_timestamp = datetime_as_timestamp
self.timezone = timezone
self.value_sharing = value_sharing
self.default = default
self._shared_containers = {} # indexes used for value sharing
self._encoders = default_encoders.copy()
def _find_encoder(self, obj_type):
from sys import modules
for type_, enc in list(iteritems(self._encoders)):
if type(type_) is tuple:
modname, typename = type_
imported_type = getattr(modules.get(modname), typename, None)
if imported_type is not None:
del self._encoders[type_]
self._encoders[imported_type] = enc
type_ = imported_type
else: # pragma: nocover
if issubclass(obj_type, type_):
self._encoders[obj_type] = enc
return enc
return None
@contextmanager
def disable_value_sharing(self):
    """Disable value sharing in the encoder for the duration of the context block."""
    old_value_sharing = self.value_sharing
    self.value_sharing = False
    try:
        yield
    finally:
        # Restore the previous setting even if the block raises, so one failed
        # encode cannot leave value sharing permanently switched off.
        self.value_sharing = old_value_sharing
def write(self, data):
Write bytes to the data stream.
:param data: the bytes to write
def encode(self, obj):
Encode the given object using CBOR.
:param obj: the object to encode
obj_type = obj.__class__
encoder = self._encoders.get(obj_type) or self._find_encoder(obj_type) or self.default
if not encoder:
raise CBOREncodeError('cannot serialize type %s' % obj_type.__name__)
encoder(self, obj)
def encode_to_bytes(self, obj):
    """
    Encode the given object to a byte buffer and return its value as bytes.

    This method was intended to be used from the ``default`` hook when an object needs to be
    encoded separately from the rest but while still taking advantage of the shared value
    registry.

    :param obj: the object to encode
    :return: the bytes produced by encoding ``obj``
    :rtype: bytes

    """
    old_fp = self.fp
    self.fp = fp = BytesIO()
    try:
        # Encode into the temporary buffer using this encoder's own state
        self.encode(obj)
    finally:
        # Always put the original stream back, even if encoding fails
        self.fp = old_fp

    return fp.getvalue()
def dumps(obj, **kwargs):
Serialize an object to a bytestring.
:param obj: the object to serialize
:param kwargs: keyword arguments passed to :class:`~.CBOREncoder`
:return: the serialized output
:rtype: bytes
fp = BytesIO()
dump(obj, fp, **kwargs)
return fp.getvalue()
def dump(obj, fp, **kwargs):
Serialize an object to a file.
:param obj: the object to serialize
:param fp: a file-like object
:param kwargs: keyword arguments passed to :class:`~.CBOREncoder`
CBOREncoder(fp, **kwargs).encode(obj)

@ -0,0 +1,55 @@
class CBORTag(object):
    """
    Represents a CBOR semantic tag.

    :param int tag: tag number
    :param value: encapsulated value (any object)

    """

    __slots__ = 'tag', 'value'

    def __init__(self, tag, value):
        self.tag = tag
        self.value = value

    def __eq__(self, other):
        # Two tags are equal when both the tag number and the payload match
        if isinstance(other, CBORTag):
            return self.tag == other.tag and self.value == other.value
        return NotImplemented

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly to keep
        # the two operators consistent on every supported version.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __repr__(self):
        return 'CBORTag({self.tag}, {self.value!r})'.format(self=self)
class CBORSimpleValue(object):
    """
    Represents a CBOR "simple value".

    :param int value: the value (0-255)
    :raises TypeError: if ``value`` is outside the range 0-255

    """

    __slots__ = 'value'

    def __init__(self, value):
        if value < 0 or value > 255:
            # TypeError is kept for backward compatibility with existing callers;
            # the message now also makes sense for negative values.
            raise TypeError('simple value out of range (0..255)')

        self.value = value

    def __eq__(self, other):
        # Compare equal to another simple value or to a plain int with the same number
        if isinstance(other, CBORSimpleValue):
            return self.value == other.value
        elif isinstance(other, int):
            return self.value == other
        return NotImplemented

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __repr__(self):
        return 'CBORSimpleValue({self.value})'.format(self=self)
class UndefinedType(object):
__slots__ = ()
#: Represents the "undefined" value.
undefined = UndefinedType()
break_marker = object()

@ -0,0 +1,33 @@
# coding: utf-8
#!/usr/bin/env python
import pkg_resources
extensions = [
templates_path = ['_templates']
source_suffix = '.rst'
master_doc = 'index'
project = 'cbor2'
author = u'Alex Grönholm'
copyright = u'2016, ' + author
v = pkg_resources.get_distribution(project).parsed_version
version = v.base_version
release = v.public
language = None
exclude_patterns = ['_build']
pygments_style = 'sphinx'
highlight_language = 'python'
todo_include_todos = False
html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static']
htmlhelp_basename = project.replace('-', '') + 'doc'
intersphinx_mapping = {'python': ('http://docs.python.org/', None)}

@ -0,0 +1,132 @@
Customizing encoding and decoding
Both the encoder and decoder can be customized to support a wider range of types.
On the encoder side, this is accomplished by passing a callback as the ``default`` constructor
argument. This callback will receive the encoder instance and an object that the encoder could not
serialize on its own. The callback should then call the encoder's ``encode()`` method with a value
that the encoder can serialize on its own, although that value is allowed to contain objects that
also require the encoder to use the callback, as long as it won't result in an infinite loop.
On the decoder side, you have two options: ``tag_hook`` and ``object_hook``. The former is called
by the decoder to process any semantic tags that have no predefined decoders. The latter is called
for any newly decoded ``dict`` objects, and is mostly useful for implementing a JSON compatible
custom type serialization scheme. Unless your requirements restrict you to JSON compatible types
only, it is recommended to use ``tag_hook`` for this purpose.
JSON compatibility
In certain applications, it may be desirable to limit the supported types to the same ones
serializable as JSON: (unicode) string, integer, float, boolean, null, array and object (dict).
This can be done by passing the ``json_compatible`` option to the encoder. When incompatible types
are encountered, a :class:`~cbor2.encoder.CBOREncodeError` is then raised.
For the decoder, there is no support for detecting incoming incompatible types yet.
Using the CBOR tags for custom types
The most common way to use ``default`` is to call :meth:`~cbor2.encoder.CBOREncoder.encode`
to add a custom tag in the data stream, with the payload as the value::
class Point(object):
def __init__(self, x, y):
self.x = x
self.y = y
def default_encoder(encoder, value):
# Tag number 4000 was chosen arbitrarily
encoder.encode(CBORTag(4000, [value.x, value.y]))
The corresponding ``tag_hook`` would be::
def tag_hook(decoder, tag, shareable_index=None):
if tag.tag != 4000:
return tag
# tag.value is now the [x, y] list we serialized before
return Point(*tag.value)
Using dicts to carry custom types
The same could be done with ``object_hook``, except less efficiently::
def default_encoder(encoder, value):
encoder.encode(dict(typename='Point', x=value.x, y=value.y))
def object_hook(decoder, value):
if value.get('typename') != 'Point':
return value
return Point(value['x'], value['y'])
You should make sure that whatever way you decide to use for telling apart your "specially marked"
dicts from arbitrary data dicts won't mistake one for the other.
Value sharing with custom types
In order to properly encode and decode cyclic references with custom types, some special care has
to be taken. Suppose you have a custom type as below, where every child object contains a reference
to its parent and the parent contains a list of children::
from cbor2 import dumps, loads, shareable_encoder, CBORTag
class MyType(object):
def __init__(self, parent=None):
self.parent = parent
self.children = []
if parent:
This would not normally be serializable, as it would lead to an endless loop (in the worst case)
and raise some exception (in the best case). Now, enter CBOR's extension tags 28 and 29. These tags
make it possible to add special markers into the data stream which can be later referenced and
substituted with the object marked earlier.
To do this, in ``default`` hooks used with the encoder you will need to use the
:func:`~cbor2.encoder.shareable_encoder` decorator on your ``default`` hook function. It will
automatically add the object to the shared values registry on the encoder and prevent
it from being serialized twice (instead writing a reference to the data stream)::
def default_encoder(encoder, value):
# The state has to be serialized separately so that the decoder would have a chance to
# create an empty instance before the shared value references are decoded
serialized_state = encoder.encode_to_bytes(value.__dict__)
encoder.encode(CBORTag(3000, serialized_state))
On the decoder side, you will need to initialize an empty instance for shared value lookup before
the object's state (which may contain references to it) is decoded.
This is done with the :meth:`~cbor2.decoder.CBORDecoder.set_shareable` method::
def tag_hook(decoder, tag, shareable_index=None):
# Return all other tags as-is
if tag.tag != 3000:
return tag
# Create a raw instance before initializing its state to make it possible for cyclic
# references to work
instance = MyType.__new__(MyType)
decoder.set_shareable(shareable_index, instance)
# Separately decode the state of the new object and then apply it
state = decoder.decode_from_bytes(tag.value)
return instance
You could then verify that the cyclic references have been restored after deserialization::
parent = MyType()
child1 = MyType(parent)
child2 = MyType(parent)
serialized = dumps(parent, default=default_encoder, value_sharing=True)
new_parent = loads(serialized, tag_hook=tag_hook)
assert new_parent.children[0].parent is new_parent
assert new_parent.children[1].parent is new_parent

@ -0,0 +1,15 @@
.. include:: ../README.rst
:start-line: 7
:end-before: Project links
Table of contents
.. toctree::
:maxdepth: 2
* :ref:`API reference <modindex>`

@ -0,0 +1,5 @@
.. automodule:: cbor2.decoder

@ -0,0 +1,5 @@
.. automodule:: cbor2.encoder

@ -0,0 +1,5 @@
.. automodule:: cbor2.types

View File

@ -0,0 +1,80 @@
Basic usage
Serializing and deserializing with cbor2 is pretty straightforward::
# All four entry points are used below, so all four must be imported
from cbor2 import dump, dumps, load, loads

# Serialize an object as a bytestring
data = dumps(['hello', 'world'])

# Deserialize a bytestring
obj = loads(data)

# Efficiently deserialize from a file
with open('input.cbor', 'rb') as fp:
    obj = load(fp)

# Efficiently serialize an object to a file
with open('output.cbor', 'wb') as fp:
    dump(obj, fp)
Some data types, however, require extra considerations, as detailed below.
String/bytes handling on Python 2
The ``str`` type is encoded as binary on Python 2. If you want to encode strings as text on
Python 2, use unicode strings instead.
Date/time handling
The CBOR specification does not support naïve datetimes (that is, datetimes where ``tzinfo`` is
missing). When the encoder encounters such a datetime, it needs to know which timezone it belongs
to. To this end, you can specify a default timezone by passing a :class:`~datetime.tzinfo` instance
to :func:`~cbor2.encoder.dump`/:func:`~cbor2.encoder.dumps` call as the ``timezone`` argument.
Decoded datetimes are always timezone aware.
By default, datetimes are serialized in a manner that retains their timezone offsets. You can
optimize the data stream size by passing ``datetime_as_timestamp=True`` to
:func:`~cbor2.encoder.dump`/:func:`~cbor2.encoder.dumps`, but this causes the timezone offset
information to be lost.
Cyclic (recursive) data structures
If the encoder encounters a shareable object (i.e. list or dict) that it has seen before, it will
by default raise :exc:`~cbor2.encoder.CBOREncodeError` indicating that a cyclic reference has been
detected and value sharing was not enabled. CBOR has, however, an extension specification that
allows the encoder to reference a previously encoded value without processing it again. This makes
it possible to serialize such cyclic references, but value sharing has to be enabled by passing
``value_sharing=True`` to :func:`~cbor2.encoder.dump`/:func:`~cbor2.encoder.dumps`.
.. warning:: Support for value sharing is rare in other CBOR implementations, so think carefully
whether you want to enable it. It also causes some line overhead, as all potentially shareable
values must be tagged as such.
Tag support
In addition to all standard CBOR tags, this library supports many extended tags:
=== ======================================== ====================================================
Tag Semantics Python type(s)
=== ======================================== ====================================================
0 Standard date/time string datetime.date / datetime.datetime
1 Epoch-based date/time datetime.date / datetime.datetime
2 Positive bignum int / long
3 Negative bignum int / long
4 Decimal fraction decimal.Decimal
5 Bigfloat decimal.Decimal
28 Mark shared value N/A
29 Reference shared value N/A
30 Rational number fractions.Fraction
35 Regular expression ``_sre.SRE_Pattern`` (result of ``re.compile(...)``)
36 MIME message email.message.Message
37 Binary UUID uuid.UUID
=== ======================================== ====================================================
Arbitrary tags can be represented with the :class:`~cbor2.types.CBORTag` class.

@ -0,0 +1,73 @@
Version history
This library adheres to `Semantic Versioning <http://semver.org/>`_.
**4.0.1** (2017-08-21)
- Fixed silent truncation of decoded data if there are not enough bytes in the stream for an exact
read (``CBORDecodeError`` is now raised instead)
**4.0.0** (2017-04-24)
- **BACKWARD INCOMPATIBLE** Value sharing has been disabled by default, for better compatibility
with other implementations and better performance (since it is rarely needed)
- **BACKWARD INCOMPATIBLE** Replaced the ``semantic_decoders`` decoder option with the ``tag_hook``
- **BACKWARD INCOMPATIBLE** Replaced the ``encoders`` encoder option with the ``default`` option
- **BACKWARD INCOMPATIBLE** Factored out the file object argument (``fp``) from all callbacks
- **BACKWARD INCOMPATIBLE** The encoder no longer supports every imaginable type implementing the
``Sequence`` or ``Map`` interface, as they turned out to be too broad
- Added the ``object_hook`` option for decoding dicts into complex objects
(intended for situations where JSON compatibility is required and semantic tags cannot be used)
- Added encoding and decoding of simple values (``CBORSimpleValue``)
(contributed by Jerry Lundström)
- Replaced the decoder for bignums with a simpler and faster version (contributed by orent)
- Made all relevant classes and functions available directly in the ``cbor2`` namespace
- Added proper documentation
**3.0.4** (2016-09-24)
- Fixed TypeError when trying to encode extension types (regression introduced in 3.0.3)
**3.0.3** (2016-09-23)
- No changes, just re-releasing due to git tagging screw-up
**3.0.2** (2016-09-23)
- Fixed decoding failure for datetimes with microseconds (tag 0)
**3.0.1** (2016-08-08)
- Fixed error in the cyclic structure detection code that could mistake one container for
another, sometimes causing a bogus error about cyclic data structures where there was none
**3.0.0** (2016-07-03)
- **BACKWARD INCOMPATIBLE** Encoder callbacks now receive three arguments: the encoder instance,
the value to encode and a file-like object. The callback must now either write directly to
the file-like object or call another encoder callback instead of returning an iterable.
- **BACKWARD INCOMPATIBLE** Semantic decoder callbacks now receive four arguments: the decoder
instance, the primitive value, a file-like object and the shareable index for the decoded value.
Decoders that support value sharing must now set the raw value at the given index in
- **BACKWARD INCOMPATIBLE** Removed support for iterative encoding (``CBOREncoder.encode()`` is no
longer a generator function and always returns ``None``)
- Significantly improved performance (encoder ~30 % faster, decoder ~60 % faster)
- Fixed serialization round-trip for ``undefined`` (simple type #23)
- Added proper support for value sharing in callbacks
**2.0.0** (2016-06-11)
- **BACKWARD INCOMPATIBLE** Deserialize unknown tags as ``CBORTag`` objects so as not to lose
- Fixed error messages coming from nested structures
**1.1.0** (2016-06-10)
- Fixed deserialization of cyclic structures
**1.0.0** (2016-06-08)
- Initial release

@ -0,0 +1,21 @@
addopts = -rsx --cov --tb=short
testpaths = tests
source = cbor2
show_missing = true
max-line-length = 99
exclude = .tox,build,docs
universal = 1
tag_build =
tag_date = 0

@ -0,0 +1,43 @@
# coding: utf-8
import os.path
from setuptools import setup, find_packages
# Locate README.rst next to this file rather than relative to the current working directory
here = os.path.dirname(__file__)
readme_path = os.path.join(here, 'README.rst')
# Read through a context manager so the file handle is closed deterministically
with open(readme_path) as readme_file:
    readme = readme_file.read()
'version_scheme': 'post-release',
'local_scheme': 'dirty-tag'
description='Pure Python CBOR (de)serializer with extensive tag support',
author=u'Alex Grönholm',
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6'
keywords='serialization cbor',
'testing': ['pytest', 'pytest-cov']

@ -0,0 +1,327 @@
from __future__ import division
import math
import re
from binascii import unhexlify
from datetime import datetime, timedelta
from decimal import Decimal
from email.message import Message
from fractions import Fraction
from io import BytesIO
from uuid import UUID
import pytest
from cbor2.compat import timezone
from cbor2.decoder import loads, CBORDecodeError, load, CBORDecoder
from cbor2.types import CBORTag, undefined, CBORSimpleValue
@pytest.mark.parametrize('payload, expected', [
('00', 0),
('01', 1),
('0a', 10),
('17', 23),
('1818', 24),
('1819', 25),
('1864', 100),
('1903e8', 1000),
('1a000f4240', 1000000),
('1b000000e8d4a51000', 1000000000000),
('1bffffffffffffffff', 18446744073709551615),
('c249010000000000000000', 18446744073709551616),
('3bffffffffffffffff', -18446744073709551616),
('c349010000000000000000', -18446744073709551617),
('20', -1),
('29', -10),
('3863', -100),
('3903e7', -1000)
def test_integer(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
def test_invalid_integer_subtype():
exc = pytest.raises(CBORDecodeError, loads, b'\x1c')
assert str(exc.value).endswith('unknown unsigned integer subtype 0x1c')
@pytest.mark.parametrize('payload, expected', [
('f90000', 0.0),
('f98000', -0.0),
('f93c00', 1.0),
('fb3ff199999999999a', 1.1),
('f93e00', 1.5),
('f97bff', 65504.0),
('fa47c35000', 100000.0),
('fa7f7fffff', 3.4028234663852886e+38),
('fb7e37e43c8800759c', 1.0e+300),
('f90001', 5.960464477539063e-8),
('f90400', 0.00006103515625),
('f9c400', -4.0),
('fbc010666666666666', -4.1),
('f97c00', float('inf')),
('f9fc00', float('-inf')),
('fa7f800000', float('inf')),
('faff800000', float('-inf')),
('fb7ff0000000000000', float('inf')),
('fbfff0000000000000', float('-inf'))
def test_float(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
@pytest.mark.parametrize('payload', ['f97e00', 'fa7fc00000', 'fb7ff8000000000000'])
def test_float_nan(payload):
decoded = loads(unhexlify(payload))
assert math.isnan(decoded)
@pytest.mark.parametrize('payload, expected', [
('f4', False),
('f5', True),
('f6', None),
('f7', undefined)
def test_special(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded is expected
@pytest.mark.parametrize('payload, expected', [
('40', b''),
('4401020304', b'\x01\x02\x03\x04'),
def test_binary(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
@pytest.mark.parametrize('payload, expected', [
('60', u''),
('6161', u'a'),
('6449455446', u'IETF'),
('62225c', u'\"\\'),
('62c3bc', u'\u00fc'),
('63e6b0b4', u'\u6c34')
def test_string(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
@pytest.mark.parametrize('payload, expected', [
('80', []),
('83010203', [1, 2, 3]),
('8301820203820405', [1, [2, 3], [4, 5]]),
('98190102030405060708090a0b0c0d0e0f101112131415161718181819', list(range(1, 26)))
def test_array(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
@pytest.mark.parametrize('payload, expected', [
('a0', {}),
('a201020304', {1: 2, 3: 4})
def test_map(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
@pytest.mark.parametrize('payload, expected', [
('a26161016162820203', {'a': 1, 'b': [2, 3]}),
('826161a161626163', ['a', {'b': 'c'}]),
{'a': 'A', 'b': 'B', 'c': 'C', 'd': 'D', 'e': 'E'})
def test_mixed_array_map(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
@pytest.mark.parametrize('payload, expected', [
('5f42010243030405ff', b'\x01\x02\x03\x04\x05'),
('7f657374726561646d696e67ff', 'streaming'),
('9fff', []),
('9f018202039f0405ffff', [1, [2, 3], [4, 5]]),
('9f01820203820405ff', [1, [2, 3], [4, 5]]),
('83018202039f0405ff', [1, [2, 3], [4, 5]]),
('83019f0203ff820405', [1, [2, 3], [4, 5]]),
('9f0102030405060708090a0b0c0d0e0f101112131415161718181819ff', list(range(1, 26))),
('bf61610161629f0203ffff', {'a': 1, 'b': [2, 3]}),
('826161bf61626163ff', ['a', {'b': 'c'}]),
('bf6346756ef563416d7421ff', {'Fun': True, 'Amt': -2}),
def test_streaming(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
@pytest.mark.parametrize('payload, expected', [
('e0', 0),
('e2', 2),
('f3', 19),
('f820', 32),
('e0', CBORSimpleValue(0)),
('e2', CBORSimpleValue(2)),
('f3', CBORSimpleValue(19)),
('f820', CBORSimpleValue(32))
def test_simple_value(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
# Tests for extension tags
@pytest.mark.parametrize('payload, expected', [
datetime(2013, 3, 21, 20, 4, 0, tzinfo=timezone.utc)),
datetime(2013, 3, 21, 20, 4, 0, 380841, tzinfo=timezone.utc)),
datetime(2013, 3, 21, 22, 4, 0, tzinfo=timezone(timedelta(hours=2)))),
('c11a514b67b0', datetime(2013, 3, 21, 20, 4, 0, tzinfo=timezone.utc)),
('c11a514b67b0', datetime(2013, 3, 21, 22, 4, 0, tzinfo=timezone(timedelta(hours=2))))
], ids=['datetime/utc', 'datetime+micro/utc', 'datetime/eet', 'timestamp/utc', 'timestamp/eet'])
def test_datetime(payload, expected):
decoded = loads(unhexlify(payload))
assert decoded == expected
def test_bad_datetime():
exc = pytest.raises(CBORDecodeError, loads, unhexlify('c06b303030302d3132332d3031'))
assert str(exc.value).endswith('invalid datetime string: 0000-123-01')
def test_fraction():
decoded = loads(unhexlify('c48221196ab3'))
assert decoded == Decimal('273.15')
def test_bigfloat():
decoded = loads(unhexlify('c5822003'))
assert decoded == Decimal('1.5')
def test_rational():
decoded = loads(unhexlify('d81e820205'))
assert decoded == Fraction(2, 5)
def test_regex():
decoded = loads(unhexlify('d8236d68656c6c6f2028776f726c6429'))
expr = re.compile(u'hello (world)')
assert decoded == expr
def test_mime():
decoded = loads(unhexlify(
assert isinstance(decoded, Message)
assert decoded.get_payload() == 'Hello =A4uro'
def test_uuid():
decoded = loads(unhexlify('d825505eaffac8b51e480581277fdcc7842faf'))
assert decoded == UUID(hex='5eaffac8b51e480581277fdcc7842faf')
def test_bad_shared_reference():
exc = pytest.raises(CBORDecodeError, loads, unhexlify('d81d05'))
assert str(exc.value).endswith('shared reference 5 not found')
def test_uninitialized_shared_reference():
fp = BytesIO(unhexlify('d81d00'))
decoder = CBORDecoder(fp)
exc = pytest.raises(CBORDecodeError, decoder.decode)
assert str(exc.value).endswith('shared value 0 has not been initialized')
def test_cyclic_array():
decoded = loads(unhexlify('d81c81d81d00'))
assert decoded == [decoded]
def test_cyclic_map():
decoded = loads(unhexlify('d81ca100d81d00'))
assert decoded == {0: decoded}
def test_unhandled_tag():
    """
    Test that a tag with no special handling available is decoded as a ``CBORTag`` that wraps
    the tag number and its associated value.

    """
    decoded = loads(unhexlify('d917706548656c6c6f'))
    assert decoded == CBORTag(6000, u'Hello')
def test_premature_end_of_stream():
    """
    Test that the decoder detects a situation where read() returned fewer than expected bytes.

    """
    exc = pytest.raises(CBORDecodeError, loads, unhexlify('437879'))
    # The pattern is a regular expression, so keep it in a raw string: '\(' is an invalid escape
    # sequence in a plain string literal
    exc.match(r'premature end of stream \(expected to read 3 bytes, got 2 instead\)')
def test_tag_hook():
    """Test that the value returned by a custom ``tag_hook`` replaces the decoded tag."""
    # Same signature as the hook in test_tag_hook_cyclic: (decoder, tag, shareable_index)
    def reverse(decoder, tag, shareable_index=None):
        return tag.value[::-1]

    decoded = loads(unhexlify('d917706548656c6c6f'), tag_hook=reverse)
    assert decoded == u'olleH'
def test_tag_hook_cyclic():
class DummyType(object):
def __init__(self, value):
self.value = value
def unmarshal_dummy(decoder, tag, shareable_index=None):
instance = DummyType.__new__(DummyType)
decoder.set_shareable(shareable_index, instance)
instance.value = decoder.decode_from_bytes(tag.value)
return instance
decoded = loads(unhexlify('D81CD90BB849D81CD90BB843D81D00'), tag_hook=unmarshal_dummy)
assert isinstance(decoded, DummyType)
assert decoded.value.value is decoded
def test_object_hook():
class DummyType(object):
def __init__(self, state):
self.state = state
payload = unhexlify('A2616103616205')
decoded = loads(payload, object_hook=lambda decoder, value: DummyType(value))
assert isinstance(decoded, DummyType)
assert decoded.state == {'a': 3, 'b': 5}
def test_error_major_type():
exc = pytest.raises(CBORDecodeError, loads, b'')
assert str(exc.value).startswith('error reading major type at index 0: ')
def test_load_from_file(tmpdir):
    """Test that ``load()`` decodes a CBOR document from a file opened in binary mode."""
    path = tmpdir.join('testdata.cbor')
    # Create the fixture before reading it back; these bytes are the encoding of [1, 10]
    path.write_binary(b'\x82\x01\x0a')
    with path.open('rb') as fp:
        obj = load(fp)

    assert obj == [1, 10]

@ -0,0 +1,260 @@
import re
from binascii import unhexlify
from datetime import datetime, timedelta, date
from decimal import Decimal
from email.mime.text import MIMEText
from fractions import Fraction
from uuid import UUID
import pytest
from cbor2.compat import timezone
from cbor2.encoder import dumps, CBOREncodeError, dump, shareable_encoder
from cbor2.types import CBORTag, undefined, CBORSimpleValue
@pytest.mark.parametrize('value, expected', [
(0, '00'),
(1, '01'),
(10, '0a'),
(23, '17'),
(24, '1818'),
(100, '1864'),
(1000, '1903e8'),
(1000000, '1a000f4240'),
(1000000000000, '1b000000e8d4a51000'),
(18446744073709551615, '1bffffffffffffffff'),
(18446744073709551616, 'c249010000000000000000'),
(-18446744073709551616, '3bffffffffffffffff'),
(-18446744073709551617, 'c349010000000000000000'),
(-1, '20'),
(-10, '29'),
(-100, '3863'),
(-1000, '3903e7')
def test_integer(value, expected):
expected = unhexlify(expected)
assert dumps(value) == expected
@pytest.mark.parametrize('value, expected', [
(1.1, 'fb3ff199999999999a'),
(1.0e+300, 'fb7e37e43c8800759c'),
(-4.1, 'fbc010666666666666'),
(float('inf'), 'f97c00'),
(float('nan'), 'f97e00'),
(float('-inf'), 'f9fc00')
def test_float(value, expected):
expected = unhexlify(expected)
assert dumps(value) == expected
@pytest.mark.parametrize('value, expected', [
(b'', '40'),
(b'\x01\x02\x03\x04', '4401020304'),
def test_bytestring(value, expected):
expected = unhexlify(expected)
assert dumps(value) == expected
def test_bytearray():
expected = unhexlify('4401020304')
assert dumps(bytearray(b'\x01\x02\x03\x04')) == expected
@pytest.mark.parametrize('value, expected', [
(u'', '60'),
(u'a', '6161'),
(u'IETF', '6449455446'),
(u'"\\', '62225c'),
(u'\u00fc', '62c3bc'),
(u'\u6c34', '63e6b0b4')
def test_string(value, expected):
expected = unhexlify(expected)
assert dumps(value) == expected
@pytest.mark.parametrize('value, expected', [
(False, 'f4'),
(True, 'f5'),
(None, 'f6'),
(undefined, 'f7')
], ids=['false', 'true', 'null', 'undefined'])
def test_special(value, expected):
expected = unhexlify(expected)
assert dumps(value) == expected
@pytest.mark.parametrize('value, expected', [
(CBORSimpleValue(0), 'e0'),
(CBORSimpleValue(2), 'e2'),
(CBORSimpleValue(19), 'f3'),
(CBORSimpleValue(32), 'f820')
def test_simple_value(value, expected):
expected = unhexlify(expected)
assert dumps(value) == expected
# Tests for extension tags
@pytest.mark.parametrize('value, as_timestamp, expected', [
(datetime(2013, 3, 21, 20, 4, 0, tzinfo=timezone.utc), False,
(datetime(2013, 3, 21, 20, 4, 0, 380841, tzinfo=timezone.utc), False,
(datetime(2013, 3, 21, 22, 4, 0, tzinfo=timezone(timedelta(hours=2))), False,
(datetime(2013, 3, 21, 20, 4, 0), False, 'c074323031332d30332d32315432303a30343a30305a'),
(datetime(2013, 3, 21, 20, 4, 0, tzinfo=timezone.utc), True, 'c11a514b67b0'),
(datetime(2013, 3, 21, 22, 4, 0, tzinfo=timezone(timedelta(hours=2))), True, 'c11a514b67b0')
], ids=['datetime/utc', 'datetime+micro/utc', 'datetime/eet', 'naive', 'timestamp/utc',
def test_datetime(value, as_timestamp, expected):
expected = unhexlify(expected)
assert dumps(value, datetime_as_timestamp=as_timestamp, timezone=timezone.utc) == expected
def test_date():
expected = unhexlify('c074323031332d30332d32315430303a30303a30305a')
assert dumps(date(2013, 3, 21), timezone=timezone.utc) == expected
def test_naive_datetime():
"""Test that naive datetimes are gracefully rejected when no timezone has been set."""
exc = pytest.raises(CBOREncodeError, dumps, datetime(2013, 3, 21))
exc.match('naive datetime encountered and no default timezone has been set')
@pytest.mark.parametrize('value, expected', [
(Decimal('14.123'), 'c4822219372b'),
(Decimal('NaN'), 'f97e00'),
(Decimal('Infinity'), 'f97c00'),
(Decimal('-Infinity'), 'f9fc00')
], ids=['normal', 'nan', 'inf', 'neginf'])
def test_decimal(value, expected):
expected = unhexlify(expected)
assert dumps(value) == expected
def test_rational():
expected = unhexlify('d81e820205')
assert dumps(Fraction(2, 5)) == expected
def test_regex():
expected = unhexlify('d8236d68656c6c6f2028776f726c6429')
assert dumps(re.compile(u'hello (world)')) == expected
def test_mime():
expected = unhexlify(
message = MIMEText(u'Hello \u20acuro', 'plain', 'iso-8859-15')
assert dumps(message) == expected
def test_uuid():
expected = unhexlify('d825505eaffac8b51e480581277fdcc7842faf')
assert dumps(UUID(hex='5eaffac8b51e480581277fdcc7842faf')) == expected
def test_custom_tag():
expected = unhexlify('d917706548656c6c6f')
assert dumps(CBORTag(6000, u'Hello')) == expected
def test_cyclic_array():
    """Test that an array that contains itself can be serialized with value sharing enabled."""
    expected = unhexlify('d81c81d81c81d81d00')
    a = [[]]
    # Close the cycle: the inner list refers back to the outer one, which is what the trailing
    # "d81d00" (reference to shared value 0) in the expected output encodes
    a[0].append(a)
    assert dumps(a, value_sharing=True) == expected
def test_cyclic_array_nosharing():
    """Test that serializing a cyclic structure w/o value sharing will blow up gracefully."""
    a = []
    # Make the list contain itself; an empty list is not cyclic and would encode without error
    a.append(a)
    exc = pytest.raises(CBOREncodeError, dumps, a)
    exc.match('cyclic data structure detected but value sharing is disabled')
def test_cyclic_map():
"""Test that a dict that contains itself can be serialized with value sharing enabled."""
expected = unhexlify('d81ca100d81d00')
a = {}
a[0] = a
assert dumps(a, value_sharing=True) == expected
def test_cyclic_map_nosharing():
"""Test that serializing a cyclic structure w/o value sharing will fail gracefully."""
a = {}
a[0] = a
exc = pytest.raises(CBOREncodeError, dumps, a)
exc.match('cyclic data structure detected but value sharing is disabled')
@pytest.mark.parametrize('value_sharing, expected', [
(False, '828080'),
(True, 'd81c82d81c80d81d01')
], ids=['nosharing', 'sharing'])
def test_not_cyclic_same_object(value_sharing, expected):
"""Test that the same shareable object can be included twice if not in a cyclic structure."""
expected = unhexlify(expected)
a = []
b = [a, a]
assert dumps(b, value_sharing=value_sharing) == expected
def test_unsupported_type():
exc = pytest.raises(CBOREncodeError, dumps, lambda: None)
exc.match('cannot serialize type function')
def test_default():
    """Test that the ``default`` hook is called to encode an otherwise unsupported type."""
    class DummyType(object):
        def __init__(self, state):
            self.state = state

    def default_encoder(encoder, value):
        # Encode the wrapped state in place of the object; [3, 5] serializes to 0x820305
        encoder.encode(value.state)

    expected = unhexlify('820305')
    obj = DummyType([3, 5])
    serialized = dumps(obj, default=default_encoder)
    assert serialized == expected
def test_default_cyclic():
    """Test that a ``default`` hook can serialize custom objects that reference each other."""
    class DummyType(object):
        def __init__(self, value=None):
            self.value = value

    # The decorator registers each encoded object as a shared value, so the second visit to
    # ``obj`` is written as a reference (the trailing "d81d00") instead of recursing forever
    @shareable_encoder
    def default_encoder(encoder, value):
        state = encoder.encode_to_bytes(value.value)
        encoder.encode(CBORTag(3000, state))

    expected = unhexlify('D81CD90BB849D81CD90BB843D81D00')
    obj = DummyType()
    obj2 = DummyType(obj)
    obj.value = obj2
    serialized = dumps(obj, value_sharing=True, default=default_encoder)
    assert serialized == expected
def test_dump_to_file(tmpdir):
path = tmpdir.join('testdata.cbor')
with path.open('wb') as fp:
dump([1, 10], fp)
assert path.read_binary() == b'\x82\x01\x0a'

@ -0,0 +1,36 @@
import pytest
from cbor2.types import CBORTag, CBORSimpleValue
def test_tag_repr():
assert repr(CBORTag(600, 'blah')) == "CBORTag(600, 'blah')"
def test_tag_equals():
tag1 = CBORTag(500, ['foo'])
tag2 = CBORTag(500, ['foo'])
tag3 = CBORTag(500, ['bar'])
assert tag1 == tag2
assert not tag1 == tag3
assert not tag1 == 500
def test_simple_value_repr():
assert repr(CBORSimpleValue(1)) == "CBORSimpleValue(1)"
def test_simple_value_equals():
tag1 = CBORSimpleValue(1)
tag2 = CBORSimpleValue(1)
tag3 = CBORSimpleValue(21)
assert tag1 == tag2
assert tag1 == 1
assert not tag1 == tag3
assert not tag1 == 21
assert not tag2 == "21"
def test_simple_value_too_big():
exc = pytest.raises(TypeError, CBORSimpleValue, 256)
assert str(exc.value) == 'simple value too big'

@ -0,0 +1,20 @@
envlist = py27, py33, py34, py35, py36, pypy, flake8
skip_missing_interpreters = true
2.7 = py27
3.3 = py33
3.4 = py34
3.5 = py35
3.6 = py36, flake8
pypy = pypy
commands = python -m pytest {posargs}
extras = testing
deps = flake8
commands = flake8 cbor2 tests
skip_install = true