[ARVADOS] updated: e4ea50a9fb2b1f44352d3adaee5df209c2a72c96

git at public.curoverse.com git at public.curoverse.com
Fri Apr 3 11:42:31 EDT 2015


Summary of changes:
 sdk/python/arvados/commands/cwl_job.py     | 213 +++++++++++++++++++++++++----
 sdk/python/arvados/commands/cwl_runner.py  |   7 +
 sdk/python/bin/{arv-cwl-job => cwl-runner} |   2 +-
 sdk/python/setup.py                        |   3 +-
 4 files changed, 198 insertions(+), 27 deletions(-)
 create mode 100644 sdk/python/arvados/commands/cwl_runner.py
 copy sdk/python/bin/{arv-cwl-job => cwl-runner} (53%)

       via  e4ea50a9fb2b1f44352d3adaee5df209c2a72c96 (commit)
      from  2b08beada8fb933dc178fe5b9a52440af4dc4faa (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit e4ea50a9fb2b1f44352d3adaee5df209c2a72c96
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Apr 3 11:46:03 2015 -0400

    4685: Adding "job executive" to cwl_job.

diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
index 2e8a926..ae94eea 100644
--- a/sdk/python/arvados/commands/cwl_job.py
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -12,8 +12,13 @@ import os
 import pprint
 import arvados.events
 import threading
+import signal
+import arvados.commands.keepdocker
+import tempfile
 
 EX_TEMPFAIL = 75
+TASK_TEMPFAIL = 111
+TASK_CANCELED = 112
 
 def parse_sinfo(sinfo):
     nodes = {}
@@ -57,61 +62,210 @@ def determine_resources(slurm_jobid=None, slurm_nodelist=None):
             "nodes": nodes,
             "slots": slots}
 
-def run_on_slot(resources, slot, task):
-    tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
-
+def run_on_slot(resources, slot, task, task_uuid):
     execution_script = Template("""
+set -e
+set -v
+
+read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
+trap "kill -TERM -$$pgrp; exit $TASK_CANCELED" EXIT INT QUIT TERM
+
 if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
     arv-get $docker_locator/$docker_hash.tar | docker load
 fi
+
 rm -rf $tmpdir
 mkdir -p $tmpdir/job_output $tmpdir/keep
+
 if which crunchstat ; then
   CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000"
 else
   CRUNCHSTAT=""
 fi
 
-exec  \
-  arv-mount --by-id $tmpdir/keep --allow-other --exec \
+set +e
+
+arv-mount --by-id $tmpdir/keep --allow-other --exec \
   $$CRUNCHSTAT \
-  docker run --attach=stdout --attach=stderr -i --rm --cidfile=$tmpdir/cidfile --sig-proxy \
-  --volume=$tmpdir/keep:/keep:ro --volume=$tmpdir/job_output:/tmp/job_output:rw \
-  --workdir=/tmp/job_output --user=$$UID $env $docker_hash $cmd
+  docker run --attach=stdout --attach=stderr -i --rm
+    --cidfile=$tmpdir/cidfile --sig-proxy \
+    --volume=$tmpdir/keep:/keep:ro --volume=$tmpdir/job_output:/tmp/job_output:rw \
+    --workdir=/tmp/job_output --user=$$UID $env $docker_hash $cmd $stdin_redirect $stdout_redirect
+
+code=$$?
+
+OUT=`arv-put --portable-data-hash $tmpdir/job_output`
+
+echo "Output is $$OUT"
+
+if [[ -n "$$task_uuid" ]]; then
+  if [[ "$$code" == "0" ]]; then
+    if [[ "$$?" == "0" ]]; then
+      arv job_task update --uuid $task_uuid "{\"output\":\"$$OUT\", "success": true}"
+    else
+      code=$TASK_TEMPFAIL
+    fi
+  else
+    arv job_task update --uuid $task_uuid "{\"output\":\"$$OUT\", "success": false}"
+  fi
+fi
+
+rm -rf $tmpdir
+
+exit $$code
 """)
-    env = ""
-    for e in task["environment"]:
-        env += " --env=%s=%s" % (e, task["environment"][e])
 
