[ARVADOS] updated: 0ae2871e43fdfad53246a3ce3f25d2ad8e17aa32
git at public.curoverse.com
git at public.curoverse.com
Fri Apr 3 14:52:38 EDT 2015
Summary of changes:
sdk/python/arvados/commands/cwl_job.py | 64 ++++++----------
sdk/python/arvados/commands/keepdocker.py | 122 +++++++++++++++++++++++++-----
sdk/python/arvados/keep.py | 2 +-
3 files changed, 127 insertions(+), 61 deletions(-)
via 0ae2871e43fdfad53246a3ce3f25d2ad8e17aa32 (commit)
from e4ea50a9fb2b1f44352d3adaee5df209c2a72c96 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0ae2871e43fdfad53246a3ce3f25d2ad8e17aa32
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 3 14:56:10 2015 -0400
4685: Add arv-keepdocker --download, used by task bootstrapping scripts.
diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
index ae94eea..73e5ca7 100644
--- a/sdk/python/arvados/commands/cwl_job.py
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -42,6 +42,19 @@ def make_slots(nodes):
slots["%s[%i]" % (n, c)] = {"node": n, "slot": c, "task": None}
return slots
+script_header = """
+set -e
+set -v
+
+read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
+trap "kill -TERM -$$pgrp; exit $TASK_CANCELED" INT QUIT TERM
+
+arv-keepdocker --download $docker_hash
+
+rm -rf $tmpdir
+mkdir -p $tmpdir
+"""
+
def determine_resources(slurm_jobid=None, slurm_nodelist=None):
have_slurm = (slurm_jobid is not None) and (slurm_nodelist is not None)
@@ -63,19 +76,8 @@ def determine_resources(slurm_jobid=None, slurm_nodelist=None):
"slots": slots}
def run_on_slot(resources, slot, task, task_uuid):
- execution_script = Template("""
-set -e
-set -v
-
-read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
-trap "kill -TERM -$$pgrp; exit $TASK_CANCELED" EXIT INT QUIT TERM
-
-if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
- arv-get $docker_locator/$docker_hash.tar | docker load
-fi
-
-rm -rf $tmpdir
-mkdir -p $tmpdir/job_output $tmpdir/keep
+ execution_script = Template(script_header + """
+mkdir $tmpdir/job_output $tmpdir/keep
if which crunchstat ; then
CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000"
@@ -129,7 +131,6 @@ exit $$code
stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdout"]))
ex = execution_script.substitute(docker_hash=pipes.quote(task["docker_hash"]),
- docker_locator=pipes.quote(task["docker_locator"]),
tmpdir=pipes.quote(tmpdir),
env=env,
cmd=" ".join([pipes.quote(c) for c in task["command"]]),
@@ -198,20 +199,7 @@ class TaskEvents(object):
self.finish_task(ev["properties"]["new_attributes"])
def run_executive(api, job, api_config):
- execution_script = Template("""
-set -e
-set -v
-
-read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
-trap "kill -TERM -$$pgrp; exit $TASK_CANCELED" EXIT INT QUIT TERM
-
-if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
- arv-get $docker_locator/$docker_hash.tar | docker load
-fi
-
-rm -rf $tmpdir
-mkdir -p $tmpdir
-
+ execution_script = Template(script_header + """
cd $tmpdir
git init
git config --local credential.$githttp/.helper '!tok(){ echo password=$ARVADOS_API_TOKEN; };tok'
@@ -233,31 +221,25 @@ docker run \
""")
tmpdir = "/tmp/%s-%i" % (job["uuid"], random.randint(1, 100000))
- cr = arvados.CollectionReader(job["docker_image_locator"], api_client=api)
-
- if len(cr) != 1:
- raise arvados.errors.ArgumentError("docker_image_locator must only contain a single file")
- docker_image = re.match("([0-9a-f]{40})\.tar", cr.keys()[0])
- if docker_image:
- docker_hash = docker_image.group(1)
- else:
- raise arvados.errors.ArgumentError("docker_image_locator must contain a docker image")
+ docker_hash = image_hash_in_collection(arvados.CollectionReader(job["docker_image_locator"], api_client=api))
- ex = execution_script.substitute(docker_hash=docker_hash,
- docker_locator=job["docker_image_locator"],
+ ex = execution_script.substitute(docker_hash=pipes.quote(docker_hash),
+ docker_locator=pipes.quote(job["docker_image_locator"]),
tmpdir=tmpdir,
ARVADOS_API_HOST=pipes.quote(api_config["ARVADOS_API_HOST"]),
ARVADOS_API_TOKEN=pipes.quote(api_config["ARVADOS_API_TOKEN"]),
ARVADOS_API_HOST_INSECURE=pipes.quote(api_config.get("ARVADOS_API_TOKEN", "0")),
TASK_CANCELED=TASK_CANCELED,
githttp=pipes.quote(api._rootDesc.get("gitHttpBase")),
- gitrepo=pipes.quote(job["repository"]))
+ gitrepo=pipes.quote(job["repository"]),
+ script=pipes.quote(job["script"]),
+ script_version=pipes.quote(job["script_version"]))
if resources["have_slurm"]:
pass
else:
- subprocess.Popen(ex, shell=True)
+ return subprocess.Popen(ex, shell=True)
class SigHandler(object):
def __init__(self, ts):
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index 933fd77..1b01d48 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -5,6 +5,7 @@ import datetime
import errno
import json
import os
+import re
import subprocess
import sys
import tarfile
@@ -15,8 +16,10 @@ from collections import namedtuple
from stat import *
import arvados
+from arvados.util import list_all
import arvados.commands._util as arv_cmd
import arvados.commands.put as arv_put
+import arvados.errors
STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
@@ -28,6 +31,10 @@ keepdocker_parser.add_argument(
'-f', '--force', action='store_true', default=False,
help="Re-upload the image even if it already exists on the server")
+keepdocker_parser.add_argument(
+ '--no-trunc', action='store_true', default=False,
+ help="Don't truncate Docker image hashes in output.")
+
_group = keepdocker_parser.add_mutually_exclusive_group()
_group.add_argument(
'--pull', action='store_true', default=False,
@@ -36,9 +43,17 @@ _group.add_argument(
'--no-pull', action='store_false', dest='pull',
help="Use locally installed image only, don't pull image from Docker registry (default)")
+_group = keepdocker_parser.add_mutually_exclusive_group()
+_group.add_argument(
+ '--download', action='store_true', default=False,
+ help="Fetch Docker image from Arvados and load locally.")
+_group.add_argument(
+ '--upload', action='store_true', default=False,
+ help="Upload local Docker image to Arvados (default)")
+
keepdocker_parser.add_argument(
'image', nargs='?',
- help="Docker image to upload, as a repository name or hash")
+ help="Docker image as a repository name or hash")
keepdocker_parser.add_argument(
'tag', nargs='?', default='latest',
help="Tag of the Docker image to upload (default 'latest')")
@@ -47,7 +62,7 @@ keepdocker_parser.add_argument(
# The options inherited from arv-put include --name, --project-uuid,
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
- description="Upload or list Docker images in Arvados",
+ description="Upload, download or list Docker images in Arvados",
parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
class DockerError(Exception):
@@ -165,7 +180,7 @@ def ptimestamp(t):
t = s[0] + s[1][-1:]
return datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%SZ")
-def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
+def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, image_hash=None, image_collection=None):
"""List all Docker images known to the api_client with image_name and
image_tag. If no image_name is given, defaults to listing all
Docker images.
@@ -181,10 +196,12 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
if image_name:
image_link_name = "{}:{}".format(image_name, image_tag or 'latest')
docker_image_filters.append(['name', '=', image_link_name])
+ if image_hash:
+ docker_image_filters.append(['name', '=', image_hash])
+ if image_collection:
+ docker_image_filters.append(['head_uuid', '=', image_collection])
- existing_links = api_client.links().list(
- filters=docker_image_filters
- ).execute(num_retries=num_retries)['items']
+ existing_links = list_all(api_client.links().list, num_retries, filters=docker_image_filters)
images = {}
for link in existing_links:
collection_uuid = link["head_uuid"]
@@ -210,18 +227,88 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
return sorted(images.items(), lambda a, b: cmp(b[1]["timestamp"], a[1]["timestamp"]))
+def image_hash_in_collection(cr):
+ if len(cr) != 1:
+ raise arvados.errors.ArgumentError("docker_image_locator must only contain a single file")
+
+ docker_image = re.match("([0-9a-f]{64})\.tar", cr.keys()[0])
+ if docker_image:
+ return docker_image.group(1)
+ else:
+ return None
+
+def load_image_from_collection(api_client, docker_image_locator):
+ cr = arvados.CollectionReader(docker_image_locator, api_client=api_client)
+ docker_image = image_hash_in_collection(api_client, cr)
+ if docker_image:
+ for d in docker_images():
+ if d.hash == docker_image.group(1):
+ print "Docker image '%s' is already loaded" % docker_image.group(1)
+ return docker_image.group(1)
+
+ with cr.open(docker_image.group(0)) as img:
+ docker_load = subprocess.Popen(["docker", "load"], stdin=subprocess.PIPE)
+ data = img.read(64000)
+ n = len(data)
+ while data:
+ docker_load.stdin.write(data)
+ data = img.read(1024*1024)
+ n += len(data)
+ docker_load.stdin.close()
+ docker_load.wait()
+ if docker_load.returncode != 0:
+ raise arvados.errors.CommandFailedError("Failed to load image")
+
+ return docker_image.group(1)
+ else:
+ raise arvados.errors.ArgumentError("Failed to find Docker image in collection %s" % docker_image_locator)
+
def main(arguments=None):
args = arg_parser.parse_args(arguments)
api = arvados.api('v1')
if args.image is None or args.image == 'images':
- fmt = "{:30} {:10} {:12} {:29} {:20}"
+ if args.no_trunc:
+ fmt = "{:30} {:10} {:64} {:29} {:20}"
+ else:
+ fmt = "{:30} {:10} {:12} {:29} {:20}"
print fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED")
for i, j in list_images_in_arv(api, args.retries):
- print(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
+ print(fmt.format(j["repo"], j["tag"],
+ j["dockerhash"] if args.no_trunc else j["dockerhash"][0:12],
+ i, j["timestamp"].strftime("%c")))
sys.exit(0)
+ if args.download:
+ # search by name and tag
+ imgs_in_arv = list_images_in_arv(api, args.retries, image_name=args.image)
+ do_tag = True
+
+ if not imgs_in_arv:
+ # search by image hash
+ imgs_in_arv = list_images_in_arv(api, args.retries, image_hash=args.image)
+ do_tag = False
+
+ if not imgs_in_arv and arvados.util.collection_uuid_pattern.match(args.image):
+ # search by collection uuid
+ imgs_in_arv = list_images_in_arv(api, args.retries, image_collection=args.image)
+ do_tag = True
+
+ if not imgs_in_arv and arvados.util.keep_locator_pattern.match(args.image):
+ # search by manifest portable data hash
+ imgs_in_arv = [[args.image]]
+ do_tag = False
+
+ if imgs_in_arv:
+ imghash = load_image_from_collection(api, imgs_in_arv[0][0])
+ if do_tag:
+ popen_docker(["tag", imghash, args.image], stdin=None, stdout=None).wait()
+ sys.exit(0)
+ else:
+ print >>sys.stderr, "arv-keepdocker: Docker image '%s' not found in Arvados" % args.image
+ sys.exit(1)
+
# Pull the image if requested, unless the image is specified as a hash
# that we already have.
if args.pull and not find_image_hashes(args.image):
@@ -254,24 +341,21 @@ def main(arguments=None):
num_retries=args.retries)['uuid']
# Find image hash tags
- existing_links = api.links().list(
- filters=[['link_class', '=', 'docker_image_hash'],
- ['name', '=', image_hash]]
- ).execute(num_retries=args.retries)['items']
+ existing_links = list_all(api.links().list, num_retries=args.retries,
+ filters=[['link_class', '=', 'docker_image_hash'],
+ ['name', '=', image_hash]])
if existing_links:
# get readable collections
- collections = api.collections().list(
- filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
- select=["uuid", "owner_uuid", "name", "manifest_text"]
- ).execute(num_retries=args.retries)['items']
+ collections = list_all(api.collections().list, num_retries=args.retries,
+ filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
+ select=["uuid", "owner_uuid", "name", "manifest_text"])
if collections:
# check for repo+tag links on these collections
- existing_repo_tag = (api.links().list(
+ existing_repo_tag = list_all(api.links().list, num_retries=args.retries,
filters=[['link_class', '=', 'docker_image_repo+tag'],
['name', '=', image_repo_tag],
- ['head_uuid', 'in', collections]]
- ).execute(num_retries=args.retries)['items']) if image_repo_tag else []
+ ['head_uuid', 'in', collections]]) if image_repo_tag else []
# Filter on elements owned by the parent project
owned_col = [c for c in collections if c['owner_uuid'] == parent_project_uuid]
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 6196b50..356cc48 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -635,7 +635,7 @@ class KeepClient(object):
def get_from_cache(self, loc):
"""Fetch a block only if is in the cache, otherwise return None."""
slot = self.block_cache.get(loc)
- if slot.ready.is_set():
+ if slot and slot.ready.is_set():
return slot.get()
else:
return None
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list