diff --git a/taskcluster/taskgraph/docker.py b/taskcluster/taskgraph/docker.py index 22480b864357..9511afb0c958 100644 --- a/taskcluster/taskgraph/docker.py +++ b/taskcluster/taskgraph/docker.py @@ -93,29 +93,6 @@ def build_image(name, tag, args=None): print('*' * 50) -# The zstandard library doesn't expose a file-like interface for its -# decompressor, but an iterator. Support for a file-like interface is due in -# next release. In the meanwhile, we use this proxy class to turn the iterator -# into a file-like. -class IteratorReader(object): - def __init__(self, iterator): - self._iterator = iterator - self._buf = b'' - - def read(self, size): - result = b'' - while len(result) < size: - wanted = min(size - len(result), len(self._buf)) - if not self._buf: - try: - self._buf = memoryview(next(self._iterator)) - except StopIteration: - break - result += self._buf[:wanted].tobytes() - self._buf = self._buf[wanted:] - return result - - def load_image(url, imageName=None, imageTag=None): """ Load docker image from URL as imageName:tag, if no imageName or tag is given @@ -143,58 +120,63 @@ def load_image(url, imageName=None, imageTag=None): # get_session() gets us a requests.Session set to retry several times. req = get_session().get(url, stream=True) req.raise_for_status() - decompressed_reader = IteratorReader(zstd.ZstdDecompressor().read_from(req.raw)) - tarin = tarfile.open( - mode='r|', - fileobj=decompressed_reader, - bufsize=zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) - # Stream through each member of the downloaded tar file individually. - for member in tarin: - # Non-file members only need a tar header. Emit one. - if not member.isfile(): + with zstd.ZstdDecompressor().stream_reader(req.raw) as ifh: + + tarin = tarfile.open( + mode='r|', + fileobj=ifh, + bufsize=zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) + + # Stream through each member of the downloaded tar file individually. + for member in tarin: + # Non-file members only need a tar header. Emit one. + if not member.isfile(): + yield member.tobuf(tarfile.GNU_FORMAT) + continue + + # Open stream reader for the member + reader = tarin.extractfile(member) + + # If member is `repositories`, we parse and possibly rewrite the + # image tags. + if member.name == 'repositories': + # Read and parse repositories + repos = json.loads(reader.read()) + reader.close() + + # If there is more than one image or tag, we can't handle it + # here. + if len(repos.keys()) > 1: + raise Exception('file contains more than one image') + info['image'] = image = repos.keys()[0] + if len(repos[image].keys()) > 1: + raise Exception('file contains more than one tag') + info['tag'] = tag = repos[image].keys()[0] + info['layer'] = layer = repos[image][tag] + + # Rewrite the repositories file + data = json.dumps({imageName or image: {imageTag or tag: layer}}) + reader = BytesIO(data) + member.size = len(data) + + # Emit the tar header for this member. yield member.tobuf(tarfile.GNU_FORMAT) - continue + # Then emit its content. + remaining = member.size + while remaining: + length = min(remaining, + zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) + buf = reader.read(length) + remaining -= len(buf) + yield buf + # Pad to fill a 512 bytes block, per tar format. + remainder = member.size % 512 + if remainder: + yield '\0' * (512 - remainder) - # Open stream reader for the member - reader = tarin.extractfile(member) - - # If member is `repositories`, we parse and possibly rewrite the image tags - if member.name == 'repositories': - # Read and parse repositories - repos = json.loads(reader.read()) reader.close() - # If there is more than one image or tag, we can't handle it here - if len(repos.keys()) > 1: - raise Exception('file contains more than one image') - info['image'] = image = repos.keys()[0] - if len(repos[image].keys()) > 1: - raise Exception('file contains more than one tag') - info['tag'] = tag = repos[image].keys()[0] - info['layer'] = layer = repos[image][tag] - - # Rewrite the repositories file - data = json.dumps({imageName or image: {imageTag or tag: layer}}) - reader = BytesIO(data) - member.size = len(data) - - # Emit the tar header for this member. - yield member.tobuf(tarfile.GNU_FORMAT) - # Then emit its content. - remaining = member.size - while remaining: - length = min(remaining, zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE) - buf = reader.read(length) - remaining -= len(buf) - yield buf - # Pad to fill a 512 bytes block, per tar format. - remainder = member.size % 512 - if remainder: - yield '\0' * (512 - remainder) - - reader.close() - docker.post_to_docker(download_and_modify_image(), '/images/load', quiet=0) # Check that we found a repositories file