[ARVADOS] updated: 275637f3ce7fcaacd8f41cd038b15332785298ef
Git user
git at public.curoverse.com
Wed Sep 14 17:14:15 EDT 2016
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 11 ++++--
sdk/cwl/arvados_cwl/arvjob.py | 86 ++++++++++++++++++++++-------------------
sdk/cwl/arvados_cwl/perf.py | 15 +++++++
3 files changed, 69 insertions(+), 43 deletions(-)
create mode 100644 sdk/cwl/arvados_cwl/perf.py
via 275637f3ce7fcaacd8f41cd038b15332785298ef (commit)
via 920a9b20de934454768a43ce6d8a8d8ff4ca0840 (commit)
from cae94f22b760c6c6899fc4d23db15d389535ff0a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 275637f3ce7fcaacd8f41cd038b15332785298ef
Merge: cae94f2 920a9b2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Sep 14 17:14:07 2016 -0400
Merge branch '100032-cwl-metrics' refs #100032
commit 920a9b20de934454768a43ce6d8a8d8ff4ca0840
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Sep 14 17:13:35 2016 -0400
100032: Add metrics to job submission in --debug mode.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 309e941..7bfdba8 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -24,6 +24,7 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess
from .arvworkflow import make_workflow
+from .perf import Perf
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
@@ -97,7 +98,8 @@ class ArvCwlRunner(object):
self.cond.acquire()
j = self.processes[uuid]
logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
- j.done(event["properties"]["new_attributes"])
+ with Perf(logger, "done %s" % j.name):
+ j.done(event["properties"]["new_attributes"])
self.cond.notify()
finally:
self.cond.release()
@@ -162,6 +164,9 @@ class ArvCwlRunner(object):
logger.setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+ if self.debug:
+ logger.setLevel(logging.DEBUG)
+
useruuid = self.api.users().current().execute()["uuid"]
self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
self.pipeline = None
@@ -177,7 +182,6 @@ class ArvCwlRunner(object):
if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
- self.debug = kwargs.get("debug")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
kwargs["make_fs_access"] = make_fs_access
@@ -244,7 +248,8 @@ class ArvCwlRunner(object):
for runnable in jobiter:
if runnable:
- runnable.run(**kwargs)
+ with Perf(logger, "run"):
+ runnable.run(**kwargs)
else:
if self.processes:
self.cond.wait(1)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index a688469..4ba19a6 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -13,6 +13,7 @@ import arvados.collection
from .arvdocker import arv_docker_get_image
from .runner import Runner
from .pathmapper import InitialWorkDirPathMapper
+from .perf import Perf
from . import done
logger = logging.getLogger('arvados.cwl-runner')
@@ -90,19 +91,20 @@ class ArvadosJob(object):
filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
try:
- response = self.arvrunner.api.jobs().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "script": "crunchrunner",
- "repository": "arvados",
- "script_version": "master",
- "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
- "script_parameters": {"tasks": [script_parameters]},
- "runtime_constraints": runtime_constraints
- },
- filters=filters,
- find_or_create=kwargs.get("enable_reuse", True)
- ).execute(num_retries=self.arvrunner.num_retries)
+ with Perf(logger, "create %s" % self.name):
+ response = self.arvrunner.api.jobs().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "script": "crunchrunner",
+ "repository": "arvados",
+ "script_version": "master",
+ "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+ "script_parameters": {"tasks": [script_parameters]},
+ "runtime_constraints": runtime_constraints
+ },
+ filters=filters,
+ find_or_create=kwargs.get("enable_reuse", True)
+ ).execute(num_retries=self.arvrunner.num_retries)
self.arvrunner.processes[response["uuid"]] = self
@@ -111,7 +113,8 @@ class ArvadosJob(object):
logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
+ with Perf(logger, "done %s" % self.name):
+ self.done(response)
except Exception as e:
logger.error("Got error %s" % str(e))
self.output_callback({}, "permanentFail")
@@ -119,7 +122,8 @@ class ArvadosJob(object):
def update_pipeline_component(self, record):
if self.arvrunner.pipeline:
self.arvrunner.pipeline["components"][self.name] = {"job": record}
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+ with Perf(logger, "update_pipeline_component %s" % self.name):
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
body={
"components": self.arvrunner.pipeline["components"]
}).execute(num_retries=self.arvrunner.num_retries)
@@ -151,31 +155,33 @@ class ArvadosJob(object):
outputs = {}
try:
if record["output"]:
- logc = arvados.collection.Collection(record["log"])
- log = logc.open(logc.keys()[0])
- tmpdir = None
- outdir = None
- keepdir = None
- for l in log:
- # Determine the tmpdir, outdir and keepdir paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
- # will differ between runs, and we need to know about the
- # final run that actually produced output.
-
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
- if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
-
- outputs = done.done(self, record, tmpdir, outdir, keepdir)
+ with Perf(logger, "inspect log %s" % self.name):
+ logc = arvados.collection.Collection(record["log"])
+ log = logc.open(logc.keys()[0])
+ tmpdir = None
+ outdir = None
+ keepdir = None
+ for l in log:
+ # Determine the tmpdir, outdir and keepdir paths from
+ # the job run. Unfortunately, we can't take the first
+ # values we find (which are expected to be near the
+ # top) and stop scanning because if the node fails and
+ # the job restarts on a different node these values
+ # will differ between runs, and we need to know about the
+ # final run that actually produced output.
+
+ g = tmpdirre.match(l)
+ if g:
+ tmpdir = g.group(1)
+ g = outdirre.match(l)
+ if g:
+ outdir = g.group(1)
+ g = keepre.match(l)
+ if g:
+ keepdir = g.group(1)
+
+ with Perf(logger, "output collection %s" % self.name):
+ outputs = done.done(self, record, tmpdir, outdir, keepdir)
except WorkflowException as e:
logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
diff --git a/sdk/cwl/arvados_cwl/perf.py b/sdk/cwl/arvados_cwl/perf.py
new file mode 100644
index 0000000..64fda59
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/perf.py
@@ -0,0 +1,15 @@
+import time
+import uuid
+
+class Perf(object):
+ def __init__(self, logger, name):
+ self.logger = logger
+ self.name = name
+
+ def __enter__(self):
+ self.time = time.time()
+ self.logger.debug("ENTER %s %s", self.name, self.time)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ now = time.time()
+ self.logger.debug("EXIT %s %s %s", self.name, now, now - self.time)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list