-    ex = execution_script.substitute(docker_hash=task["docker_hash"],
-                                docker_locator=task["docker_locator"],
-                                tmpdir=tmpdir,
-                                env=env,
-                                cmd=" ".join([pipes.quote(c) for c in task["command"]]))
+    tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
+
+    env = " ".join(["--env=%s=%s" % (pipes.quote(e), pipes.quote(task["environment"][e])) for e in task["environment"]])
+
+    stdin_redirect=""
+    stdout_redirect=""
+
+    if task["stdin"]:
+        stdin_redirect = "< %s/keep/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdin"]))
+
+    if task["stdout"]:
+        stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdout"]))
 
-    print ex
+    ex = execution_script.substitute(docker_hash=pipes.quote(task["docker_hash"]),
+                                     docker_locator=pipes.quote(task["docker_locator"]),
+                                     tmpdir=pipes.quote(tmpdir),
+                                     env=env,
+                                     cmd=" ".join([pipes.quote(c) for c in task["command"]]),
+                                     task_uuid=pipes.quote(task_uuid),
+                                     project_uuid=pipes.quote(project_uuid),
+                                     stdin_redirect=stdin_redirect,
+                                     stdout_redirect=stdout_redirect,
+                                     TASK_CANCELED=TASK_CANCELED,
+                                     TASK_TEMPFAIL=TASK_TEMPFAIL)
 
     if resources["have_slurm"]:
         pass
     else:
-        resources["slots"][slot]["task"] = subprocess.Popen(ex, shell=True)
-        resources["slots"][slot]["task"].wait()
+        slots = resources["slots"]
+        slots[slot]["task"] = task
+        slots[slot]["task"]["__subprocess"] = subprocess.Popen(ex, shell=True)
 
 class TaskEvents(object):
     def __init__(self, api_config, resources, job_uuid):
         self.resources = resources
+        self.slots = resources["slots"]
         self.ws = arvados.events.subscribe(api_from_config("v1", api_config), [["object_uuid", "=", job_uuid]], self.on_event)
         self.ws.subscribe([["object_uuid", "is_a", "arvados#jobTask"]])
+        self.task_queue = []
+
+    def next_task(self):
+        while self.task_queue:
+            assigned = False
+            for slot in self.slots:
+                if self.slots[slot]["task"] is None:
+                    task = self.task_queue[0]
+                    run_on_slot(self.resources, slot, task, task["uuid"])
+                    del self.task_queue[0]
+                    assigned = True
+            if not assigned:
+                break
+
+    def new_task(self, task):
+        self.task_queue.append(task)
+        self.next_task()
+
+    def finish_task(self, task):
+        for slot in self.slots:
+            if self.slots[slot]["task"]["uuid"] == task["uuid"]:
+                slot["task"] = None
+        self.next_task()
+
+    def cancel_tasks(self):
+        for slot in self.slots:
+            if self.slots[slot]["task"].get("__subprocess") is not None:
+                self.slots[slot]["task"]["__subprocess"].terminate()
+                self.slots[slot]["task"]["__subprocess"].wait()
 
     def on_event(self, ev):
         if ev.get('event_type') == "update" and ev['object_kind'] == "arvados#job":
-            if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+            if ev["properties"]["new_attributes"]["state"] == "Cancelled":
+                self.cancel_tasks()
+                self.ws.close()
+            elif ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed"):
                 self.ws.close()
         elif ev.get('object_kind') == "arvados#jobTask":
             if ev.get('event_type') == "create":
-                pass
+                self.new_task(ev["properties"]["new_attributes"])
             if ev.get('event_type') == "update":
