[ARVADOS] created: 42d4746b7e446811c98c518db03a986a3a499ae1

Git user git at public.curoverse.com
Fri Mar 11 13:31:59 EST 2016


        at  42d4746b7e446811c98c518db03a986a3a499ae1 (commit)


commit 42d4746b7e446811c98c518db03a986a3a499ae1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Mar 11 13:31:50 2016 -0500

    8673: Report pipeline instance uuid.  Reuse jobs by default.  Bump cwltool
    version dependency.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index cc0e0a6..9556cf4 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -383,7 +383,7 @@ class ArvCwlRunner(object):
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
-            logger.info("Pipeline instance %s", self.pipeline)
+            logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
             jobiter = tool.job(job_order,
                                input_basedir,
@@ -432,10 +432,10 @@ def main(args, stdout, stderr, api_client=None):
     parser = cwltool.main.arg_parser()
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
     exgroup.add_argument("--disable-reuse", action="store_false",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
 
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index cacfc21..aec4f22 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool>=1.0.20160308152645',
+          'cwltool>=1.0.20160311170456',
           'arvados-python-client>=0.1.20160219154918'
       ],
       zip_safe=True,

commit 9032247f9281283eeebb68e4cb5bf586236ba4bd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Mar 11 08:00:52 2016 -0500

    8673: Typo fix for --project-uuid.  Refactor main loop locking to address
    possible race condition causing workflow to end prematurely.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index cc826d4..cc0e0a6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -150,7 +150,7 @@ class ArvadosJob(object):
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvruner.project_uuid)
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
 
         try:
             response = self.arvrunner.api.jobs().create(body={
@@ -319,24 +319,24 @@ class ArvCwlRunner(object):
 
     def on_message(self, event):
         if "object_uuid" in event:
-                if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-                    if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
-                        uuid = event["object_uuid"]
-                        with self.lock:
-                            j = self.jobs[uuid]
-                            logger.info("Job %s (%s) is Running", j.name, uuid)
-                            j.running = True
-                            j.update_pipeline_component(event["properties"]["new_attributes"])
-                    elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                        uuid = event["object_uuid"]
-                        try:
-                            self.cond.acquire()
-                            j = self.jobs[uuid]
-                            logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                            j.done(event["properties"]["new_attributes"])
-                            self.cond.notify()
-                        finally:
-                            self.cond.release()
+            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
+                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+                    uuid = event["object_uuid"]
+                    with self.lock:
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                    uuid = event["object_uuid"]
+                    try:
+                        self.cond.acquire()
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        j.done(event["properties"]["new_attributes"])
+                        self.cond.notify()
+                    finally:
+                        self.cond.release()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -383,6 +383,8 @@ class ArvCwlRunner(object):
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
+            logger.info("Pipeline instance %s", self.pipeline)
+
             jobiter = tool.job(job_order,
                                input_basedir,
                                self.output_callback,
@@ -390,42 +392,37 @@ class ArvCwlRunner(object):
                                **kwargs)
 
             try:
+                self.cond.acquire()
+                # Will continue to hold the lock for the duration of this code
+                # except when in cond.wait(), at which point on_message can update
+                # job state and process output callbacks.
+
                 for runnable in jobiter:
                     if runnable:
-                        with self.lock:
-                            runnable.run(**kwargs)
+                        runnable.run(**kwargs)
                     else:
                         if self.jobs:
-                            try:
-                                self.cond.acquire()
-                                self.cond.wait(1)
-                            except RuntimeError:
-                                pass
-                            finally:
-                                self.cond.release()
+                            self.cond.wait(1)
                         else:
-                            logger.error("Workflow cannot make any more progress.")
+                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                             break
 
                 while self.jobs:
-                    try:
-                        self.cond.acquire()
-                        self.cond.wait(1)
-                    except RuntimeError:
-                        pass
-                    finally:
-                        self.cond.release()
+                    self.cond.wait(1)
 
                 events.close()
 
                 if self.final_output is None:
                     raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
-
             except:
-                if sys.exc_info()[0] is not KeyboardInterrupt:
+                if sys.exc_info()[0] is KeyboardInterrupt:
+                    logger.error("Interrupted, marking pipeline as failed")
+                else:
                     logger.exception("Caught unhandled exception, marking pipeline as failed")
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            finally:
+                self.cond.release()
 
             return self.final_output
 

commit a59cf490d81c559b658dab9012a1f8d4327ca354
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Mar 10 11:57:57 2016 -0500

    8673: Adding --project-uuid so resources are created in the specified project.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7b1c291..cc826d4 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -34,7 +34,7 @@ outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 
 
-def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
 
@@ -48,7 +48,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
 
     if not images:
         imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = [image_name]
+        args = ["--project-uuid", project_uuid, image_name]
         if image_tag:
             args.append(image_tag)
         logger.info("Uploading Docker image %s", ":".join(args))
@@ -150,10 +150,11 @@ class ArvadosJob(object):
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvruner.project_uuid)
 
         try:
             response = self.arvrunner.api.jobs().create(body={
+                "owner_uuid": self.arvrunner.project_uuid,
                 "script": "crunchrunner",
                 "repository": "arvados",
                 "script_version": "master",
@@ -257,7 +258,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              arvrunner.api,
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s")
+                                             fnPattern="$(task.keep)/%s/%s",
+                                             project=arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
             arvrunner.add_uploaded(src, (ab, st.fn))
@@ -368,12 +370,18 @@ class ArvCwlRunner(object):
         kwargs["outdir"] = "$(task.outdir)"
         kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        useruuid = self.api.users().current().execute()["uuid"]
+        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
-                                                                   "components": {},
-                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+            self.pipeline = self.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.project_uuid,
+                    "name": shortname(tool.tool["id"]),
+                    "components": {},
+                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
             jobiter = tool.job(job_order,
                                input_basedir,
@@ -432,6 +440,7 @@ def main(args, stdout, stderr, api_client=None):
     exgroup.add_argument("--disable-reuse", action="store_false",
                         default=False, dest="enable_reuse",
                         help="")
+    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
 
     try:
         runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list