[ARVADOS] updated: 9d209cb34089febeaadeab572a1b4c8d9d485741
git at public.curoverse.com
git at public.curoverse.com
Mon Jun 29 17:13:20 EDT 2015
Summary of changes:
sdk/python/arvados/commands/cwl_runner.py | 48 +++++++++++++++++++------------
sdk/python/arvados/commands/run.py | 12 ++++----
sdk/python/bin/cwl-runner | 0
3 files changed, 35 insertions(+), 25 deletions(-)
mode change 100644 => 100755 sdk/python/bin/cwl-runner
via 9d209cb34089febeaadeab572a1b4c8d9d485741 (commit)
from c5e5db25e95289376a0df35695ab2b5f48131b1f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit 9d209cb34089febeaadeab572a1b4c8d9d485741
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 29 17:13:46 2015 -0400
6264: Uploads files and Docker images, can almost run jobs.
diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index 3799507..19fca27 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -11,6 +11,10 @@ import cwltool.main
import threading
import cwltool.docker
import fnmatch
+import logging
+import re
+import os
+from cwltool.process import get_feature
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
@@ -29,7 +33,10 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
if not images:
imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- arvados.commands.keepdocker.main(dockerRequirement["dockerImageId"])
+ args = [image_name]
+ if image_tag:
+ args.append(image_tag)
+ arvados.commands.keepdocker.main(args)
return dockerRequirement["dockerImageId"]
@@ -81,14 +88,14 @@ class ArvadosJob(object):
runtime_constraints = {}
if self.stdin:
- command["stdin"] = self.stdin
+ script_parameters["task.stdin"] = self.stdin
if self.stdout:
- command["stdout"] = self.stdout
+ script_parameters["task.stdout"] = self.stdout
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get(docker_req, pull_image)
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
runtime_constraints["arvados_sdk_version"] = "master"
response = self.arvrunner.api.jobs().create(body={
@@ -113,7 +120,7 @@ class ArvadosJob(object):
class ArvPathMapper(cwltool.pathmapper.PathMapper):
- def __init__(self, arvrunner, referenced_files, basedir):
+ def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
self._pathmap = {}
uploadfiles = []
@@ -122,16 +129,19 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
for src in referenced_files:
ab = src if os.path.isabs(src) else os.path.join(basedir, src)
st = arvados.commands.run.statfile("", ab)
- if isinstance(st, arvados.commands.run.UploadFile):
+ if kwargs.get("conformance_test"):
+ self._pathmap[src] = (src, ab)
+ elif isinstance(st, arvados.commands.run.UploadFile):
uploadfiles.append((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
self._pathmap[src] = (ab, st.fn)
elif isinstance(st, basestring) and pdh_path.match(st):
- self._pathmap[src] = (st, "$(file %s") % st)
+ self._pathmap[src] = (st, "$(file %s)" % st)
else:
- workflow.WorkflowException("Input file path '%s' is invalid", st)
+ raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid", st)
- arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api)
+ if uploadfiles:
+ arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3)
for src, ab, st in uploadfiles:
self._pathmap[src] = (ab, st.fn)
@@ -145,8 +155,8 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
def makeJobRunner(self):
return ArvadosJob(self.arvrunner)
- def makePathMapper(self, reffiles, input_basedir):
- return ArvadosCommandTool.ArvPathMapper(self.arvrunner, reffiles, input_basedir)
+ def makePathMapper(self, reffiles, input_basedir, **kwargs):
+ return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
class ArvCwlRunner(object):
@@ -174,12 +184,12 @@ class ArvCwlRunner(object):
if "object_uuid" in event:
if event["object_uuid"] in self.jobs and event["event_type"] == "update":
if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- try:
- self.cond.acquire()
- self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
- self.cond.notify()
- finally:
- self.cond.release()
+ try:
+ self.cond.acquire()
+ self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
+ self.cond.notify()
+ finally:
+ self.cond.release()
def arvExecutor(self, t, job_order, input_basedir, **kwargs):
events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
@@ -204,7 +214,7 @@ class ArvCwlRunner(object):
finally:
self.cond.release()
else:
- raise workflow.WorkflowException("Workflow deadlocked.")
+ raise cwltool.workflow.WorkflowException("Workflow deadlocked.")
while self.jobs:
try:
@@ -216,7 +226,7 @@ class ArvCwlRunner(object):
events.close()
if self.final_output is None:
- raise workflow.WorkflowException("Workflow did not return a result.")
+ raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
return self.final_output
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 6b929ac..4cbda4a 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -101,10 +101,12 @@ def statfile(prefix, fn):
return prefix+fn
-def uploadfiles(files, api):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
# until all file pathes no longer have a common parent.
+ n = True
+ pathprefix = "/"
while n:
pathstep = None
for c in files:
@@ -133,12 +135,12 @@ def uploadfiles(files, api):
logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
- if args.dry_run:
+ if dry_run:
logger.info("$(input) is %s", pathprefix.rstrip('/'))
pdh = "$(input)"
else:
files = sorted(files, key=lambda x: x.fn)
- collection = arvados.CollectionWriter(api, num_retries=args.retries)
+ collection = arvados.CollectionWriter(api, num_retries=num_retries)
stream = None
for f in files:
sp = os.path.split(f.fn)
@@ -234,11 +236,9 @@ def main(arguments=None):
command[i] = statfile(m.group(1), m.group(2))
break
- n = True
- pathprefix = "/"
files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
if files:
- uploadfiles(files, api)
+ uploadfiles(files, api, dry_run=args.dry_run, num_retries=args.num_retries, project=project)
for i in xrange(1, len(slots)):
slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
diff --git a/sdk/python/bin/cwl-runner b/sdk/python/bin/cwl-runner
old mode 100644
new mode 100755
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list