[ARVADOS] updated: fd5a11b21af83870c4122afdf94844dd4cf63cc6

git at public.curoverse.com git at public.curoverse.com
Thu Feb 4 14:58:28 EST 2016


Summary of changes:
 sdk/cli/bin/crunch-job       |  2 +-
 sdk/python/arvados/events.py | 63 +++++++++++++++++++++++++++++++-------------
 2 files changed, 46 insertions(+), 19 deletions(-)

       via  fd5a11b21af83870c4122afdf94844dd4cf63cc6 (commit)
       via  cda9a7d3e3601a7a8bb721261c5b26210e9f629f (commit)
       via  ca4e92183dfca9f226898790a09239935e17eba9 (commit)
      from  be2197781a206ba7cbf5bbfbf61a70375fb10d68 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit fd5a11b21af83870c4122afdf94844dd4cf63cc6
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Feb 4 14:29:39 2016 -0500

    Recognize another way slurm tells us about node failures.
    
    Retry, instead of giving up, in situations like this:
    
    2016-02-02_08:42:26 wx7k5-8i9sb-guk2lv53z3572dc 40682 3 stderr srun: error: Aborting, io error and missing step on node 0
    2016-02-02_08:42:26 wx7k5-8i9sb-guk2lv53z3572dc 40682 3 stderr srun: Job step aborted: Waiting up to 2 seconds for job step to finish.
    2016-02-02_08:42:28 wx7k5-8i9sb-guk2lv53z3572dc 40682 3 stderr srun: error: Timed out waiting for job step to complete
    2016-02-02_08:42:28 wx7k5-8i9sb-guk2lv53z3572dc 40682 3 child 42984 on compute26.1 exit 0 success=
    2016-02-02_08:42:28 wx7k5-8i9sb-guk2lv53z3572dc 40682 3 ERROR: Task process exited 0, but never updated its task record to indicate success and record its output.
    2016-02-02_08:42:28 wx7k5-8i9sb-guk2lv53z3572dc 40682 3 failure (#1, permanent) after 560 seconds
    2016-02-02_08:42:28 wx7k5-8i9sb-guk2lv53z3572dc 40682 3 task output (0 bytes):
    
    No issue #

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 70d05f0..5eb2f90 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1461,7 +1461,7 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /srun: error: Node failure on/) {
+    elsif ($line =~ /srun: error: (Node failure on|Aborting, io error)/) {
       my $job_slot_index = $jobstep[$job]->{slotindex};
       $slot[$job_slot_index]->{node}->{fail_count}++;
       $jobstep[$job]->{tempfail} = 1;

commit cda9a7d3e3601a7a8bb721261c5b26210e9f629f
Merge: be21977 ca4e921
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Feb 4 13:17:31 2016 -0500

    Merge branch '8288-poll-client-close-timeout' refs #8288


commit ca4e92183dfca9f226898790a09239935e17eba9
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Feb 1 01:58:34 2016 -0500

    8288: Add timeout option to close() method of event clients.
    
    Previously in EventClient, close() didn't wait for anything. Now, if a
    timeout is given, it waits for ws4py to call the closed() callback to
    indicate the connection has closed.
    
    Previously in PollClient, close() waited indefinitely for the polling
    thread to terminate.  This can take a very long time if, for example,
    there are multiple subscriptions and the "get logs" API transaction is
    slow.
    
    The only apparent reason a caller would want to wait here at all is to
    guarantee the simplifying assumption that the on_event() callback is never
    called after close().  Now, instead of letting the thread run until
    all events are received and handled, PollClient achieves this the same
    way EventClient does: ignore events that arrive after close().

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 3132da3..94b8a9d 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -13,6 +13,7 @@ from ws4py.client.threadedclient import WebSocketClient
 
 _logger = logging.getLogger('arvados.events')
 
+
 class EventClient(WebSocketClient):
     def __init__(self, url, filters, on_event, last_log_id):
         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
@@ -29,23 +30,33 @@ class EventClient(WebSocketClient):
         self.filters = filters
         self.on_event = on_event
         self.last_log_id = last_log_id
-        self._closed_lock = threading.RLock()
-        self._closed = False
+        self._closing_lock = threading.RLock()
+        self._closing = False
+        self._closed = threading.Event()
 
     def opened(self):
         self.subscribe(self.filters, self.last_log_id)
 
+    def closed(self, code, reason=None):
+        self._closed.set()
+
     def received_message(self, m):
-        with self._closed_lock:
-            if not self._closed:
+        with self._closing_lock:
+            if not self._closing:
                 self.on_event(json.loads(str(m)))
 
-    def close(self, code=1000, reason=''):
-        """Close event client and wait for it to finish."""
+    def close(self, code=1000, reason='', timeout=0):
+        """Close event client and optionally wait for it to finish.
+
+        :timeout: is the number of seconds to wait for ws4py to
+        indicate that the connection has closed.
+        """
         super(EventClient, self).close(code, reason)
-        with self._closed_lock:
+        with self._closing_lock:
             # make sure we don't process any more messages.
-            self._closed = True
+            self._closing = True
+        # wait for ws4py to tell us the connection is closed.
+        self._closed.wait(timeout=timeout)
 
     def subscribe(self, filters, last_log_id=None):
         m = {"method": "subscribe", "filters": filters}
@@ -56,6 +67,7 @@ class EventClient(WebSocketClient):
     def unsubscribe(self, filters):
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
         super(PollClient, self).__init__()
@@ -67,8 +79,9 @@ class PollClient(threading.Thread):
         self.on_event = on_event
         self.poll_time = poll_time
         self.daemon = True
-        self.stop = threading.Event()
         self.last_log_id = last_log_id
+        self._closing = threading.Event()
+        self._closing_lock = threading.RLock()
 
     def run(self):
         self.id = 0
@@ -83,7 +96,7 @@ class PollClient(threading.Thread):
 
         self.on_event({'status': 200})
 
-        while not self.stop.isSet():
+        while not self._closing.is_set():
             max_id = self.id
             moreitems = False
             for f in self.filters:
@@ -91,24 +104,38 @@ class PollClient(threading.Thread):
                 for i in items["items"]:
                     if i['id'] > max_id:
                         max_id = i['id']
-                    self.on_event(i)
+                    with self._closing_lock:
+                        if self._closing.is_set():
+                            return
+                        self.on_event(i)
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
             self.id = max_id
             if not moreitems:
-                self.stop.wait(self.poll_time)
+                self._closing.wait(self.poll_time)
 
     def run_forever(self):
         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
-        while not self.stop.is_set():
-            self.stop.wait(1)
+        while not self._closing.is_set():
+            self._closing.wait(1)
+
+    def close(self, code=None, reason=None, timeout=0):
+        """Close poll client and optionally wait for it to finish.
+
+        If an :on_event: handler is running in a different thread,
+        first wait (indefinitely) for it to return.
+
+        After closing, wait up to :timeout: seconds for the thread to
+        finish the poll request in progress (if any).
 
-    def close(self):
-        """Close poll client and wait for it to finish."""
+        :code: and :reason: are ignored. They are present for
+        interface compatibility with EventClient.
+        """
 
-        self.stop.set()
+        with self._closing_lock:
+            self._closing.set()
         try:
-            self.join()
+            self.join(timeout=timeout)
         except RuntimeError:
             # "join() raises a RuntimeError if an attempt is made to join the
             # current thread as that would cause a deadlock. It is also an

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list