[ARVADOS] updated: 275637f3ce7fcaacd8f41cd038b15332785298ef

Git user git at public.curoverse.com
Wed Sep 14 17:14:15 EDT 2016


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 11 ++++--
 sdk/cwl/arvados_cwl/arvjob.py   | 86 ++++++++++++++++++++++-------------------
 sdk/cwl/arvados_cwl/perf.py     | 15 +++++++
 3 files changed, 69 insertions(+), 43 deletions(-)
 create mode 100644 sdk/cwl/arvados_cwl/perf.py

       via  275637f3ce7fcaacd8f41cd038b15332785298ef (commit)
       via  920a9b20de934454768a43ce6d8a8d8ff4ca0840 (commit)
      from  cae94f22b760c6c6899fc4d23db15d389535ff0a (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 275637f3ce7fcaacd8f41cd038b15332785298ef
Merge: cae94f2 920a9b2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Sep 14 17:14:07 2016 -0400

    Merge branch '100032-cwl-metrics' refs #100032


commit 920a9b20de934454768a43ce6d8a8d8ff4ca0840
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Sep 14 17:13:35 2016 -0400

    100032: Add metrics to job submission in --debug mode.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 309e941..7bfdba8 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -24,6 +24,7 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
 from .arvworkflow import make_workflow
+from .perf import Perf
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
@@ -97,7 +98,8 @@ class ArvCwlRunner(object):
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        with Perf(logger, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
@@ -162,6 +164,9 @@ class ArvCwlRunner(object):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+        if self.debug:
+            logger.setLevel(logging.DEBUG)
+
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
@@ -177,7 +182,6 @@ class ArvCwlRunner(object):
         if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
             return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
 
-        self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
@@ -244,7 +248,8 @@ class ArvCwlRunner(object):
 
             for runnable in jobiter:
                 if runnable:
-                    runnable.run(**kwargs)
+                    with Perf(logger, "run"):
+                        runnable.run(**kwargs)
                 else:
                     if self.processes:
                         self.cond.wait(1)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index a688469..4ba19a6 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -13,6 +13,7 @@ import arvados.collection
 from .arvdocker import arv_docker_get_image
 from .runner import Runner
 from .pathmapper import InitialWorkDirPathMapper
+from .perf import Perf
 from . import done
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -90,19 +91,20 @@ class ArvadosJob(object):
             filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
 
         try:
-            response = self.arvrunner.api.jobs().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "script": "crunchrunner",
-                    "repository": "arvados",
-                    "script_version": "master",
-                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                    "script_parameters": {"tasks": [script_parameters]},
-                    "runtime_constraints": runtime_constraints
-                },
-                filters=filters,
-                find_or_create=kwargs.get("enable_reuse", True)
-            ).execute(num_retries=self.arvrunner.num_retries)
+            with Perf(logger, "create %s" % self.name):
+                response = self.arvrunner.api.jobs().create(
+                    body={
+                        "owner_uuid": self.arvrunner.project_uuid,
+                        "script": "crunchrunner",
+                        "repository": "arvados",
+                        "script_version": "master",
+                        "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                        "script_parameters": {"tasks": [script_parameters]},
+                        "runtime_constraints": runtime_constraints
+                    },
+                    filters=filters,
+                    find_or_create=kwargs.get("enable_reuse", True)
+                ).execute(num_retries=self.arvrunner.num_retries)
 
             self.arvrunner.processes[response["uuid"]] = self
 
@@ -111,7 +113,8 @@ class ArvadosJob(object):
             logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
 
             if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
+                with Perf(logger, "done %s" % self.name):
+                    self.done(response)
         except Exception as e:
             logger.error("Got error %s" % str(e))
             self.output_callback({}, "permanentFail")
@@ -119,7 +122,8 @@ class ArvadosJob(object):
     def update_pipeline_component(self, record):
         if self.arvrunner.pipeline:
             self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+            with Perf(logger, "update_pipeline_component %s" % self.name):
+                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                  body={
                                                                                     "components": self.arvrunner.pipeline["components"]
                                                                                  }).execute(num_retries=self.arvrunner.num_retries)
@@ -151,31 +155,33 @@ class ArvadosJob(object):
             outputs = {}
             try:
                 if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will differ between runs, and we need to know about the
-                        # final run that actually produced output.
-
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-
-                    outputs = done.done(self, record, tmpdir, outdir, keepdir)
+                    with Perf(logger, "inspect log %s" % self.name):
+                        logc = arvados.collection.Collection(record["log"])
+                        log = logc.open(logc.keys()[0])
+                        tmpdir = None
+                        outdir = None
+                        keepdir = None
+                        for l in log:
+                            # Determine the tmpdir, outdir and keepdir paths from
+                            # the job run.  Unfortunately, we can't take the first
+                            # values we find (which are expected to be near the
+                            # top) and stop scanning because if the node fails and
+                            # the job restarts on a different node these values
+                            # will differ between runs, and we need to know about the
+                            # final run that actually produced output.
+
+                            g = tmpdirre.match(l)
+                            if g:
+                                tmpdir = g.group(1)
+                            g = outdirre.match(l)
+                            if g:
+                                outdir = g.group(1)
+                            g = keepre.match(l)
+                            if g:
+                                keepdir = g.group(1)
+
+                    with Perf(logger, "output collection %s" % self.name):
+                        outputs = done.done(self, record, tmpdir, outdir, keepdir)
             except WorkflowException as e:
                 logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
diff --git a/sdk/cwl/arvados_cwl/perf.py b/sdk/cwl/arvados_cwl/perf.py
new file mode 100644
index 0000000..64fda59
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/perf.py
@@ -0,0 +1,15 @@
+import time
+import uuid
+
+class Perf(object):
+    def __init__(self, logger, name):
+        self.logger = logger
+        self.name = name
+
+    def __enter__(self):
+        self.time = time.time()
+        self.logger.debug("ENTER %s %s", self.name, self.time)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        now = time.time()
+        self.logger.debug("EXIT %s %s %s", self.name, now, now - self.time)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list