[ARVADOS] updated: 53755b6cb7a0ba9a73a7f1a54c219140e82863d8

git at public.curoverse.com git at public.curoverse.com
Thu Jul 2 09:28:13 EDT 2015


Summary of changes:
 crunch_scripts/run-command                |  9 +++-
 sdk/python/arvados/commands/cwl_runner.py | 77 ++++++++++++++++++++++---------
 2 files changed, 62 insertions(+), 24 deletions(-)

       via  53755b6cb7a0ba9a73a7f1a54c219140e82863d8 (commit)
       via  508d3b100a3faa9711e42a32ddeaade5da2470da (commit)
       via  e4895820d9302048cc41e4b119450bbb8c01f70f (commit)
       via  25e66d0972d323e6bd865f9705cb3f075b727290 (commit)
      from  d0a2405dd75f02ee40714d112e8ba162ac01ad40 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 53755b6cb7a0ba9a73a7f1a54c219140e82863d8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jul 1 23:05:38 2015 -0400

    6264: Support EnvVarRequirement and CreateFileRequirement.

diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index dd40fbf..65f4940 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -19,9 +19,6 @@ from cwltool.process import get_feature
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-del cwltool.draft2tool.supportedProcessRequirements[cwltool.draft2tool.supportedProcessRequirements.index("EnvVarRequirement")]
-del cwltool.draft2tool.supportedProcessRequirements[cwltool.draft2tool.supportedProcessRequirements.index("CreateFileRequirement")]
-
 def arv_docker_get_image(api_client, dockerRequirement, pull_image):
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -102,7 +99,7 @@ class ArvadosJob(object):
             vwd = arvados.collection.Collection()
             for t in self.generatefiles:
                 if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"])
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"][7:-1])
                     vwd.copy(rest, t, source_collection=src)
                 else:
                     with vwd.open(t, "w") as f:
@@ -167,18 +164,19 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
         pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
 
         for src in referenced_files:
-            ab = src if os.path.isabs(src) else os.path.join(basedir, src)
-            st = arvados.commands.run.statfile("", ab)
-            if kwargs.get("conformance_test"):
-                self._pathmap[src] = (src, ab)
-            elif isinstance(st, arvados.commands.run.UploadFile):
-                uploadfiles.append((src, ab, st))
-            elif isinstance(st, arvados.commands.run.ArvFile):
-                self._pathmap[src] = (ab, st.fn)
-            elif isinstance(st, basestring) and pdh_path.match(st):
-                self._pathmap[src] = (st, "$(file %s)" % st)
+            if isinstance(src, basestring) and pdh_path.match(src):
+                self._pathmap[src] = (src, "$(file %s)" % src)
             else:
-                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid", st)
+                ab = src if os.path.isabs(src) else os.path.join(basedir, src)
+                st = arvados.commands.run.statfile("", ab)
+                if kwargs.get("conformance_test"):
+                    self._pathmap[src] = (src, ab)
+                elif isinstance(st, arvados.commands.run.UploadFile):
+                    uploadfiles.append((src, ab, st))
+                elif isinstance(st, arvados.commands.run.ArvFile):
+                    self._pathmap[src] = (ab, st.fn)
+                else:
+                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
 
         if uploadfiles:
             arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3)
@@ -265,7 +263,8 @@ class ArvCwlRunner(object):
                         finally:
                             self.cond.release()
                     else:
-                        raise cwltool.workflow.WorkflowException("Workflow deadlocked.")
+                        logger.error("Workflow cannot make any more progress.")
+                        break
 
             while self.jobs:
                 try:

commit 508d3b100a3faa9711e42a32ddeaade5da2470da
Merge: 25e66d0 e489582
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jul 1 17:43:58 2015 -0400

    Merge branch '6264-run-command-task-env' into 6264-cwl-runner


commit 25e66d0972d323e6bd865f9705cb3f075b727290
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jul 1 17:41:55 2015 -0400

    Support job reuse, CreateFileRequirement, and EnvVarRequirement

diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index a45a079..dd40fbf 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -98,6 +98,23 @@ class ArvadosJob(object):
         }
         runtime_constraints = {}
 
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"])
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            script_parameters["task.vwd"] = vwd.portable_data_hash()
+
+        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
+        if self.environment:
+            for k,v in self.environment.items():
+                script_parameters["task.env"][k] = v
+
         if self.stdin:
             script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
 
@@ -115,12 +132,15 @@ class ArvadosJob(object):
             "script_version": "master",
             "script_parameters": script_parameters,
             "runtime_constraints": runtime_constraints
-        }).execute()
-
-        logger.info("Submitted job %s", response["uuid"])
+        }, find_or_create=kwargs.get("enable_reuse", True)).execute()
 
         self.arvrunner.jobs[response["uuid"]] = self
 
+        logger.info("Job %s is %s", response["uuid"], response["state"])
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
     def done(self, record):
         try:
             if record["state"] == "Complete":
@@ -206,7 +226,8 @@ class ArvCwlRunner(object):
                 if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                     if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                         logger.info("Job %s is Running", event["object_uuid"])
-                        self.jobs[event["object_uuid"]].running = True
+                        with self.lock:
+                            self.jobs[event["object_uuid"]].running = True
                     elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                         logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
                         try:
@@ -216,13 +237,16 @@ class ArvCwlRunner(object):
                         finally:
                             self.cond.release()
 
-    def arvExecutor(self, t, job_order, input_basedir, **kwargs):
+    def arvExecutor(self, t, job_order, input_basedir, args, **kwargs):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
-        kwargs["fs_access"] = CollectionFsAccess(input_basedir)
+        self.fs_access = CollectionFsAccess(input_basedir)
+
+        kwargs["fs_access"] = self.fs_access
+        kwargs["enable_reuse"] = args.enable_reuse
 
         if kwargs.get("conformance_test"):
-            return cwltool.main.single_job_executor(t, job_order, input_basedir, **kwargs)
+            return cwltool.main.single_job_executor(t, job_order, input_basedir, args, **kwargs)
         else:
             jobiter = t.job(job_order,
                             input_basedir,
@@ -261,4 +285,12 @@ class ArvCwlRunner(object):
 def main(args, stdout, stderr, api_client=None):
     runner = ArvCwlRunner(api_client=arvados.api('v1'))
     args.append("--leave-outputs")
-    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool)
+    parser = cwltool.main.arg_parser()
+    parser.add_argument("--enable-reuse", action="store_true",
+                        default=False, dest="enable_reuse",
+                        help="")
+    parser.add_argument("--disable-reuse", action="store_false",
+                        default=False, dest="enable_reuse",
+                        help="")
+
+    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list