[ARVADOS] created: 1440352641349c15cc98fea5bf69e0a8b40d7a0c

git at public.curoverse.com git at public.curoverse.com
Wed Jan 29 18:06:59 EST 2014

        at  1440352641349c15cc98fea5bf69e0a8b40d7a0c (commit)

commit 1440352641349c15cc98fea5bf69e0a8b40d7a0c
Author: Tim Pierce <twp at curoverse.com>
Date:   Wed Jan 29 18:07:22 2014 -0500

    Rearranging modules to eliminate recursive imports.

diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index e57b7e6..18e762e 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -18,32 +18,13 @@ import fcntl
 import time
 import threading
-import apiclient
-import apiclient.discovery
-config = None
 EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
-services = {}
+from api import *
 from stream import *
 from collection import *
 from keep import *
-# Arvados configuration settings are taken from $HOME/.config/arvados.
-# Environment variables override settings in the config file.
-class ArvadosConfig(dict):
-    def __init__(self, config_file):
-        dict.__init__(self)
-        if os.path.exists(config_file):
-            with open(config_file, "r") as f:
-                for config_line in f:
-                    var, val = config_line.rstrip().split('=', 2)
-                    self[var] = val
-        for var in os.environ:
-            if var.startswith('ARVADOS_'):
-                self[var] = os.environ[var]
+import util
 class errors:
     class SyntaxError(Exception):
@@ -59,29 +40,6 @@ class errors:
     class NotImplementedError(Exception):
-class CredentialsFromEnv(object):
-    @staticmethod
-    def http_request(self, uri, **kwargs):
-        global config
-        from httplib import BadStatusLine
-        if 'headers' not in kwargs:
-            kwargs['headers'] = {}
-        kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
-        try:
-            return self.orig_http_request(uri, **kwargs)
-        except BadStatusLine:
-            # This is how httplib tells us that it tried to reuse an
-            # existing connection but it was already closed by the
-            # server. In that case, yes, we would like to retry.
-            # Unfortunately, we are not absolutely certain that the
-            # previous call did not succeed, so this is slightly
-            # risky.
-            return self.orig_http_request(uri, **kwargs)
-    def authorize(self, http):
-        http.orig_http_request = http.request
-        http.request = types.MethodType(self.http_request, http)
-        return http
 def task_set_output(self,s):
@@ -116,62 +74,6 @@ def current_job():
 def getjobparam(*args):
     return current_job()['script_parameters'].get(*args)
-# Monkey patch discovery._cast() so objects and arrays get serialized
-# with json.dumps() instead of str().
-_cast_orig = apiclient.discovery._cast
-def _cast_objects_too(value, schema_type):
-    global _cast_orig
-    if (type(value) != type('') and
-        (schema_type == 'object' or schema_type == 'array')):
-        return json.dumps(value)
-    else:
-        return _cast_orig(value, schema_type)
-apiclient.discovery._cast = _cast_objects_too
-def http_cache(data_type):
-    path = os.environ['HOME'] + '/.cache/arvados/' + data_type
-    try:
-        util.mkdir_dash_p(path)
-    except OSError:
-        path = None
-    return path
-def api(version=None):
-    global services, config
-    if not config:
-        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
-        if 'ARVADOS_DEBUG' in config:
-            logging.basicConfig(level=logging.DEBUG)
-    if not services.get(version):
-        apiVersion = version
-        if not version:
-            apiVersion = 'v1'
-            logging.info("Using default API version. " +
-                         "Call arvados.api('%s') instead." %
-                         apiVersion)
-        if 'ARVADOS_API_HOST' not in config:
-            raise Exception("ARVADOS_API_HOST is not set. Aborting.")
-        url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
-               config['ARVADOS_API_HOST'])
-        credentials = CredentialsFromEnv()
-        # Use system's CA certificates (if we find them) instead of httplib2's
-        ca_certs = '/etc/ssl/certs/ca-certificates.crt'
-        if not os.path.exists(ca_certs):
-            ca_certs = None             # use httplib2 default
-        http = httplib2.Http(ca_certs=ca_certs,
-                             cache=http_cache('discovery'))
-        http = credentials.authorize(http)
-        if re.match(r'(?i)^(true|1|yes)$',
-                    config.get('ARVADOS_API_HOST_INSECURE', 'no')):
-            http.disable_ssl_certificate_validation=True
-        services[version] = apiclient.discovery.build(
-            'arvados', apiVersion, http=http, discoveryServiceUrl=url)
-    return services[version]
 class JobTask(object):
     def __init__(self, parameters=dict(), runtime_constraints=dict()):
         print "init jobtask %s %s" % (parameters, runtime_constraints)
