[ARVADOS] updated: 42008041173b0f7dcc4e90023517767396c64f78

git at public.curoverse.com git at public.curoverse.com
Tue Apr 7 14:30:27 EDT 2015


Summary of changes:
 sdk/python/arvados/commands/cwl_job.py      | 82 +++++++++++++++++++----------
 sdk/python/arvados/commands/keepdocker.py   |  9 +++-
 sdk/python/arvados/events.py                |  4 ++
 sdk/python/tests/test_cwl_job.py            | 28 +++++++---
 services/api/config/application.default.yml |  2 +-
 5 files changed, 87 insertions(+), 38 deletions(-)

       via  42008041173b0f7dcc4e90023517767396c64f78 (commit)
      from  87e7d0850132db2805096b5bffbcd101ee5dbea2 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 42008041173b0f7dcc4e90023517767396c64f78
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Apr 7 14:33:32 2015 -0400

    4685: Can now run tasks!

diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
index 6eea2f1..9350d72 100644
--- a/sdk/python/arvados/commands/cwl_job.py
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -15,6 +15,7 @@ import threading
 import signal
 import arvados.commands.keepdocker
 import tempfile
+import pprint
 
 EX_TEMPFAIL = 75
 TASK_TEMPFAIL = 111
@@ -99,14 +100,15 @@ class TaskMonitor(threading.Thread):
         check = api.job_tasks().get(uuid=self.task["uuid"]).execute()
         if check["success"] is None:
             # Task didn't set its own success, so mark it failed.
-            api.job_tasks().update(uuid=self.task["uuid"], body={"success": False}).execute()
+            check = api.job_tasks().update(uuid=self.task["uuid"], body={"success": False}).execute()
+        print "Task %s completed with success %s" % (self.task["uuid"], check["success"])
 
-def run_on_slot(resources, slot, task, task_uuid):
+def run_on_slot(resources, api_config, slot, task):
     execution_script = Template(script_header + """
 mkdir $tmpdir/job_output $tmpdir/keep
 
 if which crunchstat ; then
-  CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000"
+  CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000"
 else
   CRUNCHSTAT=""
 fi
@@ -121,7 +123,7 @@ arv-mount --by-id $tmpdir/keep --allow-other --exec \
     --attach=stderr \
     -i \
     --rm \
-    --cidfile=$tmpdir/cidfile \
+    --cidfile=$cidfile \
     --sig-proxy \
     --volume=$tmpdir/keep:/keep:ro \
     --volume=$tmpdir/job_output:/tmp/job_output:rw \
@@ -135,19 +137,26 @@ arv-mount --by-id $tmpdir/keep --allow-other --exec \
 
 code=$$?
 
-OUT=`arv-put --portable-data-hash $tmpdir/job_output`
+echo "Docker exit code is $$code"
+
+OUT=`arv-put --portable-data-hash --batch-progress $tmpdir/job_output`
+arv_put_code=$$?
 
+echo "arv-put exit code is $$arv_put_code"
 echo "Output is $$OUT"
 
-if [[ -n "$$task_uuid" ]]; then
-  if [[ "$$code" == "0" ]]; then
-    if [[ "$$?" != "0" ]]; then
-      arv job_task update --uuid $task_uuid "{\"output\":\"$$OUT\", "success": true}"
+if test -n "$task_uuid" ; then
+  if test "$$code" = "0" ; then
+    if test "$$arv_put_code" = "0" ; then
+      echo "Task success"
+      arv job_task update --uuid $task_uuid --job-task "{\\"output\\":\\"$$OUT\\", \\"success\\": true}"
     else
+      echo "Task temporary failure"
       code=$TASK_TEMPFAIL
     fi
   else
-    arv job_task update --uuid $task_uuid "{\"output\":\"$$OUT\", "success": false}"
+    echo "Task failed"
+    arv job_task update --uuid $task_uuid --job-task "{\\"output\\":\\"$$OUT\\", \\"success\\": false}"
   fi
 fi
 
@@ -156,25 +165,29 @@ rm -rf $tmpdir
 exit $$code
 """)
 
+    pprint.pprint(task)
+
     tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
 
-    env = " ".join(["--env=%s=%s" % (pipes.quote(e), pipes.quote(task["environment"][e])) for e in task["environment"]])
+    params = task["parameters"]
+
+    env = " ".join(["--env=%s=%s" % (pipes.quote(e), pipes.quote(params["environment"][e])) for e in params.get("environment", {})])
 
     stdin_redirect=""
     stdout_redirect=""
 
-    if task.get("stdin"):
-        stdin_redirect = "< %s/keep/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdin"]))
+    if params.get("stdin"):
+        stdin_redirect = "< %s/keep/%s" % (pipes.quote(tmpdir), pipes.quote(params["stdin"]))
 
-    if task.get("stdout"):
-        stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdout"]))
+    if params.get("stdout"):
+        stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(params["stdout"]))
 
