[ARVADOS] updated: 9d209cb34089febeaadeab572a1b4c8d9d485741

git at public.curoverse.com git at public.curoverse.com
Mon Jun 29 17:13:20 EDT 2015


Summary of changes:
 sdk/python/arvados/commands/cwl_runner.py | 48 +++++++++++++++++++------------
 sdk/python/arvados/commands/run.py        | 12 ++++----
 sdk/python/bin/cwl-runner                 |  0
 3 files changed, 35 insertions(+), 25 deletions(-)
 mode change 100644 => 100755 sdk/python/bin/cwl-runner

       via  9d209cb34089febeaadeab572a1b4c8d9d485741 (commit)
      from  c5e5db25e95289376a0df35695ab2b5f48131b1f (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 9d209cb34089febeaadeab572a1b4c8d9d485741
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 29 17:13:46 2015 -0400

    6264: Uploads files and Docker images, can almost run jobs.

diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index 3799507..19fca27 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -11,6 +11,10 @@ import cwltool.main
 import threading
 import cwltool.docker
 import fnmatch
+import logging
+import re
+import os
+from cwltool.process import get_feature
 
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
@@ -29,7 +33,10 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
 
     if not images:
         imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        arvados.commands.keepdocker.main(dockerRequirement["dockerImageId"])
+        args = [image_name]
+        if image_tag:
+            args.append(image_tag)
+        arvados.commands.keepdocker.main(args)
 
     return dockerRequirement["dockerImageId"]
 
@@ -81,14 +88,14 @@ class ArvadosJob(object):
         runtime_constraints = {}
 
         if self.stdin:
-            command["stdin"] = self.stdin
+            script_parameters["task.stdin"] = self.stdin
 
         if self.stdout:
-            command["stdout"] = self.stdout
+            script_parameters["task.stdout"] = self.stdout
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get(docker_req, pull_image)
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
             runtime_constraints["arvados_sdk_version"] = "master"
 
         response = self.arvrunner.api.jobs().create(body={
@@ -113,7 +120,7 @@ class ArvadosJob(object):
 
 
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir):
+    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
         self._pathmap = {}
         uploadfiles = []
 
@@ -122,16 +129,19 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
         for src in referenced_files:
             ab = src if os.path.isabs(src) else os.path.join(basedir, src)
             st = arvados.commands.run.statfile("", ab)
-            if isinstance(st, arvados.commands.run.UploadFile):
+            if kwargs.get("conformance_test"):
+                self._pathmap[src] = (src, ab)
+            elif isinstance(st, arvados.commands.run.UploadFile):
                 uploadfiles.append((src, ab, st))
             elif isinstance(st, arvados.commands.run.ArvFile):
                 self._pathmap[src] = (ab, st.fn)
             elif isinstance(st, basestring) and pdh_path.match(st):
-                self._pathmap[src] = (st, "$(file %s") % st)
+                self._pathmap[src] = (st, "$(file %s)" % st)
             else:
-                workflow.WorkflowException("Input file path '%s' is invalid", st)
+                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid", st)
 
-        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api)
+        if uploadfiles:
+            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3)
 
         for src, ab, st in uploadfiles:
             self._pathmap[src] = (ab, st.fn)
@@ -145,8 +155,8 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def makeJobRunner(self):
         return ArvadosJob(self.arvrunner)
 
-    def makePathMapper(self, reffiles, input_basedir):
-        return ArvadosCommandTool.ArvPathMapper(self.arvrunner, reffiles, input_basedir)
+    def makePathMapper(self, reffiles, input_basedir, **kwargs):
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
 
 
 class ArvCwlRunner(object):
@@ -174,12 +184,12 @@ class ArvCwlRunner(object):
         if "object_uuid" in event:
                 if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                     if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                    try:
-                        self.cond.acquire()
-                        self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                        try:
+                            self.cond.acquire()
+                            self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
+                            self.cond.notify()
+                        finally:
+                            self.cond.release()
 
     def arvExecutor(self, t, job_order, input_basedir, **kwargs):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
@@ -204,7 +214,7 @@ class ArvCwlRunner(object):
                         finally:
                             self.cond.release()
                     else:
-                        raise workflow.WorkflowException("Workflow deadlocked.")
+                        raise cwltool.workflow.WorkflowException("Workflow deadlocked.")
 
             while self.jobs:
                 try:
@@ -216,7 +226,7 @@ class ArvCwlRunner(object):
             events.close()
 
             if self.final_output is None:
-                raise workflow.WorkflowException("Workflow did not return a result.")
+                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
 
             return self.final_output
 
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 6b929ac..4cbda4a 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -101,10 +101,12 @@ def statfile(prefix, fn):
 
     return prefix+fn
 
-def uploadfiles(files, api):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None):
     # Find the smallest path prefix that includes all the files that need to be uploaded.
     # This starts at the root and iteratively removes common parent directory prefixes
     # until all file pathes no longer have a common parent.
+    n = True
+    pathprefix = "/"
     while n:
         pathstep = None
         for c in files:
@@ -133,12 +135,12 @@ def uploadfiles(files, api):
 
     logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
 
-    if args.dry_run:
+    if dry_run:
         logger.info("$(input) is %s", pathprefix.rstrip('/'))
         pdh = "$(input)"
     else:
         files = sorted(files, key=lambda x: x.fn)
-        collection = arvados.CollectionWriter(api, num_retries=args.retries)
+        collection = arvados.CollectionWriter(api, num_retries=num_retries)
         stream = None
         for f in files:
             sp = os.path.split(f.fn)
@@ -234,11 +236,9 @@ def main(arguments=None):
                             command[i] = statfile(m.group(1), m.group(2))
                             break
 
-    n = True
-    pathprefix = "/"
     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
     if files:
-        uploadfiles(files, api)
+        uploadfiles(files, api, dry_run=args.dry_run, num_retries=args.num_retries, project=project)
 
     for i in xrange(1, len(slots)):
         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
diff --git a/sdk/python/bin/cwl-runner b/sdk/python/bin/cwl-runner
old mode 100644
new mode 100755

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list