[ARVADOS] updated: 2b08beada8fb933dc178fe5b9a52440af4dc4faa

git at public.curoverse.com
Thu Apr 2 23:06:16 EDT 2015


Summary of changes:
 sdk/cli/bin/crunch-job                 |   6 ++
 sdk/cli/bin/cwl-job.py                 | 124 -------------------------
 sdk/python/arvados/commands/cwl_job.py | 164 +++++++++++++++++++++++++++++++++
 sdk/python/bin/arv-cwl-job             |   6 ++
 sdk/python/tests/test_cwl_job.py       |  30 ++++++
 5 files changed, 206 insertions(+), 124 deletions(-)
 delete mode 100755 sdk/cli/bin/cwl-job.py
 create mode 100644 sdk/python/arvados/commands/cwl_job.py
 create mode 100755 sdk/python/bin/arv-cwl-job
 create mode 100644 sdk/python/tests/test_cwl_job.py

       via  2b08beada8fb933dc178fe5b9a52440af4dc4faa (commit)
      from  935416f81e95b9da8a2c999aeab5f2505dc0932d (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so they are listed in full
below.


commit 2b08beada8fb933dc178fe5b9a52440af4dc4faa
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Apr 2 23:06:12 2015 -0400

    4685: Working on alternate arv-cwl-job replacement for classic crunch-job.
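
For context, a hedged sketch (not part of the commit) of the dispatch this
change introduces: crunch-job now execs the new arv-cwl-job entry point,
re-passing its saved @ARGV, whenever the job record's runtime_constraints
carry a truthy cwl_job flag. The field names come from the diff below; the
UUID and script_parameters are made up.

    # Illustrative job record; only runtime_constraints["cwl_job"] is
    # what the new crunch-job check actually inspects.
    job = {
        "uuid": "zzzzz-8i9sb-0123456789abcde",     # hypothetical UUID
        "script_parameters": {"tasks": []},
        "runtime_constraints": {"cwl_job": True},
    }

    if job.get("runtime_constraints", {}).get("cwl_job"):
        pass  # crunch-job would exec arv-cwl-job with @save_argv here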

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 294696c..c3bf1f0 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -118,6 +118,7 @@ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
+my @save_argv = @ARGV;
 my $force_unlock;
 my $git_dir;
 my $jobspec;
@@ -165,6 +166,11 @@ if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
   $Job = api_call("jobs/get", uuid => $jobspec);
+
+  if ($Job->{'runtime_constraints'}->{'cwl_job'}) {
+    exec "/home/peter/work/arvados/sdk/python/bin/arv-cwl-job", @save_argv;
+  }
+
   if (!$force_unlock) {
     # Claim this job, and make sure nobody else does
     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
diff --git a/sdk/cli/bin/cwl-job.py b/sdk/cli/bin/cwl-job.py
deleted file mode 100755
index 5b5b476..0000000
--- a/sdk/cli/bin/cwl-job.py
+++ /dev/null
@@ -1,124 +0,0 @@
-#!/usr/bin/env python
-
-import arvados
-import argparse
-import subprocess
-import re
-import json
-import random
-from string import Template
-import pipes
-import sys
-from arvados.api import api_from_config
-import os
-import pprint
-
-EX_TEMPFAIL = 75
-
-def determine_resources():
-    have_slurm = (os.environ.get("SLURM_JOBID", False) and os.environ.get("SLURM_NODELIST", False)) != False
-
-    if have_slurm:
-        sinfo = subprocess.check_output(["sinfo", "-h", "--format=%c %N", "--nodes=" + os.environ["SLURM_NODELIST"]])
-    else:
-        with open("/proc/cpuinfo") as cpuinfo:
-            n = 0
-            for p in cpuinfo:
-                if p.startswith("processor"):
-                    n += 1
-        sinfo = "%i localhost" % n
-
-    nodes = {}
-    for l in sinfo.splitlines():
-        m = re.match("(\d+) (.*)", l)
-        cpus = int(m.group(1))
-        for n in m.group(2).split(","):
-            rn = re.match("([^[]+)\[(\d+)-(\d+)\]", n)
-            if rn:
-                for c in range(int(rn.group(2)), int(rn.group(3)+1)):
-                    nodes["%s%i" % (rn.group(1), c)] = {"slots": cpus}
-            else:
-                nodes[m.group(2)] = {"slots": cpus}
-
-    slots = {}
-    for n in nodes:
-        for c in range(0, nodes[n]["slots"]):
-            slots["%s[%i]" % (n, c)] = {"node": n, "slot": c, "task": None}
-
-    return {"have_slurm": have_slurm,
-            "nodes": nodes,
-            "slots": slots}
-
-def run_on_slot(have_slurm, slot, task):
-    tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
-
-    execution_script = Template("""
-if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
-    arv-get $docker_locator/$docker_locator.tar | docker load
-fi
-rm -rf $tmpdir
-mkdir -p $tmpdir/job_output
-exec stdbuf --output=0 --error=0 \
-  arv-mount --by-id $tmpdir/keep --exec \
-  crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000 \
-  docker --attach=stdout --attach=stderr -i --rm --cidfile=$tmpdir/cidfile --sig-proxy \
-  --volume=$tmpdir/keep:/keep:ro --volume=$tmpdir/job_output:/tmp/job_output:rw \
-  --workdir=/tmp/job_output --user=$$UID $env $cmd
-""")
-    env = ""
-    for e in task["environment"]:
-        env += " --env=%s=%s" % (e, task["environment"][e])
-
-    ex = execution_script.substitute(docker_hash=task["docker_hash"],
-                                docker_locator=task["docker_locator"],
-                                tmpdir=tmpdir,
-                                env=env,
-                                cmd=" ".join([pipes.quote(c) for c in task["command"]]))
-
-    print ex
-
-    if have_slurm:
-        pass
-    else:
-        resources["slots"][slot]["task"] = subprocess.Popen(ex, shell=True)
-
-
-def main():
-
-    parser = argparse.ArgumentParser()
-
-    parser.add_argument("--job-api-token", type=str)
-    parser.add_argument("--job", type=str)
-    parser.add_argument("--git-dir", type=str)
-
-    args = parser.parse_args()
-
-    api = api_from_config("v1",
-                          {"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
-                           "ARVADOS_API_TOKEN": args.job_api_token,
-                           "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE")})
-
-    if arvados.util.job_uuid_pattern.match(args.job):
-        try:
-            ex = api.jobs().lock(uuid=args.job).execute()
-            if "error" in ex:
-                sys.exit(EX_TEMPFAIL)
-        except:
-            sys.exit(EX_TEMPFAIL)
-
-        job = api.jobs().get(args.job)
-    else:
-        job = json.loads(args.job)
-
-    resources = determine_resources()
-
-    pprint.pprint(resources)
-
-    for t in job["script_parameters"]["tasks"]:
-        for s in resources["slots"]:
-            if resources["slots"][s]["task"] is None:
-                run_on_slot(resources, s, t)
-                break
-
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
new file mode 100644
index 0000000..2e8a926
--- /dev/null
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -0,0 +1,164 @@
+import arvados
+import argparse
+import subprocess
+import re
+import json
+import random
+from string import Template
+import pipes
+import sys
+from arvados.api import api_from_config
+import os
+import pprint
+import arvados.events
+import threading
+
+EX_TEMPFAIL = 75
+
+def parse_sinfo(sinfo):
+    nodes = {}
+    for l in sinfo.splitlines():
+        m = re.match("(\d+) (.*)", l)
+        if m:
+            cpus = int(m.group(1))
+            for n in m.group(2).split(","):
+                rn = re.match("([^[]+)\[(\d+)-(\d+)\]", n)
+                if rn:
+                    for c in range(int(rn.group(2)), int(rn.group(3))+1):
+                        nodes["%s%i" % (rn.group(1), c)] = {"slots": cpus}
+                else:
+                    nodes[n] = {"slots": cpus}
+    return nodes
+
+def make_slots(nodes):
+    slots = {}
+    for n in nodes:
+        for c in range(0, nodes[n]["slots"]):
+            slots["%s[%i]" % (n, c)] = {"node": n, "slot": c, "task": None}
+    return slots
+
+def determine_resources(slurm_jobid=None, slurm_nodelist=None):
+    have_slurm = (slurm_jobid is not None) and (slurm_nodelist is not None)
+
+    if have_slurm:
+        sinfo = subprocess.check_output(["sinfo", "-h", "--format=%c %N", "--nodes=" + slurm_nodelist])
+    else:
+        with open("/proc/cpuinfo") as cpuinfo:
+            n = 0
+            for p in cpuinfo:
+                if p.startswith("processor"):
+                    n += 1
+        sinfo = "%i localhost" % n
+
+    nodes = parse_sinfo(sinfo)
+    slots = make_slots(nodes)
+
+    return {"have_slurm": have_slurm,
+            "nodes": nodes,
+            "slots": slots}
+
+def run_on_slot(resources, slot, task):
+    tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
+
+    execution_script = Template("""
+if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
+    arv-get $docker_locator/$docker_hash.tar | docker load
+fi
+rm -rf $tmpdir
+mkdir -p $tmpdir/job_output $tmpdir/keep
+if which crunchstat ; then
+  CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000"
+else
+  CRUNCHSTAT=""
+fi
+
+exec  \
+  arv-mount --by-id $tmpdir/keep --allow-other --exec \
+  $$CRUNCHSTAT \
+  docker run --attach=stdout --attach=stderr -i --rm --cidfile=$tmpdir/cidfile --sig-proxy \
+  --volume=$tmpdir/keep:/keep:ro --volume=$tmpdir/job_output:/tmp/job_output:rw \
+  --workdir=/tmp/job_output --user=$$UID $env $docker_hash $cmd
+""")
+    env = ""
+    for e in task["environment"]:
+        env += " --env=%s=%s" % (e, task["environment"][e])
+
+    ex = execution_script.substitute(docker_hash=task["docker_hash"],
+                                docker_locator=task["docker_locator"],
+                                tmpdir=tmpdir,
+                                env=env,
+                                cmd=" ".join([pipes.quote(c) for c in task["command"]]))
+
+    print ex
+
+    if resources["have_slurm"]:
+        pass
+    else:
+        resources["slots"][slot]["task"] = subprocess.Popen(ex, shell=True)
+        resources["slots"][slot]["task"].wait()
+
+class TaskEvents(object):
+    def __init__(self, api_config, resources, job_uuid):
+        self.resources = resources
+        self.ws = arvados.events.subscribe(api_from_config("v1", api_config), [["object_uuid", "=", job_uuid]], self.on_event)
+        self.ws.subscribe([["object_uuid", "is_a", "arvados#jobTask"]])
+
+    def on_event(self, ev):
+        if ev.get('event_type') == "update" and ev['object_kind'] == "arvados#job":
+            if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                self.ws.close()
+        elif ev.get('object_kind') == "arvados#jobTask":
+            if ev.get('event_type') == "create":
+                pass
+            if ev.get('event_type') == "update":
+                pass
+
+def main(arguments=None):
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument("--job-api-token", type=str)
+    parser.add_argument("--job", type=str)
+    parser.add_argument("--git-dir", type=str)
+
+    print sys.argv
+    args = parser.parse_args(arguments)
+
+    api = None
+    if os.environ.get("ARVADOS_API_HOST"):
+        api_config = {"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
+                      "ARVADOS_API_TOKEN": args.job_api_token,
+                      "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE")}
+        api = api_from_config("v1", api_config)
+
+    job_uuid = None
+    if arvados.util.job_uuid_pattern.match(args.job):
+        if not api:
+            print "Missing ARVADOS_API_HOST"
+            return 1
+        try:
+            job_uuid = args.job
+            ex = api.jobs().lock(uuid=args.job).execute()
+            if "error" in ex:
+                return EX_TEMPFAIL
+        except:
+            return EX_TEMPFAIL
+
+        job = api.jobs().get(uuid=args.job).execute()
+
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
+            print "Job is already %s" % (job["state"])
+            return EX_TEMPFAIL
+    else:
+        job = json.loads(args.job)
+
+    resources = determine_resources(os.environ.get("SLURM_JOBID"),
+                                    os.environ.get("SLURM_NODELIST"))
+
+    if job_uuid:
+        ts = TaskEvents(api_config, resources, job_uuid)
+        ts.ws.run_forever()
+    else:
+        run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"])
+
+    return 0
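
A minimal sketch (not part of the commit) of the non-SLURM fallback in
determine_resources() above: with SLURM_JOBID and SLURM_NODELIST unset, the
synthesized sinfo line "<ncpus> localhost" yields one slot per local CPU.

    from arvados.commands.cwl_job import parse_sinfo, make_slots

    nodes = parse_sinfo("4 localhost")
    # {"localhost": {"slots": 4}}
    slots = make_slots(nodes)
    # {"localhost[0]": {"node": "localhost", "slot": 0, "task": None},
    #  ...
    #  "localhost[3]": {"node": "localhost", "slot": 3, "task": None}}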
diff --git a/sdk/python/bin/arv-cwl-job b/sdk/python/bin/arv-cwl-job
new file mode 100755
index 0000000..e96acf7
--- /dev/null
+++ b/sdk/python/bin/arv-cwl-job
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+
+import sys
+from arvados.commands.cwl_job import main
+
+sys.exit(main())
diff --git a/sdk/python/tests/test_cwl_job.py b/sdk/python/tests/test_cwl_job.py
new file mode 100644
index 0000000..2ab4791
--- /dev/null
+++ b/sdk/python/tests/test_cwl_job.py
@@ -0,0 +1,30 @@
+import arvados
+import unittest
+import arvados_testutil as tutil
+import arvados.commands.cwl_job as cwl_job
+
+class CwlJobTestCase(unittest.TestCase):
+    def test_parse_sinfo(self):
+        nodes = cwl_job.parse_sinfo("""
+16 compute0,compute2
+8 compute1,compute[3-5],compute7
+""")
+        self.assertEqual({"compute0": {"slots": 16},
+                          "compute2": {"slots": 16},
+                          "compute1": {"slots": 8},
+                          "compute3": {"slots": 8},
+                          "compute4": {"slots": 8},
+                          "compute5": {"slots": 8},
+                          "compute7": {"slots": 8}},
+        nodes)
+
+
+    def test_make_slots(self):
+        slots = cwl_job.make_slots({"compute0": {"slots": 2}, "compute1": {"slots": 4}})
+        self.assertEqual({"compute0[0]": {"node": "compute0", "slot": 0, "task": None},
+                          "compute0[1]": {"node": "compute0", "slot": 1, "task": None},
+                          "compute1[0]": {"node": "compute1", "slot": 0, "task": None},
+                          "compute1[1]": {"node": "compute1", "slot": 1, "task": None},
+                          "compute1[2]": {"node": "compute1", "slot": 2, "task": None},
+                          "compute1[3]": {"node": "compute1", "slot": 3, "task": None}},
+                          slots)
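
For the non-UUID path in main() above, a hedged usage sketch: assuming
ARVADOS_API_HOST is unset (so no API client is created) and Docker and
arv-mount are available locally, passing a literal JSON job runs it on the
first local slot. Note this really does launch the container; the
docker_hash, docker_locator, and command values below are placeholders.

    import json
    from arvados.commands.cwl_job import main

    job = {"script_parameters": {
        "docker_hash": "0123456789abcdef",       # hypothetical image hash
        "docker_locator": "0123456789abcdef+0",  # hypothetical Keep locator
        "environment": {"FOO": "bar"},
        "command": ["echo", "hello"],
    }}
    # args.job does not match the job UUID pattern, so it is parsed as
    # JSON and dispatched via run_on_slot() on this host.
    main(["--job", json.dumps(job)])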

-----------------------------------------------------------------------


hooks/post-receive
-- 