[ARVADOS] updated: 2b08beada8fb933dc178fe5b9a52440af4dc4faa
git at public.curoverse.com
Thu Apr 2 23:06:16 EDT 2015
Summary of changes:
 sdk/cli/bin/crunch-job                 |   6 ++
 sdk/cli/bin/cwl-job.py                 | 124 -------------------------
 sdk/python/arvados/commands/cwl_job.py | 164 +++++++++++++++++++++++++++++++++
 sdk/python/bin/arv-cwl-job             |   6 ++
 sdk/python/tests/test_cwl_job.py       |  30 ++++++
 5 files changed, 206 insertions(+), 124 deletions(-)
delete mode 100755 sdk/cli/bin/cwl-job.py
create mode 100644 sdk/python/arvados/commands/cwl_job.py
create mode 100755 sdk/python/bin/arv-cwl-job
create mode 100644 sdk/python/tests/test_cwl_job.py
via 2b08beada8fb933dc178fe5b9a52440af4dc4faa (commit)
from 935416f81e95b9da8a2c999aeab5f2505dc0932d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 2b08beada8fb933dc178fe5b9a52440af4dc4faa
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Apr 2 23:06:12 2015 -0400
4685: Working on alternate arv-cwl-job replacement for classic crunch-job.
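
(For orientation, a rough sketch of the job record this change expects; the field
names come from the diff below, while the values are illustrative placeholders
only. crunch-job re-execs arv-cwl-job when runtime_constraints["cwl_job"] is
truthy, and in the local --job JSON path run_on_slot() reads docker_hash,
docker_locator, environment and command out of script_parameters.)

    # Hypothetical job record -- placeholder values, not part of this commit.
    example_job = {
        "runtime_constraints": {"cwl_job": True},
        "script_parameters": {
            "docker_hash": "<docker image hash>",
            "docker_locator": "<keep locator of the docker image tarball>",
            "environment": {"EXAMPLE_VAR": "1"},
            "command": ["echo", "hello"],
        },
    }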
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 294696c..c3bf1f0 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -118,6 +118,7 @@ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
mkdir ($ENV{"JOB_WORK"});
+my @save_argv = @ARGV;
my $force_unlock;
my $git_dir;
my $jobspec;
@@ -165,6 +166,11 @@ if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
   $Job = api_call("jobs/get", uuid => $jobspec);
+
+  if ($Job->{'runtime_constraints'}->{'cwl_job'}) {
+    exec "/home/peter/work/arvados/sdk/python/bin/arv-cwl-job", @save_argv;
+  }
+
   if (!$force_unlock) {
     # Claim this job, and make sure nobody else does
     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
diff --git a/sdk/cli/bin/cwl-job.py b/sdk/cli/bin/cwl-job.py
deleted file mode 100755
index 5b5b476..0000000
--- a/sdk/cli/bin/cwl-job.py
+++ /dev/null
@@ -1,124 +0,0 @@
-#!/usr/bin/env python
-
-import arvados
-import argparse
-import subprocess
-import re
-import json
-import random
-from string import Template
-import pipes
-import sys
-from arvados.api import api_from_config
-import os
-import pprint
-
-EX_TEMPFAIL = 75
-
-def determine_resources():
-    have_slurm = (os.environ.get("SLURM_JOBID", False) and os.environ.get("SLURM_NODELIST", False)) != False
-
-    if have_slurm:
-        sinfo = subprocess.check_output(["sinfo", "-h", "--format=%c %N", "--nodes=" + os.environ["SLURM_NODELIST"]])
-    else:
-        with open("/proc/cpuinfo") as cpuinfo:
-            n = 0
-            for p in cpuinfo:
-                if p.startswith("processor"):
-                    n += 1
-        sinfo = "%i localhost" % n
-
-    nodes = {}
-    for l in sinfo.splitlines():
-        m = re.match("(\d+) (.*)", l)
-        cpus = int(m.group(1))
-        for n in m.group(2).split(","):
-            rn = re.match("([^[]+)\[(\d+)-(\d+)\]", n)
-            if rn:
-                for c in range(int(rn.group(2)), int(rn.group(3)+1)):
-                    nodes["%s%i" % (rn.group(1), c)] = {"slots": cpus}
-            else:
-                nodes[m.group(2)] = {"slots": cpus}
-
-    slots = {}
-    for n in nodes:
-        for c in range(0, nodes[n]["slots"]):
-            slots["%s[%i]" % (n, c)] = {"node": n, "slot": c, "task": None}
-
-    return {"have_slurm": have_slurm,
-            "nodes": nodes,
-            "slots": slots}
-
-def run_on_slot(have_slurm, slot, task):
-    tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
-
-    execution_script = Template("""
-if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
-  arv-get $docker_locator/$docker_locator.tar | docker load
-fi
-rm -rf $tmpdir
-mkdir -p $tmpdir/job_output
-exec stdbuf --output=0 --error=0 \
-    arv-mount --by-id $tmpdir/keep --exec \
-    crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000 \
-    docker --attach=stdout --attach=stderr -i --rm --cidfile=$tmpdir/cidfile --sig-proxy \
-    --volume=$tmpdir/keep:/keep:ro --volume=$tmpdir/job_output:/tmp/job_output:rw \
-    --workdir=/tmp/job_output --user=$$UID $env $cmd
-""")
-    env = ""
-    for e in task["environment"]:
-        env += " --env=%s=%s" % (e, task["environment"][e])
-
-    ex = execution_script.substitute(docker_hash=task["docker_hash"],
-                                     docker_locator=task["docker_locator"],
-                                     tmpdir=tmpdir,
-                                     env=env,
-                                     cmd=" ".join([pipes.quote(c) for c in task["command"]]))
-
-    print ex
-
-    if have_slurm:
-        pass
-    else:
-        resources["slots"][slot]["task"] = subprocess.Popen(ex, shell=True)
-
-
-def main():
-
-    parser = argparse.ArgumentParser()
-
-    parser.add_argument("--job-api-token", type=str)
-    parser.add_argument("--job", type=str)
-    parser.add_argument("--git-dir", type=str)
-
-    args = parser.parse_args()
-
-    api = api_from_config("v1",
-                          {"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
-                           "ARVADOS_API_TOKEN": args.job_api_token,
-                           "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE")})
-
-    if arvados.util.job_uuid_pattern.match(args.job):
-        try:
-            ex = api.jobs().lock(uuid=args.job).execute()
-            if "error" in ex:
-                sys.exit(EX_TEMPFAIL)
-        except:
-            sys.exit(EX_TEMPFAIL)
-
-        job = api.jobs().get(args.job)
-    else:
-        job = json.loads(args.job)
-
-    resources = determine_resources()
-
-    pprint.pprint(resources)
-
-    for t in job["script_parameters"]["tasks"]:
-        for s in resources["slots"]:
-            if resources["slots"][s]["task"] is None:
-                run_on_slot(resources, s, t)
-                break
-
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
new file mode 100644
index 0000000..2e8a926
--- /dev/null
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -0,0 +1,164 @@
+import arvados
+import argparse
+import subprocess
+import re
+import json
+import random
+from string import Template
+import pipes
+import sys
+from arvados.api import api_from_config
+import os
+import pprint
+import arvados.events
+import threading
+
+EX_TEMPFAIL = 75
+
+def parse_sinfo(sinfo):
+    nodes = {}
+    for l in sinfo.splitlines():
+        m = re.match("(\d+) (.*)", l)
+        if m:
+            cpus = int(m.group(1))
+            for n in m.group(2).split(","):
+                rn = re.match("([^[]+)\[(\d+)-(\d+)\]", n)
+                if rn:
+                    for c in range(int(rn.group(2)), int(rn.group(3))+1):
+                        nodes["%s%i" % (rn.group(1), c)] = {"slots": cpus}
+                else:
+                    nodes[n] = {"slots": cpus}
+    return nodes
+
+def make_slots(nodes):
+    slots = {}
+    for n in nodes:
+        for c in range(0, nodes[n]["slots"]):
+            slots["%s[%i]" % (n, c)] = {"node": n, "slot": c, "task": None}
+    return slots
+
+def determine_resources(slurm_jobid=None, slurm_nodelist=None):
+    have_slurm = (slurm_jobid is not None) and (slurm_nodelist is not None)
+
+    if have_slurm:
+        sinfo = subprocess.check_output(["sinfo", "-h", "--format=%c %N", "--nodes=" + slurm_nodelist])
+    else:
+        with open("/proc/cpuinfo") as cpuinfo:
+            n = 0
+            for p in cpuinfo:
+                if p.startswith("processor"):
+                    n += 1
+        sinfo = "%i localhost" % n
+
+    nodes = parse_sinfo(sinfo)
+    slots = make_slots(nodes)
+
+    return {"have_slurm": have_slurm,
+            "nodes": nodes,
+            "slots": slots}
+
+def run_on_slot(resources, slot, task):
+    tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
+
+    execution_script = Template("""
+if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
+  arv-get $docker_locator/$docker_hash.tar | docker load
+fi
+rm -rf $tmpdir
+mkdir -p $tmpdir/job_output $tmpdir/keep
+if which crunchstat ; then
+  CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000"
+else
+  CRUNCHSTAT=""
+fi
+
+exec \
+    arv-mount --by-id $tmpdir/keep --allow-other --exec \
+    $$CRUNCHSTAT \
+    docker run --attach=stdout --attach=stderr -i --rm --cidfile=$tmpdir/cidfile --sig-proxy \
+    --volume=$tmpdir/keep:/keep:ro --volume=$tmpdir/job_output:/tmp/job_output:rw \
+    --workdir=/tmp/job_output --user=$$UID $env $docker_hash $cmd
+""")
+    env = ""
+    for e in task["environment"]:
+        env += " --env=%s=%s" % (e, task["environment"][e])
+
+    ex = execution_script.substitute(docker_hash=task["docker_hash"],
+                                     docker_locator=task["docker_locator"],
+                                     tmpdir=tmpdir,
+                                     env=env,
+                                     cmd=" ".join([pipes.quote(c) for c in task["command"]]))
+
+    print ex
+
+    if resources["have_slurm"]:
+        pass
+    else:
+        resources["slots"][slot]["task"] = subprocess.Popen(ex, shell=True)
+        resources["slots"][slot]["task"].wait()
+
+class TaskEvents(object):
+    def __init__(self, api_config, resources, job_uuid):
+        self.resources = resources
+        self.ws = arvados.events.subscribe(api_from_config("v1", api_config), [["object_uuid", "=", job_uuid]], self.on_event)
+        self.ws.subscribe([["object_uuid", "is_a", "arvados#jobTask"]])
+
+    def on_event(self, ev):
+        if ev.get('event_type') == "update" and ev['object_kind'] == "arvados#job":
+            if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                self.ws.close()
+        elif ev.get('object_kind') == "arvados#jobTask":
+            if ev.get('event_type') == "create":
+                pass
+            if ev.get('event_type') == "update":
+                pass
+
+def main(arguments=None):
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument("--job-api-token", type=str)
+    parser.add_argument("--job", type=str)
+    parser.add_argument("--git-dir", type=str)
+
+    print sys.argv
+    args = parser.parse_args(arguments)
+
+    api = None
+    if os.environ.get("ARVADOS_API_HOST"):
+        api_config = {"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
+                      "ARVADOS_API_TOKEN": args.job_api_token,
+                      "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE")}
+        api = api_from_config("v1", api_config)
+
+    job_uuid = None
+    if arvados.util.job_uuid_pattern.match(args.job):
+        if not api:
+            print "Missing ARVADOS_API_HOST"
+            return 1
+        try:
+            job_uuid = args.job
+            ex = api.jobs().lock(uuid=args.job).execute()
+            if "error" in ex:
+                return EX_TEMPFAIL
+        except:
+            return EX_TEMPFAIL
+
+        job = api.jobs().get(uuid=args.job).execute()
+
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
+            print "Job is already %s" % (job["state"])
+            return EX_TEMPFAIL
+    else:
+        job = json.loads(args.job)
+
+    resources = determine_resources(os.environ.get("SLURM_JOBID"),
+                                    os.environ.get("SLURM_NODELIST"))
+
+    if job_uuid:
+        ts = TaskEvents(api_config, resources, job_uuid)
+        ts.ws.run_forever()
+    else:
+        run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"])
+
+    return 0
diff --git a/sdk/python/bin/arv-cwl-job b/sdk/python/bin/arv-cwl-job
new file mode 100755
index 0000000..e96acf7
--- /dev/null
+++ b/sdk/python/bin/arv-cwl-job
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+
+import sys
+from arvados.commands.cwl_job import main
+
+sys.exit(main())
diff --git a/sdk/python/tests/test_cwl_job.py b/sdk/python/tests/test_cwl_job.py
new file mode 100644
index 0000000..2ab4791
--- /dev/null
+++ b/sdk/python/tests/test_cwl_job.py
@@ -0,0 +1,30 @@
+import arvados
+import unittest
+import arvados_testutil as tutil
+import arvados.commands.cwl_job as cwl_job
+
+class CwlJobTestCase(unittest.TestCase):
+    def test_parse_sinfo(self):
+        nodes = cwl_job.parse_sinfo("""
+16 compute0,compute2
+8 compute1,compute[3-5],compute7
+""")
+        self.assertEqual({"compute0": {"slots": 16},
+                          "compute2": {"slots": 16},
+                          "compute1": {"slots": 8},
+                          "compute3": {"slots": 8},
+                          "compute4": {"slots": 8},
+                          "compute5": {"slots": 8},
+                          "compute7": {"slots": 8}},
+                         nodes)
+
+
+    def test_make_slots(self):
+        slots = cwl_job.make_slots({"compute0": {"slots": 2}, "compute1": {"slots": 4}})
+        self.assertEqual({"compute0[0]": {"node": "compute0", "slot": 0, "task": None},
+                          "compute0[1]": {"node": "compute0", "slot": 1, "task": None},
+                          "compute1[0]": {"node": "compute1", "slot": 0, "task": None},
+                          "compute1[1]": {"node": "compute1", "slot": 1, "task": None},
+                          "compute1[2]": {"node": "compute1", "slot": 2, "task": None},
+                          "compute1[3]": {"node": "compute1", "slot": 3, "task": None}},
+                         slots)
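
(A minimal local sketch, assuming the module is importable as
arvados.commands.cwl_job as in the new file above: with no SLURM_JOBID or
SLURM_NODELIST, determine_resources() counts "processor" lines in /proc/cpuinfo
and reports a single "localhost" node with one slot per detected core.)

    import arvados.commands.cwl_job as cwl_job

    # Outside SLURM the fallback path is used; requires a Linux /proc/cpuinfo.
    resources = cwl_job.determine_resources(None, None)
    print resources["have_slurm"]      # False
    print sorted(resources["slots"])   # e.g. ['localhost[0]', 'localhost[1]', ...]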
-----------------------------------------------------------------------
hooks/post-receive
--