Bug 1432390 - Directly call the docker API over its unix socket instead of calling docker load. r=dustin

While spawning `docker load` is likely to work on developer machines,
on automation, it requires a docker client that is the exact same
version as the server running on the taskcluster worker for
docker-in-docker, which is not convenient. The API required for `docker
load` is rather simple, though, and can be mimicked quite easily.

While this change in itself is not necessary for developer machines,
it will allow re-using the same command for the image-builder to
load a parent docker image when deriving one from another. We could
keep a code branch using `docker load` but it seems wasteful to maintain
two branches when one can work for both use cases.

--HG--
extra : rebase_source : d72956d7dd329b92564cbaa3fbfe0687d4d5d994
This commit is contained in:
Mike Hommey 2018-01-24 14:25:09 +09:00
parent 16c5f4c5f4
commit 378f47ff49
2 changed files with 121 additions and 26 deletions

View File

@ -12,7 +12,6 @@ import subprocess
import tarfile
import tempfile
import which
from subprocess import Popen, PIPE
from io import BytesIO
from taskgraph.util import docker
@ -155,9 +154,11 @@ def load_image(url, imageName=None, imageTag=None):
else:
imageTag = 'latest'
docker = None
image, tag, layer = None, None, None
try:
info = {}
def download_and_modify_image():
# This function downloads and edits the downloaded tar file on the fly.
# It emits chunked buffers of the edited tar file, as a generator.
print("Downloading from {}".format(url))
# get_session() gets us a requests.Session set to retry several times.
req = get_session().get(url, stream=True)
@ -168,21 +169,17 @@ def load_image(url, imageName=None, imageTag=None):
fileobj=decompressed_reader,
bufsize=zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
# Setup piping: tarout | docker
docker = Popen(['docker', 'load'], stdin=PIPE)
tarout = tarfile.open(mode='w|', fileobj=docker.stdin, format=tarfile.GNU_FORMAT)
# Read from tarin and write to tarout
# Stream through each member of the downloaded tar file individually.
for member in tarin:
# Write non-file members directly (don't use extractfile on links)
# Non-file members only need a tar header. Emit one.
if not member.isfile():
tarout.addfile(member)
yield member.tobuf(tarfile.GNU_FORMAT)
continue
# Open reader for the member
# Open stream reader for the member
reader = tarin.extractfile(member)
# If member is repository, we parse and possibly rewrite the image tags
# If member is `repositories`, we parse and possibly rewrite the image tags
if member.name == 'repositories':
# Read and parse repositories
repos = json.loads(reader.read())
@ -191,29 +188,37 @@ def load_image(url, imageName=None, imageTag=None):
# If there is more than one image or tag, we can't handle it here
if len(repos.keys()) > 1:
raise Exception('file contains more than one image')
image = repos.keys()[0]
info['image'] = image = repos.keys()[0]
if len(repos[image].keys()) > 1:
raise Exception('file contains more than one tag')
tag = repos[image].keys()[0]
layer = repos[image][tag]
info['tag'] = tag = repos[image].keys()[0]
info['layer'] = layer = repos[image][tag]
# Rewrite the repositories file
data = json.dumps({imageName or image: {imageTag or tag: layer}})
reader = BytesIO(data)
member.size = len(data)
# Add member and reader
tarout.addfile(member, reader)
# Emit the tar header for this member.
yield member.tobuf(tarfile.GNU_FORMAT)
# Then emit its content.
remaining = member.size
while remaining:
length = min(remaining, zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
buf = reader.read(length)
remaining -= len(buf)
yield buf
# Pad to fill a 512 bytes block, per tar format.
remainder = member.size % 512
if remainder:
yield '\0' * (512 - remainder)
reader.close()
tarout.close()
finally:
if docker:
docker.stdin.close()
if docker and docker.wait() != 0:
raise Exception('loading into docker failed')
docker.post_to_docker(download_and_modify_image(), '/images/load', quiet=0)
# Check that we found a repositories file
if not image or not tag or not layer:
if not info.get('image') or not info.get('tag') or not info.get('layer'):
raise Exception('No repositories file found!')
return {'image': image, 'tag': tag, 'layer': layer}
return info

View File

@ -5,12 +5,17 @@
from __future__ import absolute_import, print_function, unicode_literals
import hashlib
import json
import os
import re
import requests_unixsocket
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib
import urlparse
import yaml
from mozbuild.util import memoize
@ -24,6 +29,91 @@ from .. import GECKO
IMAGE_DIR = os.path.join(GECKO, 'taskcluster', 'docker')
def docker_url(path, **kwargs):
docker_socket = os.environ.get('DOCKER_SOCKET', '/var/run/docker.sock')
return urlparse.urlunparse((
'http+unix',
urllib.quote(docker_socket, safe=''),
path,
'',
urllib.urlencode(kwargs),
''))
def post_to_docker(tar, api_path, **kwargs):
"""POSTs a tar file to a given docker API path.
The tar argument can be anything that can be passed to requests.post()
as data (e.g. iterator or file object).
The extra keyword arguments are passed as arguments to the docker API.
"""
req = requests_unixsocket.Session().post(
docker_url(api_path, **kwargs),
data=tar,
stream=True,
headers={'Content-Type': 'application/x-tar'},
)
if req.status_code != 200:
message = req.json().get('message')
if not message:
message = 'docker API returned HTTP code {}'.format(
req.status_code)
raise Exception(message)
status_line = {}
buf = b''
for content in req.iter_content(chunk_size=None):
if not content:
continue
# Sometimes, a chunk of content is not a complete json, so we cumulate
# with leftovers from previous iterations.
buf += content
try:
data = json.loads(buf)
except Exception:
continue
buf = b''
# data is sometimes an empty dict.
if not data:
continue
# Mimick how docker itself presents the output. This code was tested
# with API version 1.18 and 1.26.
if 'status' in data:
if 'id' in data:
if sys.stderr.isatty():
total_lines = len(status_line)
line = status_line.setdefault(data['id'], total_lines)
n = total_lines - line
if n > 0:
# Move the cursor up n lines.
sys.stderr.write('\033[{}A'.format(n))
# Clear line and move the cursor to the beginning of it.
sys.stderr.write('\033[2K\r')
sys.stderr.write('{}: {} {}\n'.format(
data['id'], data['status'], data.get('progress', '')))
if n > 1:
# Move the cursor down n - 1 lines, which, considering
# the carriage return on the last write, gets us back
# where we started.
sys.stderr.write('\033[{}B'.format(n - 1))
else:
status = status_line.get(data['id'])
# Only print status changes.
if status != data['status']:
sys.stderr.write('{}: {}\n'.format(data['id'], data['status']))
status_line[data['id']] = data['status']
else:
status_line = {}
sys.stderr.write('{}\n'.format(data['status']))
elif 'stream' in data:
sys.stderr.write(data['stream'])
elif 'error' in data:
raise Exception(data['error'])
else:
raise NotImplementedError(repr(data))
sys.stderr.flush()
def docker_image(name, by_tag=False):
'''
Resolve in-tree prebuilt docker image to ``<registry>/<repository>@sha256:<digest>``,