-    ex = execution_script.substitute(docker_hash=pipes.quote(task["docker_hash"]),
+    ex = execution_script.substitute(docker_hash=pipes.quote(params["docker_hash"]),
                                      tmpdir=pipes.quote(tmpdir),
+                                     cidfile=os.path.join(tmpdir, "cidfile"),
                                      env=env,
-                                     cmd=" ".join([pipes.quote(c) for c in task["command"]]),
-                                     task_uuid=pipes.quote(task_uuid),
-                                     project_uuid=pipes.quote(project_uuid),
+                                     cmd=" ".join([pipes.quote(c) for c in params["command"]]),
+                                     task_uuid=pipes.quote(task["uuid"]),
                                      stdin_redirect=stdin_redirect,
                                      stdout_redirect=stdout_redirect,
                                      TASK_CANCELED=TASK_CANCELED,
@@ -183,13 +196,16 @@ exit $$code
                                      ARVADOS_API_TOKEN=pipes.quote(api_config["ARVADOS_API_TOKEN"]),
                                      ARVADOS_API_HOST_INSECURE=pipes.quote(api_config.get("ARVADOS_API_HOST_INSECURE", "0")))
 
+
     if resources["have_slurm"]:
         pass
     else:
         slots = resources["slots"]
         slots[slot]["task"] = task
-        slots[slot]["task"]["__subprocess"] = subprocess.Popen(ex, shell=True)
-        TaskMonitor(slots[slot]["task"]["__subprocess"], api_config, task).start()
+        sub = subprocess.Popen(ex, shell=True, stdin=subprocess.PIPE, close_fds=True)
+        sub.stdin.close()
+        slots[slot]["task"]["__subprocess"] = sub
+        TaskMonitor(sub, api_config, task).start()
 
 class TaskEvents(object):
     def __init__(self, api_config, resources, job_uuid):
@@ -198,6 +214,7 @@ class TaskEvents(object):
         self.ws = arvados.events.subscribe(api_from_config("v1", api_config), [["object_uuid", "=", job_uuid]], self.on_event)
         self.ws.subscribe([["object_uuid", "is_a", "arvados#jobTask"]])
         self.task_queue = []
+        self.api_config = api_config
 
     def next_task(self):
         while self.task_queue:
@@ -205,9 +222,17 @@ class TaskEvents(object):
             for slot in self.slots:
                 if self.slots[slot]["task"] is None:
                     task = self.task_queue[0]
-                    run_on_slot(self.resources, slot, task["parameters"], task["uuid"])
-                    del self.task_queue[0]
-                    assigned = True
+                    try:
+                        assigned = True
+                        run_on_slot(self.resources, self.api_config, slot, task)
+                        del self.task_queue[0]
+                    except:
+                        api = api_from_config("v1", self.api_config)
+                        check = api.job_tasks().get(uuid=task["uuid"]).execute()
+                        if check["success"] is None:
+                            api.job_tasks().update(uuid=task["uuid"], body={"success": False}).execute()
+                    break
+
             if not assigned:
                 break
 
@@ -217,8 +242,8 @@ class TaskEvents(object):
 
     def finish_task(self, task):
         for slot in self.slots:
-            if self.slots[slot]["task"]["uuid"] == task["uuid"]:
-                slot["task"] = None
+            if self.slots[slot]["task"] and self.slots[slot]["task"]["uuid"] == task["uuid"]:
+                self.slots[slot]["task"] = None
         self.next_task()
 
     def cancel_tasks(self):
@@ -307,7 +332,8 @@ docker run \
     if resources["have_slurm"]:
         pass
     else:
-        sub = subprocess.Popen(ex, shell=True)
+        sub = subprocess.Popen(ex, shell=True, stdin=subprocess.PIPE, close_fds=True)
+        sub.stdin.close()
         JobMonitor(sub, api_config, job).start()
         return sub
 
@@ -380,6 +406,6 @@ def main(arguments=None):
 
         ts.ws.run_forever()
     else:
-        run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"], "")
+        run_on_slot(resources, api_config, resources["slots"].keys()[0], {"parameters": job["script_parameters"]})
 
     return 0
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index 145deb1..59e0c69 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -196,12 +196,17 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None,
     if image_name:
         image_link_name = "{}:{}".format(image_name, image_tag or 'latest')
         docker_image_filters.append(['name', '=', image_link_name])
-    if image_hash:
+    elif image_hash:
         docker_image_filters.append(['name', '=', image_hash])
-    if image_collection:
+    elif image_collection:
         docker_image_filters.append(['head_uuid', '=', image_collection])
 
     existing_links = list_all(api_client.links().list, num_retries, filters=docker_image_filters)
+
+    if image_name or image_hash:
+        existing_links = list_all(api_client.links().list, num_retries,
+                              filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']],
+                                       ['head_uuid', 'in', [u['head_uuid'] for u in existing_links]]])
     images = {}
     for link in existing_links:
         collection_uuid = link["head_uuid"]
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 09f2a87..71697da 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -32,8 +32,10 @@ class EventClient(WebSocketClient):
         super(EventClient, self).__init__(url, ssl_options=ssl_options)
         self.filters = filters
         self.on_event = on_event