-                pass
+                if ev["properties"]["new_attributes"].get("success") is not None:
+                    self.finish_task(ev["properties"]["new_attributes"])
+
+def run_executive(api, job, api_config):
+    execution_script = Template("""
+set -e
+set -v
+
+read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
+trap "kill -TERM -$$pgrp; exit $TASK_CANCELED" EXIT INT QUIT TERM
+
+if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
+    arv-get $docker_locator/$docker_hash.tar | docker load
+fi
+
+rm -rf $tmpdir
+mkdir -p $tmpdir
+
+cd $tmpdir
+git init
+git config --local credential.$githttp/.helper '!tok(){ echo password=$ARVADOS_API_TOKEN; };tok'
+git config --local credential.$githttp/.username none
+ARVADOS_API_HOST=$ARVADOS_API_HOST ARVADOS_API_TOKEN=$ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE=$ARVADOS_API_HOST_INSECURE git fetch $githttp/$gitrepo $script_version
+git checkout $script_version
+
+docker run \
+    --env=CRUNCH_JOB_UUID=$job_uuid \
+    --env=ARVADOS_API_HOST=$ARVADOS_API_HOST \
+    --env=ARVADOS_API_TOKEN=$ARVADOS_API_TOKEN \
+    --env=ARVADOS_API_HOST_INSECURE=$ARVADOS_API_HOST_INSECURE \
+    --volume=$tmpdir:/tmp/git:ro \
+    --privileged \
+    --user=$$UID \
+    --rm \
+    --workdir=/tmp/git \
+    $docker_hash /tmp/git/$script
+""")
+
+    tmpdir = "/tmp/%s-%i" % (job["uuid"], random.randint(1, 100000))
+    cr = arvados.CollectionReader(job["docker_image_locator"], api_client=api)
+
+    if len(cr) != 1:
+        raise arvados.errors.ArgumentError("docker_image_locator must only contain a single file")
+
+    docker_image = re.match("([0-9a-f]{40})\.tar", cr.keys()[0])
+    if docker_image:
+        docker_hash = docker_image.group(1)
+    else:
+        raise arvados.errors.ArgumentError("docker_image_locator must contain a docker image")
+
+    ex = execution_script.substitute(docker_hash=docker_hash,
+                                     docker_locator=job["docker_image_locator"],
+                                     tmpdir=tmpdir,
+                                     ARVADOS_API_HOST=pipes.quote(api_config["ARVADOS_API_HOST"]),
+                                     ARVADOS_API_TOKEN=pipes.quote(api_config["ARVADOS_API_TOKEN"]),
+                                     ARVADOS_API_HOST_INSECURE=pipes.quote(api_config.get("ARVADOS_API_TOKEN", "0")),
+                                     TASK_CANCELED=TASK_CANCELED,
+                                     githttp=pipes.quote(api._rootDesc.get("gitHttpBase")),
+                                     gitrepo=pipes.quote(job["repository"]))
+
+    if resources["have_slurm"]:
+        pass
+    else:
+        subprocess.Popen(ex, shell=True)
+
+class SigHandler(object):
+    def __init__(self, ts):
+        self.ts = ts
+
+    def send_signal(self, signum, fram):
+        self.ts.ws.close()
+        self.ts.cancel_tasks()
 
 def main(arguments=None):
 
@@ -121,14 +275,13 @@ def main(arguments=None):
     parser.add_argument("--job", type=str)
     parser.add_argument("--git-dir", type=str)
 
-    print sys.argv
     args = parser.parse_args(arguments)
 
     api = None
     if os.environ.get("ARVADOS_API_HOST"):
         api_config = {"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
                       "ARVADOS_API_TOKEN": args.job_api_token,
-                      "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE")}
+                      "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "0")}
         api = api_from_config("v1", api_config)
 
     job_uuid = None
@@ -157,8 +310,18 @@ def main(arguments=None):
 
     if job_uuid:
         ts = TaskEvents(api_config, resources, job_uuid)
+        run_executive(api, job, api_config["ARVADOS_API_TOKEN"])
+
+        # Set up signal handling
+        sig = SigHandler(ts)
+
+        # Forward terminate signals to the subprocesses.
+        signal.signal(signal.SIGINT, self.sig)
+        signal.signal(signal.SIGTERM, self.sig)
+        signal.signal(signal.SIGQUIT, self.sig)
+
         ts.ws.run_forever()
     else:
-        run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"])
+        run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"], "")
 
     return 0
diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
new file mode 100644
index 0000000..cb2b62f
--- /dev/null
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -0,0 +1,7 @@
+import arvados
+import cwltool
+
+# Whee!
+
+def main(arguments):
+    return 0
diff --git a/sdk/python/bin/cwl-runner b/sdk/python/bin/cwl-runner
new file mode 100755
index 0000000..354b54e
--- /dev/null
+++ b/sdk/python/bin/cwl-runner
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+
+import sys
+from arvados.commands.cwl_runner import main
+
+sys.exit(main())
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index ca28025..7aa6b32 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -41,7 +41,8 @@ setup(name='arvados-python-client',
         'httplib2',
         'requests>=2.4',
         'urllib3',
-        'ws4py'
+        'ws4py',
+        'cwltool',
         ],
       test_suite='tests',
       tests_require=['mock>=1.0', 'PyYAML'],

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list