[ARVADOS] created: 42d4746b7e446811c98c518db03a986a3a499ae1
Git user git at public.curoverse.com
Fri Mar 11 13:31:59 EST 2016
at 42d4746b7e446811c98c518db03a986a3a499ae1 (commit)
commit 42d4746b7e446811c98c518db03a986a3a499ae1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Mar 11 13:31:50 2016 -0500
8673: Report pipeline instance uuid. Reuse jobs by default. Bump cwltool
version dependency.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index cc0e0a6..9556cf4 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -383,7 +383,7 @@ class ArvCwlRunner(object):
"components": {},
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
- logger.info("Pipeline instance %s", self.pipeline)
+ logger.info("Pipeline instance %s", self.pipeline["uuid"])
jobiter = tool.job(job_order,
input_basedir,
@@ -432,10 +432,10 @@ def main(args, stdout, stderr, api_client=None):
parser = cwltool.main.arg_parser()
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
- default=False, dest="enable_reuse",
+ default=True, dest="enable_reuse",
help="")
exgroup.add_argument("--disable-reuse", action="store_false",
- default=False, dest="enable_reuse",
+ default=True, dest="enable_reuse",
help="")
parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index cacfc21..aec4f22 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool>=1.0.20160308152645',
+ 'cwltool>=1.0.20160311170456',
'arvados-python-client>=0.1.20160219154918'
],
zip_safe=True,
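
The reuse change above only shows the flipped defaults in diff context. As a minimal, self-contained sketch (flag names and the shared dest are taken from the diff; the standalone parser and help text are illustrative, not the real arvados-cwl-runner argument parser), the paired store_true/store_false options with default=True behave like this:

    import argparse

    parser = argparse.ArgumentParser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
                         help="Reuse past job results when possible (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="Always run new jobs")

    print(parser.parse_args([]).enable_reuse)                   # True (new default)
    print(parser.parse_args(["--disable-reuse"]).enable_reuse)  # False

In other words, after this commit job reuse is on unless --disable-reuse is passed explicitly.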
commit 9032247f9281283eeebb68e4cb5bf586236ba4bd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Mar 11 08:00:52 2016 -0500
8673: Typo fix for --project-uuid. Refactor main loop locking to address
possible race condition causing workflow to end prematurely.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index cc826d4..cc0e0a6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -150,7 +150,7 @@ class ArvadosJob(object):
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvruner.project_uuid)
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
try:
response = self.arvrunner.api.jobs().create(body={
@@ -319,24 +319,24 @@ class ArvCwlRunner(object):
def on_message(self, event):
if "object_uuid" in event:
- if event["object_uuid"] in self.jobs and event["event_type"] == "update":
- if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
- uuid = event["object_uuid"]
- with self.lock:
- j = self.jobs[uuid]
- logger.info("Job %s (%s) is Running", j.name, uuid)
- j.running = True
- j.update_pipeline_component(event["properties"]["new_attributes"])
- elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- uuid = event["object_uuid"]
- try:
- self.cond.acquire()
- j = self.jobs[uuid]
- logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
- j.done(event["properties"]["new_attributes"])
- self.cond.notify()
- finally:
- self.cond.release()
+ if event["object_uuid"] in self.jobs and event["event_type"] == "update":
+ if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+ uuid = event["object_uuid"]
+ with self.lock:
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is Running", j.name, uuid)
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
+ elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ uuid = event["object_uuid"]
+ try:
+ self.cond.acquire()
+ j = self.jobs[uuid]
+ logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+ j.done(event["properties"]["new_attributes"])
+ self.cond.notify()
+ finally:
+ self.cond.release()
def get_uploaded(self):
return self.uploaded.copy()
@@ -383,6 +383,8 @@ class ArvCwlRunner(object):
"components": {},
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+ logger.info("Pipeline instance %s", self.pipeline)
+
jobiter = tool.job(job_order,
input_basedir,
self.output_callback,
@@ -390,42 +392,37 @@ class ArvCwlRunner(object):
**kwargs)
try:
+ self.cond.acquire()
+ # Will continue to hold the lock for the duration of this code
+ # except when in cond.wait(), at which point on_message can update
+ # job state and process output callbacks.
+
for runnable in jobiter:
if runnable:
- with self.lock:
- runnable.run(**kwargs)
+ runnable.run(**kwargs)
else:
if self.jobs:
- try:
- self.cond.acquire()
- self.cond.wait(1)
- except RuntimeError:
- pass
- finally:
- self.cond.release()
+ self.cond.wait(1)
else:
- logger.error("Workflow cannot make any more progress.")
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
break
while self.jobs:
- try:
- self.cond.acquire()
- self.cond.wait(1)
- except RuntimeError:
- pass
- finally:
- self.cond.release()
+ self.cond.wait(1)
events.close()
if self.final_output is None:
raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
-
except:
- if sys.exc_info()[0] is not KeyboardInterrupt:
+ if sys.exc_info()[0] is KeyboardInterrupt:
+ logger.error("Interrupted, marking pipeline as failed")
+ else:
logger.exception("Caught unhandled exception, marking pipeline as failed")
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ finally:
+ self.cond.release()
return self.final_output
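
For readers following the locking refactor above, here is a stripped-down sketch of the pattern (names such as pending and on_event are illustrative stand-ins, not ArvCwlRunner attributes): the main loop holds the Condition's lock for the whole iteration and only gives it up inside cond.wait(), while the event callback takes the same lock before mutating shared state and notifying, so the "no jobs left" check cannot race with a late job-state update.

    import threading

    cond = threading.Condition()
    pending = {"job-1"}   # stands in for self.jobs
    results = []

    def on_event(uuid):
        # Runs on the event/websocket thread in the real runner.
        with cond:
            pending.discard(uuid)
            results.append(uuid)
            cond.notify()

    def main_loop():
        cond.acquire()
        try:
            # Lock is held except while blocked in cond.wait(), which is
            # the only window where on_event can update shared state.
            while pending:
                cond.wait(1)
        finally:
            cond.release()
        return results

    threading.Timer(0.2, on_event, args=("job-1",)).start()
    print(main_loop())   # ['job-1']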
commit a59cf490d81c559b658dab9012a1f8d4327ca354
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Mar 10 11:57:57 2016 -0500
8673: Adding --project-uuid so resources are created in the specified project.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7b1c291..cc826d4 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -34,7 +34,7 @@ outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -48,7 +48,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
if not images:
imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- args = [image_name]
+ args = ["--project-uuid", project_uuid, image_name]
if image_tag:
args.append(image_tag)
logger.info("Uploading Docker image %s", ":".join(args))
@@ -150,10 +150,11 @@ class ArvadosJob(object):
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvruner.project_uuid)
try:
response = self.arvrunner.api.jobs().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
"script": "crunchrunner",
"repository": "arvados",
"script_version": "master",
@@ -257,7 +258,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
arvrunner.api,
dry_run=kwargs.get("dry_run"),
num_retries=3,
- fnPattern="$(task.keep)/%s/%s")
+ fnPattern="$(task.keep)/%s/%s",
+ project=arvrunner.project_uuid)
for src, ab, st in uploadfiles:
arvrunner.add_uploaded(src, (ab, st.fn))
@@ -368,12 +370,18 @@ class ArvCwlRunner(object):
kwargs["outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
+ useruuid = self.api.users().current().execute()["uuid"]
+ self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+
if kwargs.get("conformance_test"):
return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
else:
- self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+ self.pipeline = self.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.project_uuid,
+ "name": shortname(tool.tool["id"]),
+ "components": {},
+ "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
jobiter = tool.job(job_order,
input_basedir,
@@ -432,6 +440,7 @@ def main(args, stdout, stderr, api_client=None):
exgroup.add_argument("--disable-reuse", action="store_false",
default=False, dest="enable_reuse",
help="")
+ parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
try:
runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
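
As a small illustrative sketch of the project-defaulting behaviour added above (the api argument is assumed to be an arvados.api('v1') client as in the diff; the helper name is hypothetical), created resources are owned by the --project-uuid argument when given, otherwise by the current user's uuid:

    def resolve_project_uuid(api, project_uuid_arg):
        # Mirrors the diff: fall back to the current user's uuid when no
        # --project-uuid was supplied on the command line.
        if project_uuid_arg:
            return project_uuid_arg
        return api.users().current().execute()["uuid"]

    # The resolved uuid is then passed as owner_uuid in the
    # pipeline_instances().create(...) and jobs().create(...) bodies
    # shown in the diff.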
-----------------------------------------------------------------------
hooks/post-receive
--