+        self.ready = threading.Event()
 
     def opened(self):
+        self.ready.set()
         self.subscribe(self.filters)
 
     def received_message(self, m):
@@ -47,12 +49,14 @@ class EventClient(WebSocketClient):
             pass
 
     def subscribe(self, filters, last_log_id=None):
+        self.ready.wait()
         m = {"method": "subscribe", "filters": filters}
         if last_log_id is not None:
             m["last_log_id"] = last_log_id
         self.send(json.dumps(m))
 
     def unsubscribe(self, filters):
+        self.ready.wait()
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
 class PollClient(threading.Thread):
diff --git a/sdk/python/tests/test_cwl_job.py b/sdk/python/tests/test_cwl_job.py
index a633ad1..9de589d 100644
--- a/sdk/python/tests/test_cwl_job.py
+++ b/sdk/python/tests/test_cwl_job.py
@@ -6,6 +6,8 @@ import run_test_server
 import subprocess
 import os
 import stat
+import arvados.commands.keepdocker
+import re
 
 def chmodx(fn):
     os.chmod(fn, stat.S_IRUSR|stat.S_IRGRP|stat.S_IROTH|stat.S_IXUSR|stat.S_IXGRP|stat.S_IXOTH)
@@ -20,19 +22,25 @@ class CwlJobTestCase(run_test_server.TestCaseWithServers):
         super(CwlJobTestCase, cls).setUpClass()
         run_test_server.authorize_with("admin")
         cls.api_client = arvados.api('v1')
+
+        host = re.match("^https?://([^/]+)", cls.api_client._rootDesc.get('rootUrl'))
+        os.environ["ARVADOS_API_HOST"] = host.group(1)
+
         os.chdir(os.path.join(run_test_server.ARVADOS_DIR, "docker"))
         subprocess.check_call(["./build.sh", "cwl-runner-image"])
         arvados.commands.keepdocker.upload_image(cls.api_client, ["arvados/cwl-runner"])
+        arvados.commands.keepdocker.upload_image(cls.api_client, ["arvados/debian", "wheezy"])
 
-        imgs_in_arv = list_images_in_arv(cls.api_client, 1, image_name="arvados/cwl-runner")
+        imgs_in_arv = arvados.commands.keepdocker.list_images_in_arv(cls.api_client, 1, image_name="arvados/debian", image_tag="wheezy")
 
         repo = cls.api_client.repositories().create(body={"repository": {
             "owner_uuid": "zzzzz-tpzed-000000000000000",
             "name": "testrepo"
             }}).execute()
 
-        os.mkdir(os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo"))
-        os.chdir(os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo"))
+        testrepo = os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo")
+        os.mkdir(testrepo)
+        os.chdir(testrepo)
         subprocess.check_call(["git", "init"])
 
         with open("foo", "w") as f:
@@ -55,24 +63,30 @@ exit 1
 """#!/usr/bin/env python
 import arvados
 import arvados.events
+import sys
 
 ws = None
+retcode = None
 
 def on_event(ev):
     global ws
+    global retcode
     if ev.get('object_kind') == "arvados#jobTask":
         if ev.get('event_type') == "update":
             if ev["properties"]["new_attributes"].get("success") is not None:
+                retcode = 0 if ev["properties"]["new_attributes"]["success"] else 1
                 ws.close()
 
-ws = arvados.events.subscribe(arvados.api(), [["object_uuid", "is_a", "arvados#jobTask"]], self.on_event)
+ws = arvados.events.subscribe(arvados.api(), [["object_uuid", "is_a", "arvados#jobTask"]], on_event)
 
-arvados.api().job_tasks().create(body={
+arvados.api().job_tasks().create(body={"parameters": {
   "environment": {},
-  "docker_hash": '""" + imgs_in_arv[0].hash + """',
-  "command": ["ls", "/"]}).execute()
+  "docker_hash": '""" + imgs_in_arv[0][1]["dockerhash"] + """',
+  "command": ["ls", "/"]} }).execute()
 
 ws.run_forever()
+
+sys.exit(retcode)
 """)
         chmodx("one_task")
 
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index 53b787b..6a31986 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -45,7 +45,7 @@ test:
   blob_signing_key: zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc
   user_profile_notification_address: arvados at example.com
   workbench_address: https://localhost:3001/
-  git_repositories_dir: <%= "#{Dir.pwd}/tmp/git" %>
+  git_repositories_dir: <%= Rails.root.join("tmp", "git") %>
   git_http_base: http://localhost:3005
 
 common:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list