@@ -224,311 +126,4 @@ class job_setup:
-class util:
-    @staticmethod
-    def clear_tmpdir(path=None):
-        """
-        Ensure the given directory (or TASK_TMPDIR if none given)
-        exists and is empty.
-        """
-        if path == None:
-            path = current_task().tmpdir
-        if os.path.exists(path):
-            p = subprocess.Popen(['rm', '-rf', path])
-            stdout, stderr = p.communicate(None)
-            if p.returncode != 0:
-                raise Exception('rm -rf %s: %s' % (path, stderr))
-        os.mkdir(path)
-    @staticmethod
-    def run_command(execargs, **kwargs):
-        kwargs.setdefault('stdin', subprocess.PIPE)
-        kwargs.setdefault('stdout', subprocess.PIPE)
-        kwargs.setdefault('stderr', sys.stderr)
-        kwargs.setdefault('close_fds', True)
-        kwargs.setdefault('shell', False)
-        p = subprocess.Popen(execargs, **kwargs)
-        stdoutdata, stderrdata = p.communicate(None)
-        if p.returncode != 0:
-            raise errors.CommandFailedError(
-                "run_command %s exit %d:\n%s" %
-                (execargs, p.returncode, stderrdata))
-        return stdoutdata, stderrdata
-    @staticmethod
-    def git_checkout(url, version, path):
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        if not os.path.exists(path):
-            util.run_command(["git", "clone", url, path],
-                             cwd=os.path.dirname(path))
-        util.run_command(["git", "checkout", version],
-                         cwd=path)
-        return path
-    @staticmethod
-    def tar_extractor(path, decompress_flag):
-        return subprocess.Popen(["tar",
-                                 "-C", path,
-                                 ("-x%sf" % decompress_flag),
-                                 "-"],
-                                stdout=None,
-                                stdin=subprocess.PIPE, stderr=sys.stderr,
-                                shell=False, close_fds=True)
-    @staticmethod
-    def tarball_extract(tarball, path):
-        """Retrieve a tarball from Keep and extract it to a local
-        directory.  Return the absolute path where the tarball was
-        extracted. If the top level of the tarball contained just one
-        file or directory, return the absolute path of that single
-        item.
-        tarball -- collection locator
-        path -- where to extract the tarball: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == tarball:
-                already_have_it = True
-        except OSError:
-            pass
-        if not already_have_it:
-            # emulate "rm -f" (i.e., if the file does not exist, we win)
-            try:
-                os.unlink(os.path.join(path, '.locator'))
-            except OSError:
-                if os.path.exists(os.path.join(path, '.locator')):
-                    os.unlink(os.path.join(path, '.locator'))
-            for f in CollectionReader(tarball).all_files():
-                if re.search('\.(tbz|tar.bz2)$', f.name()):
-                    p = util.tar_extractor(path, 'j')
-                elif re.search('\.(tgz|tar.gz)$', f.name()):
-                    p = util.tar_extractor(path, 'z')
-                elif re.search('\.tar$', f.name()):
-                    p = util.tar_extractor(path, '')
-                else:
-                    raise errors.AssertionError(
-                        "tarball_extract cannot handle filename %s" % f.name())
-                while True:
-                    buf = f.read(2**20)
-                    if len(buf) == 0:
-                        break
-                    p.stdin.write(buf)
-                p.stdin.close()
-                p.wait()
-                if p.returncode != 0:
-                    lockfile.close()
-                    raise errors.CommandFailedError(
-                        "tar exited %d" % p.returncode)
-            os.symlink(tarball, os.path.join(path, '.locator'))
-        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
-        lockfile.close()
-        if len(tld_extracts) == 1:
-            return os.path.join(path, tld_extracts[0])
-        return path
-    @staticmethod
-    def zipball_extract(zipball, path):
-        """Retrieve a zip archive from Keep and extract it to a local
-        directory.  Return the absolute path where the archive was
-        extracted. If the top level of the archive contained just one
-        file or directory, return the absolute path of that single
-        item.
-        zipball -- collection locator
-        path -- where to extract the archive: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == zipball:
-                already_have_it = True
-        except OSError:
-            pass
-        if not already_have_it:
-            # emulate "rm -f" (i.e., if the file does not exist, we win)
-            try:
-                os.unlink(os.path.join(path, '.locator'))
-            except OSError:
-                if os.path.exists(os.path.join(path, '.locator')):
-                    os.unlink(os.path.join(path, '.locator'))
-            for f in CollectionReader(zipball).all_files():
-                if not re.search('\.zip$', f.name()):
-                    raise errors.NotImplementedError(
-                        "zipball_extract cannot handle filename %s" % f.name())
-                zip_filename = os.path.join(path, os.path.basename(f.name()))
-                zip_file = open(zip_filename, 'wb')
-                while True:
-                    buf = f.read(2**20)
-                    if len(buf) == 0:
-                        break
-                    zip_file.write(buf)
-                zip_file.close()
-                p = subprocess.Popen(["unzip",
-                                      "-q", "-o",
-                                      "-d", path,
-                                      zip_filename],
-                                     stdout=None,
-                                     stdin=None, stderr=sys.stderr,
-                                     shell=False, close_fds=True)
-                p.wait()
-                if p.returncode != 0:
-                    lockfile.close()
-                    raise errors.CommandFailedError(
-                        "unzip exited %d" % p.returncode)
-                os.unlink(zip_filename)
-            os.symlink(zipball, os.path.join(path, '.locator'))
-        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
-        lockfile.close()
-        if len(tld_extracts) == 1:
-            return os.path.join(path, tld_extracts[0])
-        return path
-    @staticmethod
-    def collection_extract(collection, path, files=[], decompress=True):
-        """Retrieve a collection from Keep and extract it to a local
-        directory.  Return the absolute path where the collection was
-        extracted.
-        collection -- collection locator
-        path -- where to extract: absolute, or relative to job tmp
-        """
-        matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
-        if matches:
-            collection_hash = matches.group(1)
-        else:
-            collection_hash = hashlib.md5(collection).hexdigest()
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == collection_hash:
-                already_have_it = True
-        except OSError:
-            pass
-        # emulate "rm -f" (i.e., if the file does not exist, we win)
-        try:
-            os.unlink(os.path.join(path, '.locator'))
-        except OSError:
-            if os.path.exists(os.path.join(path, '.locator')):
-                os.unlink(os.path.join(path, '.locator'))
-        files_got = []
-        for s in CollectionReader(collection).all_streams():
-            stream_name = s.name()
-            for f in s.all_files():
-                if (files == [] or
-                    ((f.name() not in files_got) and
-                     (f.name() in files or
-                      (decompress and f.decompressed_name() in files)))):
-                    outname = f.decompressed_name() if decompress else f.name()
-                    files_got += [outname]
-                    if os.path.exists(os.path.join(path, stream_name, outname)):
-                        continue
-                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
-                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
-                    for buf in (f.readall_decompressed() if decompress
-                                else f.readall()):
-                        outfile.write(buf)
-                    outfile.close()
-        if len(files_got) < len(files):
-            raise errors.AssertionError(
-                "Wanted files %s but only got %s from %s" %
-                (files, files_got,
-                 [z.name() for z in CollectionReader(collection).all_files()]))
-        os.symlink(collection_hash, os.path.join(path, '.locator'))
-        lockfile.close()
-        return path
-    @staticmethod
-    def mkdir_dash_p(path):
-        if not os.path.exists(path):
-            util.mkdir_dash_p(os.path.dirname(path))
-            try:
-                os.mkdir(path)
-            except OSError:
-                if not os.path.exists(path):
-                    os.mkdir(path)
-    @staticmethod
-    def stream_extract(stream, path, files=[], decompress=True):
-        """Retrieve a stream from Keep and extract it to a local
-        directory.  Return the absolute path where the stream was
-        extracted.
-        stream -- StreamReader object
-        path -- where to extract: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        files_got = []
-        for f in stream.all_files():
-            if (files == [] or
-                ((f.name() not in files_got) and
-                 (f.name() in files or
-                  (decompress and f.decompressed_name() in files)))):
-                outname = f.decompressed_name() if decompress else f.name()
-                files_got += [outname]
-                if os.path.exists(os.path.join(path, outname)):
-                    os.unlink(os.path.join(path, outname))
-                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
-                outfile = open(os.path.join(path, outname), 'wb')
-                for buf in (f.readall_decompressed() if decompress
-                            else f.readall()):
-                    outfile.write(buf)
-                outfile.close()
-        if len(files_got) < len(files):
-            raise errors.AssertionError(
-                "Wanted files %s but only got %s from %s" %
-                (files, files_got, [z.name() for z in stream.all_files()]))
-        lockfile.close()
-        return path
-    @staticmethod
-    def listdir_recursive(dirname, base=None):
-        allfiles = []
-        for ent in sorted(os.listdir(dirname)):
-            ent_path = os.path.join(dirname, ent)
-            ent_base = os.path.join(base, ent) if base else ent
-            if os.path.isdir(ent_path):
-                allfiles += util.listdir_recursive(ent_path, ent_base)
-            else:
-                allfiles += [ent_base]
-        return allfiles
diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
new file mode 100644
index 0000000..cc019a1
--- /dev/null
+++ b/sdk/python/arvados/api.py
@@ -0,0 +1,108 @@
+import httplib2
+import json
+import logging
+import os
+import re
+import types
+import apiclient
+import apiclient.discovery
+import util
+config = None
+services = {}
+class CredentialsFromEnv(object):
+    @staticmethod
+    def http_request(self, uri, **kwargs):
+        global config
+        from httplib import BadStatusLine
+        if 'headers' not in kwargs:
+            kwargs['headers'] = {}
+        kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
+        try:
+            return self.orig_http_request(uri, **kwargs)
+        except BadStatusLine:
+            # This is how httplib tells us that it tried to reuse an
+            # existing connection but it was already closed by the
+            # server. In that case, yes, we would like to retry.
+            # Unfortunately, we are not absolutely certain that the
+            # previous call did not succeed, so this is slightly
+            # risky.
+            return self.orig_http_request(uri, **kwargs)
+    def authorize(self, http):
+        http.orig_http_request = http.request
+        http.request = types.MethodType(self.http_request, http)
+        return http
+# Arvados configuration settings are taken from $HOME/.config/arvados.
+# Environment variables override settings in the config file.
+class ArvadosConfig(dict):
+    def __init__(self, config_file):
+        dict.__init__(self)
+        if os.path.exists(config_file):
+            with open(config_file, "r") as f:
+                for config_line in f:
+                    var, val = config_line.rstrip().split('=', 2)
+                    self[var] = val
+        for var in os.environ:
+            if var.startswith('ARVADOS_'):
+                self[var] = os.environ[var]
+# Monkey patch discovery._cast() so objects and arrays get serialized
+# with json.dumps() instead of str().
+_cast_orig = apiclient.discovery._cast
+def _cast_objects_too(value, schema_type):
+    global _cast_orig
+    if (type(value) != type('') and
+        (schema_type == 'object' or schema_type == 'array')):
+        return json.dumps(value)
+    else:
+        return _cast_orig(value, schema_type)
+apiclient.discovery._cast = _cast_objects_too
+def http_cache(data_type):
+    path = os.environ['HOME'] + '/.cache/arvados/' + data_type
+    try:
+        util.mkdir_dash_p(path)
+    except OSError:
+        path = None
+    return path
+def api(version=None):
+    global services, config
+    if not config:
+        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
+        if 'ARVADOS_DEBUG' in config:
+            logging.basicConfig(level=logging.DEBUG)
+    if not services.get(version):
+        apiVersion = version
+        if not version:
+            apiVersion = 'v1'
+            logging.info("Using default API version. " +
+                         "Call arvados.api('%s') instead." %
+                         apiVersion)
+        if 'ARVADOS_API_HOST' not in config:
+            raise Exception("ARVADOS_API_HOST is not set. Aborting.")
+        url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
+               config['ARVADOS_API_HOST'])
+        credentials = CredentialsFromEnv()
+        # Use system's CA certificates (if we find them) instead of httplib2's
+        ca_certs = '/etc/ssl/certs/ca-certificates.crt'
+        if not os.path.exists(ca_certs):
+            ca_certs = None             # use httplib2 default
+        http = httplib2.Http(ca_certs=ca_certs,
+                             cache=http_cache('discovery'))
+        http = credentials.authorize(http)
+        if re.match(r'(?i)^(true|1|yes)$',
+                    config.get('ARVADOS_API_HOST_INSECURE', 'no')):
+            http.disable_ssl_certificate_validation=True
+        services[version] = apiclient.discovery.build(
+            'arvados', apiVersion, http=http, discoveryServiceUrl=url)
+    return services[version]
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index dc2f0f8..296f950 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -20,6 +20,7 @@ import threading
 from stream import *
 from keep import *
+import util
 class CollectionReader(object):
     def __init__(self, manifest_locator_or_text):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 62e9d08..6c24704 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -20,7 +20,8 @@ import threading
 global_client_object = None
-from arvados import *
+from api import *
+import util
 class Keep:
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
new file mode 100644
index 0000000..9286795
--- /dev/null
+++ b/sdk/python/arvados/util.py
@@ -0,0 +1,302 @@
+import fcntl
+import hashlib
+import os
+import re
+import subprocess
+def clear_tmpdir(path=None):
+    """
+    Ensure the given directory (or TASK_TMPDIR if none given)
+    exists and is empty.
+    """
+    if path == None:
+        path = current_task().tmpdir
+    if os.path.exists(path):
+        p = subprocess.Popen(['rm', '-rf', path])
+        stdout, stderr = p.communicate(None)
+        if p.returncode != 0:
+            raise Exception('rm -rf %s: %s' % (path, stderr))
+    os.mkdir(path)
+def run_command(execargs, **kwargs):
+    kwargs.setdefault('stdin', subprocess.PIPE)
+    kwargs.setdefault('stdout', subprocess.PIPE)
+    kwargs.setdefault('stderr', sys.stderr)
+    kwargs.setdefault('close_fds', True)
+    kwargs.setdefault('shell', False)
+    p = subprocess.Popen(execargs, **kwargs)
+    stdoutdata, stderrdata = p.communicate(None)
+    if p.returncode != 0:
+        raise errors.CommandFailedError(
+            "run_command %s exit %d:\n%s" %
+            (execargs, p.returncode, stderrdata))
+    return stdoutdata, stderrdata
+def git_checkout(url, version, path):
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    if not os.path.exists(path):
+        util.run_command(["git", "clone", url, path],
+                         cwd=os.path.dirname(path))
+    util.run_command(["git", "checkout", version],
+                     cwd=path)
+    return path
+def tar_extractor(path, decompress_flag):
+    return subprocess.Popen(["tar",
+                             "-C", path,
+                             ("-x%sf" % decompress_flag),
+                             "-"],
+                            stdout=None,
+                            stdin=subprocess.PIPE, stderr=sys.stderr,
+                            shell=False, close_fds=True)
+def tarball_extract(tarball, path):
+    """Retrieve a tarball from Keep and extract it to a local
+    directory.  Return the absolute path where the tarball was
+    extracted. If the top level of the tarball contained just one
+    file or directory, return the absolute path of that single
+    item.
+    tarball -- collection locator
+    path -- where to extract the tarball: absolute, or relative to job tmp
+    """
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+    already_have_it = False
+    try:
+        if os.readlink(os.path.join(path, '.locator')) == tarball:
+            already_have_it = True
+    except OSError:
+        pass
+    if not already_have_it:
+        # emulate "rm -f" (i.e., if the file does not exist, we win)
+        try:
+            os.unlink(os.path.join(path, '.locator'))
+        except OSError:
+            if os.path.exists(os.path.join(path, '.locator')):
+                os.unlink(os.path.join(path, '.locator'))
+        for f in CollectionReader(tarball).all_files():
+            if re.search('\.(tbz|tar.bz2)$', f.name()):
+                p = util.tar_extractor(path, 'j')
+            elif re.search('\.(tgz|tar.gz)$', f.name()):
+                p = util.tar_extractor(path, 'z')
+            elif re.search('\.tar$', f.name()):
+                p = util.tar_extractor(path, '')
+            else:
+                raise errors.AssertionError(
+                    "tarball_extract cannot handle filename %s" % f.name())
+            while True:
+                buf = f.read(2**20)
+                if len(buf) == 0:
+                    break
+                p.stdin.write(buf)
+            p.stdin.close()
+            p.wait()
+            if p.returncode != 0:
+                lockfile.close()
+                raise errors.CommandFailedError(
+                    "tar exited %d" % p.returncode)
+        os.symlink(tarball, os.path.join(path, '.locator'))
+    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+    lockfile.close()
+    if len(tld_extracts) == 1:
+        return os.path.join(path, tld_extracts[0])
+    return path
+def zipball_extract(zipball, path):
+    """Retrieve a zip archive from Keep and extract it to a local
+    directory.  Return the absolute path where the archive was
+    extracted. If the top level of the archive contained just one
+    file or directory, return the absolute path of that single
+    item.
+    zipball -- collection locator
+    path -- where to extract the archive: absolute, or relative to job tmp
+    """
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+    already_have_it = False
+    try:
+        if os.readlink(os.path.join(path, '.locator')) == zipball:
+            already_have_it = True
+    except OSError:
+        pass
+    if not already_have_it:
+        # emulate "rm -f" (i.e., if the file does not exist, we win)
+        try:
+            os.unlink(os.path.join(path, '.locator'))
+        except OSError:
+            if os.path.exists(os.path.join(path, '.locator')):
+                os.unlink(os.path.join(path, '.locator'))
+        for f in CollectionReader(zipball).all_files():
+            if not re.search('\.zip$', f.name()):
+                raise errors.NotImplementedError(
+                    "zipball_extract cannot handle filename %s" % f.name())
+            zip_filename = os.path.join(path, os.path.basename(f.name()))
+            zip_file = open(zip_filename, 'wb')
+            while True:
+                buf = f.read(2**20)
+                if len(buf) == 0:
+                    break
+                zip_file.write(buf)
+            zip_file.close()
+            p = subprocess.Popen(["unzip",
+                                  "-q", "-o",
+                                  "-d", path,
+                                  zip_filename],
+                                 stdout=None,
+                                 stdin=None, stderr=sys.stderr,
+                                 shell=False, close_fds=True)
+            p.wait()
+            if p.returncode != 0:
+                lockfile.close()
+                raise errors.CommandFailedError(
+                    "unzip exited %d" % p.returncode)
+            os.unlink(zip_filename)
+        os.symlink(zipball, os.path.join(path, '.locator'))
+    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+    lockfile.close()
+    if len(tld_extracts) == 1:
+        return os.path.join(path, tld_extracts[0])
+    return path
+def collection_extract(collection, path, files=[], decompress=True):
+    """Retrieve a collection from Keep and extract it to a local
+    directory.  Return the absolute path where the collection was
+    extracted.
+    collection -- collection locator
+    path -- where to extract: absolute, or relative to job tmp
+    """
+    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
+    if matches:
+        collection_hash = matches.group(1)
+    else:
+        collection_hash = hashlib.md5(collection).hexdigest()
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+    already_have_it = False
+    try:
+        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
+            already_have_it = True
+    except OSError:
+        pass
+    # emulate "rm -f" (i.e., if the file does not exist, we win)
+    try:
+        os.unlink(os.path.join(path, '.locator'))
+    except OSError:
+        if os.path.exists(os.path.join(path, '.locator')):
+            os.unlink(os.path.join(path, '.locator'))
+    files_got = []
+    for s in CollectionReader(collection).all_streams():
+        stream_name = s.name()
+        for f in s.all_files():
+            if (files == [] or
+                ((f.name() not in files_got) and
+                 (f.name() in files or
+                  (decompress and f.decompressed_name() in files)))):
+                outname = f.decompressed_name() if decompress else f.name()
+                files_got += [outname]
+                if os.path.exists(os.path.join(path, stream_name, outname)):
+                    continue
+                util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
+                outfile = open(os.path.join(path, stream_name, outname), 'wb')
+                for buf in (f.readall_decompressed() if decompress
+                            else f.readall()):
+                    outfile.write(buf)
+                outfile.close()
+    if len(files_got) < len(files):
+        raise errors.AssertionError(
+            "Wanted files %s but only got %s from %s" %
+            (files, files_got,
+             [z.name() for z in CollectionReader(collection).all_files()]))
+    os.symlink(collection_hash, os.path.join(path, '.locator'))
+    lockfile.close()
+    return path
+def mkdir_dash_p(path):
+    if not os.path.exists(path):
+        util.mkdir_dash_p(os.path.dirname(path))
+        try:
+            os.mkdir(path)
+        except OSError:
+            if not os.path.exists(path):
+                os.mkdir(path)
+def stream_extract(stream, path, files=[], decompress=True):
+    """Retrieve a stream from Keep and extract it to a local
+    directory.  Return the absolute path where the stream was
+    extracted.
+    stream -- StreamReader object
+    path -- where to extract: absolute, or relative to job tmp
+    """
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+    files_got = []
+    for f in stream.all_files():
+        if (files == [] or
+            ((f.name() not in files_got) and
+             (f.name() in files or
+              (decompress and f.decompressed_name() in files)))):
+            outname = f.decompressed_name() if decompress else f.name()
+            files_got += [outname]
+            if os.path.exists(os.path.join(path, outname)):
+                os.unlink(os.path.join(path, outname))
+            util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
+            outfile = open(os.path.join(path, outname), 'wb')
+            for buf in (f.readall_decompressed() if decompress
+                        else f.readall()):
+                outfile.write(buf)
+            outfile.close()
+    if len(files_got) < len(files):
+        raise errors.AssertionError(
+            "Wanted files %s but only got %s from %s" %
+            (files, files_got, [z.name() for z in stream.all_files()]))
+    lockfile.close()
+    return path
+def listdir_recursive(dirname, base=None):
+    allfiles = []
+    for ent in sorted(os.listdir(dirname)):
+        ent_path = os.path.join(dirname, ent)
+        ent_base = os.path.join(base, ent) if base else ent
+        if os.path.isdir(ent_path):
+            allfiles += util.listdir_recursive(ent_path, ent_base)
+        else:
+            allfiles += [ent_base]
+    return allfiles



