[ARVADOS] updated: 013efe3cfb6e53b372be1f785646e76a7965a948

git at public.curoverse.com git at public.curoverse.com
Thu Jan 30 15:41:38 EST 2014


Summary of changes:
 sdk/python/arvados/__init__.py                     |   19 +-------
 sdk/python/arvados/api.py                          |   31 +++----------
 sdk/python/arvados/collection.py                   |    7 ++-
 sdk/python/arvados/config.py                       |   34 +++++++++++++++
 sdk/python/arvados/errors.py                       |   14 ++++++
 sdk/python/arvados/keep.py                         |   11 ++---
 sdk/python/arvados/stream.py                       |    4 +-
 services/api/app/models/job.rb                     |   16 ++++---
 .../20140117231056_normalize_collection_uuid.rb    |    4 +-
 ...malize_collection_uuids_in_script_parameters.rb |   45 ++++++++++++++++++++
 services/api/db/schema.rb                          |    2 +-
 services/api/test/fixtures/jobs.yml                |   22 ++++++++++
 .../functional/arvados/v1/jobs_controller_test.rb  |   17 +++++++
 13 files changed, 165 insertions(+), 61 deletions(-)
 create mode 100644 sdk/python/arvados/config.py
 create mode 100644 sdk/python/arvados/errors.py
 create mode 100644 services/api/db/migrate/20140129184311_normalize_collection_uuids_in_script_parameters.rb

       via  013efe3cfb6e53b372be1f785646e76a7965a948 (commit)
       via  9ea1a87fc8e9725473e909f543af2b83b03237a4 (commit)
       via  3a342f5de4da4b155d551e54aeeab51eed9f8d68 (commit)
       via  6ae89f63b495395880b34645c84c813cbfacee78 (commit)
       via  c67624630594b481aa96d548282187720601abea (commit)
       via  53f098a4343081d2f31be9b84c11973320532337 (commit)
       via  2728f59746f7a57ed0283b899f9636cfef77d001 (commit)
       via  566bb36cbadbe0e4ceb1a30123033d9310a0f4d8 (commit)
      from  1440352641349c15cc98fea5bf69e0a8b40d7a0c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 013efe3cfb6e53b372be1f785646e76a7965a948
Merge: 9ea1a87 1440352
Author: Tim Pierce <twp at curoverse.com>
Date:   Thu Jan 30 15:42:13 2014 -0500

    Merge branch '2036-fix-python-sdk' of git.clinicalfuture.com:arvados into 2036-fix-python-sdk
    
    Conflicts:
    	sdk/python/arvados/__init__.py
    	sdk/python/arvados/api.py
    	sdk/python/arvados/collection.py
    	sdk/python/arvados/keep.py

diff --cc sdk/python/arvados/collection.py
index 6b2a905,296f950..8e39318
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@@ -18,11 -18,9 +18,10 @@@ import fcnt
  import time
  import threading
  
 -from stream import *
  from keep import *
 -import util
 +from stream import *
 +import config
 +import errors
- import util
  
  class CollectionReader(object):
      def __init__(self, manifest_locator_or_text):

commit 9ea1a87fc8e9725473e909f543af2b83b03237a4
Author: Tim Pierce <twp at curoverse.com>
Date:   Thu Jan 30 15:37:39 2014 -0500

    Resolving scoping problems. Python unit tests now pass. (fixes #2036)

diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index 18e762e..b165412 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -18,28 +18,13 @@ import fcntl
 import time
 import threading
 
-EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
-
 from api import *
-from stream import *
 from collection import *
 from keep import *
+from stream import *
+import errors
 import util
 
-class errors:
-    class SyntaxError(Exception):
-        pass
-    class AssertionError(Exception):
-        pass
-    class NotFoundError(Exception):
-        pass
-    class CommandFailedError(Exception):
-        pass
-    class KeepWriteError(Exception):
-        pass
-    class NotImplementedError(Exception):
-        pass
-
 def task_set_output(self,s):
     api('v1').job_tasks().update(uuid=self['uuid'],
                                  body={
diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index cc019a1..4413167 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -7,15 +7,15 @@ import types
 
 import apiclient
 import apiclient.discovery
+import config
+import errors
 import util
 
-config = None
 services = {}
 
 class CredentialsFromEnv(object):
     @staticmethod
     def http_request(self, uri, **kwargs):
-        global config
         from httplib import BadStatusLine
         if 'headers' not in kwargs:
             kwargs['headers'] = {}
@@ -35,21 +35,6 @@ class CredentialsFromEnv(object):
         http.request = types.MethodType(self.http_request, http)
         return http
 
-# Arvados configuration settings are taken from $HOME/.config/arvados.
-# Environment variables override settings in the config file.
-#
-class ArvadosConfig(dict):
-    def __init__(self, config_file):
-        dict.__init__(self)
-        if os.path.exists(config_file):
-            with open(config_file, "r") as f:
-                for config_line in f:
-                    var, val = config_line.rstrip().split('=', 2)
-                    self[var] = val
-        for var in os.environ:
-            if var.startswith('ARVADOS_'):
-                self[var] = os.environ[var]
-
 # Monkey patch discovery._cast() so objects and arrays get serialized
 # with json.dumps() instead of str().
 _cast_orig = apiclient.discovery._cast
@@ -71,12 +56,10 @@ def http_cache(data_type):
     return path
 
 def api(version=None):
-    global services, config
+    global services
 
-    if not config:
-        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
-        if 'ARVADOS_DEBUG' in config:
-            logging.basicConfig(level=logging.DEBUG)
+    if 'ARVADOS_DEBUG' in config.settings():
+        logging.basicConfig(level=logging.DEBUG)
 
     if not services.get(version):
         apiVersion = version
@@ -85,10 +68,10 @@ def api(version=None):
             logging.info("Using default API version. " +
                          "Call arvados.api('%s') instead." %
                          apiVersion)
-        if 'ARVADOS_API_HOST' not in config:
+        if 'ARVADOS_API_HOST' not in config.settings():
             raise Exception("ARVADOS_API_HOST is not set. Aborting.")
         url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
-               config['ARVADOS_API_HOST'])
+               config.get('ARVADOS_API_HOST'))
         credentials = CredentialsFromEnv()
 
         # Use system's CA certificates (if we find them) instead of httplib2's
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 296f950..6b2a905 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -18,8 +18,10 @@ import fcntl
 import time
 import threading
 
-from stream import *
 from keep import *
+from stream import *
+import config
+import errors
 import util
 
 class CollectionReader(object):
@@ -181,7 +183,7 @@ class CollectionWriter(object):
                 (self._current_stream_length, len(self._current_stream_files)))
         else:
             if len(self._current_stream_locators) == 0:
-                self._current_stream_locators += [EMPTY_BLOCK_LOCATOR]
+                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
             self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
diff --git a/sdk/python/arvados/config.py b/sdk/python/arvados/config.py
new file mode 100644
index 0000000..e205e92
--- /dev/null
+++ b/sdk/python/arvados/config.py
@@ -0,0 +1,34 @@
+# config.py - configuration settings and global variables for Arvados clients
+#
+# Arvados configuration settings are taken from $HOME/.config/arvados.
+# Environment variables override settings in the config file.
+
+import os
+import re
+
+_settings = None
+default_config_file = os.environ['HOME'] + '/.config/arvados/settings.conf'
+
+EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
+
+def initialize(config_file=default_config_file):
+    global _settings
+    _settings = {}
+    if os.path.exists(config_file):
+        with open(config_file, "r") as f:
+            for config_line in f:
+                if re.match('^\s*#', config_line):
+                    continue
+                var, val = config_line.rstrip().split('=', 2)
+                _settings[var] = val
+    for var in os.environ:
+        if var.startswith('ARVADOS_'):
+            _settings[var] = os.environ[var]
+
+def get(key, default_val=None):
+    return settings().get(key, default_val)
+
+def settings():
+    if _settings is None:
+        initialize()
+    return _settings
diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
new file mode 100644
index 0000000..5ea54be
--- /dev/null
+++ b/sdk/python/arvados/errors.py
@@ -0,0 +1,14 @@
+# errors.py - Arvados-specific exceptions.
+
+class SyntaxError(Exception):
+    pass
+class AssertionError(Exception):
+    pass
+class NotFoundError(Exception):
+    pass
+class CommandFailedError(Exception):
+    pass
+class KeepWriteError(Exception):
+    pass
+class NotImplementedError(Exception):
+    pass
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 6c24704..c9f83bf 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -21,7 +21,8 @@ import threading
 global_client_object = None
 
 from api import *
-import util
+import config
+import errors
 
 class Keep:
     @staticmethod
@@ -96,7 +97,6 @@ class KeepClient(object):
             self.args = kwargs
 
         def run(self):
-            global config
             with self.args['thread_limiter'] as limiter:
                 if not limiter.shall_i_proceed():
                     # My turn arrived, but the job has been done without
@@ -108,7 +108,7 @@ class KeepClient(object):
                                self.args['service_root']))
                 h = httplib2.Http()
                 url = self.args['service_root'] + self.args['data_hash']
-                api_token = config['ARVADOS_API_TOKEN']
+                api_token = config.get('ARVADOS_API_TOKEN')
                 headers = {'Authorization': "OAuth2 %s" % api_token}
                 try:
                     resp, content = h.request(url.encode('utf-8'), 'PUT',
@@ -168,7 +168,6 @@ class KeepClient(object):
         return pseq
 
     def get(self, locator):
-        global config
         if re.search(r',', locator):
             return ''.join(self.get(x) for x in locator.split(','))
         if 'KEEP_LOCAL_STORE' in os.environ:
@@ -177,7 +176,7 @@ class KeepClient(object):
         for service_root in self.shuffled_service_roots(expect_hash):
             h = httplib2.Http()
             url = service_root + expect_hash
-            api_token = config['ARVADOS_API_TOKEN']
+            api_token = config.get('ARVADOS_API_TOKEN')
             headers = {'Authorization': "OAuth2 %s" % api_token,
                        'Accept': 'application/octet-stream'}
             try:
@@ -246,7 +245,7 @@ class KeepClient(object):
         if not r:
             raise errors.NotFoundError(
                 "Invalid data locator: '%s'" % locator)
-        if r.group(0) == EMPTY_BLOCK_LOCATOR.split('+')[0]:
+        if r.group(0) == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             return ''
         with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
             return f.read()
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 8570b97..d61de4d 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -19,6 +19,8 @@ import time
 import threading
 
 from keep import *
+import config
+import errors
 
 class StreamFileReader(object):
     def __init__(self, stream, pos, size, name):
@@ -99,7 +101,7 @@ class StreamFileReader(object):
     def as_manifest(self):
         if self.size() == 0:
             return ("%s %s 0:0:%s\n"
-                    % (self._stream.name(), EMPTY_BLOCK_LOCATOR, self.name()))
+                    % (self._stream.name(), config.EMPTY_BLOCK_LOCATOR, self.name()))
         return string.join(self._stream.tokens_for_range(self._pos, self._size),
                            " ") + "\n"
 

commit 3a342f5de4da4b155d551e54aeeab51eed9f8d68
Author: Tim Pierce <twp at curoverse.com>
Date:   Wed Jan 29 18:07:22 2014 -0500

    Rearranging modules to eliminate recursive imports.

diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index e57b7e6..18e762e 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -18,32 +18,13 @@ import fcntl
 import time
 import threading
 
-import apiclient
-import apiclient.discovery
-
-config = None
 EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
-services = {}
 
+from api import *
 from stream import *
 from collection import *
 from keep import *
-
-
-# Arvados configuration settings are taken from $HOME/.config/arvados.
-# Environment variables override settings in the config file.
-#
-class ArvadosConfig(dict):
-    def __init__(self, config_file):
-        dict.__init__(self)
-        if os.path.exists(config_file):
-            with open(config_file, "r") as f:
-                for config_line in f:
-                    var, val = config_line.rstrip().split('=', 2)
-                    self[var] = val
-        for var in os.environ:
-            if var.startswith('ARVADOS_'):
-                self[var] = os.environ[var]
+import util
 
 class errors:
     class SyntaxError(Exception):
@@ -59,29 +40,6 @@ class errors:
     class NotImplementedError(Exception):
         pass
 
-class CredentialsFromEnv(object):
-    @staticmethod
-    def http_request(self, uri, **kwargs):
-        global config
-        from httplib import BadStatusLine
-        if 'headers' not in kwargs:
-            kwargs['headers'] = {}
-        kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
-        try:
-            return self.orig_http_request(uri, **kwargs)
-        except BadStatusLine:
-            # This is how httplib tells us that it tried to reuse an
-            # existing connection but it was already closed by the
-            # server. In that case, yes, we would like to retry.
-            # Unfortunately, we are not absolutely certain that the
-            # previous call did not succeed, so this is slightly
-            # risky.
-            return self.orig_http_request(uri, **kwargs)
-    def authorize(self, http):
-        http.orig_http_request = http.request
-        http.request = types.MethodType(self.http_request, http)
-        return http
-
 def task_set_output(self,s):
     api('v1').job_tasks().update(uuid=self['uuid'],
                                  body={
@@ -116,62 +74,6 @@ def current_job():
 def getjobparam(*args):
     return current_job()['script_parameters'].get(*args)
 
-# Monkey patch discovery._cast() so objects and arrays get serialized
-# with json.dumps() instead of str().
-_cast_orig = apiclient.discovery._cast
-def _cast_objects_too(value, schema_type):
-    global _cast_orig
-    if (type(value) != type('') and
-        (schema_type == 'object' or schema_type == 'array')):
-        return json.dumps(value)
-    else:
-        return _cast_orig(value, schema_type)
-apiclient.discovery._cast = _cast_objects_too
-
-def http_cache(data_type):
-    path = os.environ['HOME'] + '/.cache/arvados/' + data_type
-    try:
-        util.mkdir_dash_p(path)
-    except OSError:
-        path = None
-    return path
-
-def api(version=None):
-    global services, config
-
-    if not config:
-        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
-        if 'ARVADOS_DEBUG' in config:
-            logging.basicConfig(level=logging.DEBUG)
-
-    if not services.get(version):
-        apiVersion = version
-        if not version:
-            apiVersion = 'v1'
-            logging.info("Using default API version. " +
-                         "Call arvados.api('%s') instead." %
-                         apiVersion)
-        if 'ARVADOS_API_HOST' not in config:
-            raise Exception("ARVADOS_API_HOST is not set. Aborting.")
-        url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
-               config['ARVADOS_API_HOST'])
-        credentials = CredentialsFromEnv()
-
-        # Use system's CA certificates (if we find them) instead of httplib2's
-        ca_certs = '/etc/ssl/certs/ca-certificates.crt'
-        if not os.path.exists(ca_certs):
-            ca_certs = None             # use httplib2 default
-
-        http = httplib2.Http(ca_certs=ca_certs,
-                             cache=http_cache('discovery'))
-        http = credentials.authorize(http)
-        if re.match(r'(?i)^(true|1|yes)$',
-                    config.get('ARVADOS_API_HOST_INSECURE', 'no')):
-            http.disable_ssl_certificate_validation=True
-        services[version] = apiclient.discovery.build(
-            'arvados', apiVersion, http=http, discoveryServiceUrl=url)
-    return services[version]
-
 class JobTask(object):
     def __init__(self, parameters=dict(), runtime_constraints=dict()):
         print "init jobtask %s %s" % (parameters, runtime_constraints)
@@ -224,311 +126,4 @@ class job_setup:
                                        ).execute()
             exit(0)
 
-class util:
-    @staticmethod
-    def clear_tmpdir(path=None):
-        """
-        Ensure the given directory (or TASK_TMPDIR if none given)
-        exists and is empty.
-        """
-        if path == None:
-            path = current_task().tmpdir
-        if os.path.exists(path):
-            p = subprocess.Popen(['rm', '-rf', path])
-            stdout, stderr = p.communicate(None)
-            if p.returncode != 0:
-                raise Exception('rm -rf %s: %s' % (path, stderr))
-        os.mkdir(path)
-
-    @staticmethod
-    def run_command(execargs, **kwargs):
-        kwargs.setdefault('stdin', subprocess.PIPE)
-        kwargs.setdefault('stdout', subprocess.PIPE)
-        kwargs.setdefault('stderr', sys.stderr)
-        kwargs.setdefault('close_fds', True)
-        kwargs.setdefault('shell', False)
-        p = subprocess.Popen(execargs, **kwargs)
-        stdoutdata, stderrdata = p.communicate(None)
-        if p.returncode != 0:
-            raise errors.CommandFailedError(
-                "run_command %s exit %d:\n%s" %
-                (execargs, p.returncode, stderrdata))
-        return stdoutdata, stderrdata
-
-    @staticmethod
-    def git_checkout(url, version, path):
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        if not os.path.exists(path):
-            util.run_command(["git", "clone", url, path],
-                             cwd=os.path.dirname(path))
-        util.run_command(["git", "checkout", version],
-                         cwd=path)
-        return path
-
-    @staticmethod
-    def tar_extractor(path, decompress_flag):
-        return subprocess.Popen(["tar",
-                                 "-C", path,
-                                 ("-x%sf" % decompress_flag),
-                                 "-"],
-                                stdout=None,
-                                stdin=subprocess.PIPE, stderr=sys.stderr,
-                                shell=False, close_fds=True)
-
-    @staticmethod
-    def tarball_extract(tarball, path):
-        """Retrieve a tarball from Keep and extract it to a local
-        directory.  Return the absolute path where the tarball was
-        extracted. If the top level of the tarball contained just one
-        file or directory, return the absolute path of that single
-        item.
-
-        tarball -- collection locator
-        path -- where to extract the tarball: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == tarball:
-                already_have_it = True
-        except OSError:
-            pass
-        if not already_have_it:
-
-            # emulate "rm -f" (i.e., if the file does not exist, we win)
-            try:
-                os.unlink(os.path.join(path, '.locator'))
-            except OSError:
-                if os.path.exists(os.path.join(path, '.locator')):
-                    os.unlink(os.path.join(path, '.locator'))
-
-            for f in CollectionReader(tarball).all_files():
-                if re.search('\.(tbz|tar.bz2)$', f.name()):
-                    p = util.tar_extractor(path, 'j')
-                elif re.search('\.(tgz|tar.gz)$', f.name()):
-                    p = util.tar_extractor(path, 'z')
-                elif re.search('\.tar$', f.name()):
-                    p = util.tar_extractor(path, '')
-                else:
-                    raise errors.AssertionError(
-                        "tarball_extract cannot handle filename %s" % f.name())
-                while True:
-                    buf = f.read(2**20)
-                    if len(buf) == 0:
-                        break
-                    p.stdin.write(buf)
-                p.stdin.close()
-                p.wait()
-                if p.returncode != 0:
-                    lockfile.close()
-                    raise errors.CommandFailedError(
-                        "tar exited %d" % p.returncode)
-            os.symlink(tarball, os.path.join(path, '.locator'))
-        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
-        lockfile.close()
-        if len(tld_extracts) == 1:
-            return os.path.join(path, tld_extracts[0])
-        return path
-
-    @staticmethod
-    def zipball_extract(zipball, path):
-        """Retrieve a zip archive from Keep and extract it to a local
-        directory.  Return the absolute path where the archive was
-        extracted. If the top level of the archive contained just one
-        file or directory, return the absolute path of that single
-        item.
-
-        zipball -- collection locator
-        path -- where to extract the archive: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == zipball:
-                already_have_it = True
-        except OSError:
-            pass
-        if not already_have_it:
-
-            # emulate "rm -f" (i.e., if the file does not exist, we win)
-            try:
-                os.unlink(os.path.join(path, '.locator'))
-            except OSError:
-                if os.path.exists(os.path.join(path, '.locator')):
-                    os.unlink(os.path.join(path, '.locator'))
-
-            for f in CollectionReader(zipball).all_files():
-                if not re.search('\.zip$', f.name()):
-                    raise errors.NotImplementedError(
-                        "zipball_extract cannot handle filename %s" % f.name())
-                zip_filename = os.path.join(path, os.path.basename(f.name()))
-                zip_file = open(zip_filename, 'wb')
-                while True:
-                    buf = f.read(2**20)
-                    if len(buf) == 0:
-                        break
-                    zip_file.write(buf)
-                zip_file.close()
-                
-                p = subprocess.Popen(["unzip",
-                                      "-q", "-o",
-                                      "-d", path,
-                                      zip_filename],
-                                     stdout=None,
-                                     stdin=None, stderr=sys.stderr,
-                                     shell=False, close_fds=True)
-                p.wait()
-                if p.returncode != 0:
-                    lockfile.close()
-                    raise errors.CommandFailedError(
-                        "unzip exited %d" % p.returncode)
-                os.unlink(zip_filename)
-            os.symlink(zipball, os.path.join(path, '.locator'))
-        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
-        lockfile.close()
-        if len(tld_extracts) == 1:
-            return os.path.join(path, tld_extracts[0])
-        return path
-
-    @staticmethod
-    def collection_extract(collection, path, files=[], decompress=True):
-        """Retrieve a collection from Keep and extract it to a local
-        directory.  Return the absolute path where the collection was
-        extracted.
-
-        collection -- collection locator
-        path -- where to extract: absolute, or relative to job tmp
-        """
-        matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
-        if matches:
-            collection_hash = matches.group(1)
-        else:
-            collection_hash = hashlib.md5(collection).hexdigest()
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == collection_hash:
-                already_have_it = True
-        except OSError:
-            pass
-
-        # emulate "rm -f" (i.e., if the file does not exist, we win)
-        try:
-            os.unlink(os.path.join(path, '.locator'))
-        except OSError:
-            if os.path.exists(os.path.join(path, '.locator')):
-                os.unlink(os.path.join(path, '.locator'))
-
-        files_got = []
-        for s in CollectionReader(collection).all_streams():
-            stream_name = s.name()
-            for f in s.all_files():
-                if (files == [] or
-                    ((f.name() not in files_got) and
-                     (f.name() in files or
-                      (decompress and f.decompressed_name() in files)))):
-                    outname = f.decompressed_name() if decompress else f.name()
-                    files_got += [outname]
-                    if os.path.exists(os.path.join(path, stream_name, outname)):
-                        continue
-                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
-                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
-                    for buf in (f.readall_decompressed() if decompress
-                                else f.readall()):
-                        outfile.write(buf)
-                    outfile.close()
-        if len(files_got) < len(files):
-            raise errors.AssertionError(
-                "Wanted files %s but only got %s from %s" %
-                (files, files_got,
-                 [z.name() for z in CollectionReader(collection).all_files()]))
-        os.symlink(collection_hash, os.path.join(path, '.locator'))
-
-        lockfile.close()
-        return path
-
-    @staticmethod
-    def mkdir_dash_p(path):
-        if not os.path.exists(path):
-            util.mkdir_dash_p(os.path.dirname(path))
-            try:
-                os.mkdir(path)
-            except OSError:
-                if not os.path.exists(path):
-                    os.mkdir(path)
-
-    @staticmethod
-    def stream_extract(stream, path, files=[], decompress=True):
-        """Retrieve a stream from Keep and extract it to a local
-        directory.  Return the absolute path where the stream was
-        extracted.
-
-        stream -- StreamReader object
-        path -- where to extract: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-
-        files_got = []
-        for f in stream.all_files():
-            if (files == [] or
-                ((f.name() not in files_got) and
-                 (f.name() in files or
-                  (decompress and f.decompressed_name() in files)))):
-                outname = f.decompressed_name() if decompress else f.name()
-                files_got += [outname]
-                if os.path.exists(os.path.join(path, outname)):
-                    os.unlink(os.path.join(path, outname))
-                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
-                outfile = open(os.path.join(path, outname), 'wb')
-                for buf in (f.readall_decompressed() if decompress
-                            else f.readall()):
-                    outfile.write(buf)
-                outfile.close()
-        if len(files_got) < len(files):
-            raise errors.AssertionError(
-                "Wanted files %s but only got %s from %s" %
-                (files, files_got, [z.name() for z in stream.all_files()]))
-        lockfile.close()
-        return path
-
-    @staticmethod
-    def listdir_recursive(dirname, base=None):
-        allfiles = []
-        for ent in sorted(os.listdir(dirname)):
-            ent_path = os.path.join(dirname, ent)
-            ent_base = os.path.join(base, ent) if base else ent
-            if os.path.isdir(ent_path):
-                allfiles += util.listdir_recursive(ent_path, ent_base)
-            else:
-                allfiles += [ent_base]
-        return allfiles
 
diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
new file mode 100644
index 0000000..cc019a1
--- /dev/null
+++ b/sdk/python/arvados/api.py
@@ -0,0 +1,108 @@
+import httplib2
+import json
+import logging
+import os
+import re
+import types
+
+import apiclient
+import apiclient.discovery
+import util
+
+config = None
+services = {}
+
+class CredentialsFromEnv(object):
+    @staticmethod
+    def http_request(self, uri, **kwargs):
+        global config
+        from httplib import BadStatusLine
+        if 'headers' not in kwargs:
+            kwargs['headers'] = {}
+        kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
+        try:
+            return self.orig_http_request(uri, **kwargs)
+        except BadStatusLine:
+            # This is how httplib tells us that it tried to reuse an
+            # existing connection but it was already closed by the
+            # server. In that case, yes, we would like to retry.
+            # Unfortunately, we are not absolutely certain that the
+            # previous call did not succeed, so this is slightly
+            # risky.
+            return self.orig_http_request(uri, **kwargs)
+    def authorize(self, http):
+        http.orig_http_request = http.request
+        http.request = types.MethodType(self.http_request, http)
+        return http
+
+# Arvados configuration settings are taken from $HOME/.config/arvados.
+# Environment variables override settings in the config file.
+#
+class ArvadosConfig(dict):
+    def __init__(self, config_file):
+        dict.__init__(self)
+        if os.path.exists(config_file):
+            with open(config_file, "r") as f:
+                for config_line in f:
+                    var, val = config_line.rstrip().split('=', 2)
+                    self[var] = val
+        for var in os.environ:
+            if var.startswith('ARVADOS_'):
+                self[var] = os.environ[var]
+
+# Monkey patch discovery._cast() so objects and arrays get serialized
+# with json.dumps() instead of str().
+_cast_orig = apiclient.discovery._cast
+def _cast_objects_too(value, schema_type):
+    global _cast_orig
+    if (type(value) != type('') and
+        (schema_type == 'object' or schema_type == 'array')):
+        return json.dumps(value)
+    else:
+        return _cast_orig(value, schema_type)
+apiclient.discovery._cast = _cast_objects_too
+
+def http_cache(data_type):
+    path = os.environ['HOME'] + '/.cache/arvados/' + data_type
+    try:
+        util.mkdir_dash_p(path)
+    except OSError:
+        path = None
+    return path
+
+def api(version=None):
+    global services, config
+
+    if not config:
+        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
+        if 'ARVADOS_DEBUG' in config:
+            logging.basicConfig(level=logging.DEBUG)
+
+    if not services.get(version):
+        apiVersion = version
+        if not version:
+            apiVersion = 'v1'
+            logging.info("Using default API version. " +
+                         "Call arvados.api('%s') instead." %
+                         apiVersion)
+        if 'ARVADOS_API_HOST' not in config:
+            raise Exception("ARVADOS_API_HOST is not set. Aborting.")
+        url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
+               config['ARVADOS_API_HOST'])
+        credentials = CredentialsFromEnv()
+
+        # Use system's CA certificates (if we find them) instead of httplib2's
+        ca_certs = '/etc/ssl/certs/ca-certificates.crt'
+        if not os.path.exists(ca_certs):
+            ca_certs = None             # use httplib2 default
+
+        http = httplib2.Http(ca_certs=ca_certs,
+                             cache=http_cache('discovery'))
+        http = credentials.authorize(http)
+        if re.match(r'(?i)^(true|1|yes)$',
+                    config.get('ARVADOS_API_HOST_INSECURE', 'no')):
+            http.disable_ssl_certificate_validation=True
+        services[version] = apiclient.discovery.build(
+            'arvados', apiVersion, http=http, discoveryServiceUrl=url)
+    return services[version]
+
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index dc2f0f8..296f950 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -20,6 +20,7 @@ import threading
 
 from stream import *
 from keep import *
+import util
 
 class CollectionReader(object):
     def __init__(self, manifest_locator_or_text):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 62e9d08..6c24704 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -20,7 +20,8 @@ import threading
 
 global_client_object = None
 
-from arvados import *
+from api import *
+import util
 
 class Keep:
     @staticmethod
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
new file mode 100644
index 0000000..9286795
--- /dev/null
+++ b/sdk/python/arvados/util.py
@@ -0,0 +1,302 @@
+import fcntl
+import hashlib
+import os
+import re
+import subprocess
+
+def clear_tmpdir(path=None):
+    """
+    Ensure the given directory (or TASK_TMPDIR if none given)
+    exists and is empty.
+    """
+    if path == None:
+        path = current_task().tmpdir
+    if os.path.exists(path):
+        p = subprocess.Popen(['rm', '-rf', path])
+        stdout, stderr = p.communicate(None)
+        if p.returncode != 0:
+            raise Exception('rm -rf %s: %s' % (path, stderr))
+    os.mkdir(path)
+
+def run_command(execargs, **kwargs):
+    kwargs.setdefault('stdin', subprocess.PIPE)
+    kwargs.setdefault('stdout', subprocess.PIPE)
+    kwargs.setdefault('stderr', sys.stderr)
+    kwargs.setdefault('close_fds', True)
+    kwargs.setdefault('shell', False)
+    p = subprocess.Popen(execargs, **kwargs)
+    stdoutdata, stderrdata = p.communicate(None)
+    if p.returncode != 0:
+        raise errors.CommandFailedError(
+            "run_command %s exit %d:\n%s" %
+            (execargs, p.returncode, stderrdata))
+    return stdoutdata, stderrdata
+
+def git_checkout(url, version, path):
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    if not os.path.exists(path):
+        util.run_command(["git", "clone", url, path],
+                         cwd=os.path.dirname(path))
+    util.run_command(["git", "checkout", version],
+                     cwd=path)
+    return path
+
+def tar_extractor(path, decompress_flag):
+    return subprocess.Popen(["tar",
+                             "-C", path,
+                             ("-x%sf" % decompress_flag),
+                             "-"],
+                            stdout=None,
+                            stdin=subprocess.PIPE, stderr=sys.stderr,
+                            shell=False, close_fds=True)
+
+def tarball_extract(tarball, path):
+    """Retrieve a tarball from Keep and extract it to a local
+    directory.  Return the absolute path where the tarball was
+    extracted. If the top level of the tarball contained just one
+    file or directory, return the absolute path of that single
+    item.
+
+    tarball -- collection locator
+    path -- where to extract the tarball: absolute, or relative to job tmp
+    """
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+    already_have_it = False
+    try:
+        if os.readlink(os.path.join(path, '.locator')) == tarball:
+            already_have_it = True
+    except OSError:
+        pass
+    if not already_have_it:
+
+        # emulate "rm -f" (i.e., if the file does not exist, we win)
+        try:
+            os.unlink(os.path.join(path, '.locator'))
+        except OSError:
+            if os.path.exists(os.path.join(path, '.locator')):
+                os.unlink(os.path.join(path, '.locator'))
+
+        for f in CollectionReader(tarball).all_files():
+            if re.search('\.(tbz|tar.bz2)$', f.name()):
+                p = util.tar_extractor(path, 'j')
+            elif re.search('\.(tgz|tar.gz)$', f.name()):
+                p = util.tar_extractor(path, 'z')
+            elif re.search('\.tar$', f.name()):
+                p = util.tar_extractor(path, '')
+            else:
+                raise errors.AssertionError(
+                    "tarball_extract cannot handle filename %s" % f.name())
+            while True:
+                buf = f.read(2**20)
+                if len(buf) == 0:
+                    break
+                p.stdin.write(buf)
+            p.stdin.close()
+            p.wait()
+            if p.returncode != 0:
+                lockfile.close()
+                raise errors.CommandFailedError(
+                    "tar exited %d" % p.returncode)
+        os.symlink(tarball, os.path.join(path, '.locator'))
+    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+    lockfile.close()
+    if len(tld_extracts) == 1:
+        return os.path.join(path, tld_extracts[0])
+    return path
+
+def zipball_extract(zipball, path):
+    """Retrieve a zip archive from Keep and extract it to a local
+    directory.  Return the absolute path where the archive was
+    extracted. If the top level of the archive contained just one
+    file or directory, return the absolute path of that single
+    item.
+
+    zipball -- collection locator
+    path -- where to extract the archive: absolute, or relative to job tmp
+    """
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+    already_have_it = False
+    try:
+        if os.readlink(os.path.join(path, '.locator')) == zipball:
+            already_have_it = True
+    except OSError:
+        pass
+    if not already_have_it:
+
+        # emulate "rm -f" (i.e., if the file does not exist, we win)
+        try:
+            os.unlink(os.path.join(path, '.locator'))
+        except OSError:
+            if os.path.exists(os.path.join(path, '.locator')):
+                os.unlink(os.path.join(path, '.locator'))
+
+        for f in CollectionReader(zipball).all_files():
+            if not re.search('\.zip$', f.name()):
+                raise errors.NotImplementedError(
+                    "zipball_extract cannot handle filename %s" % f.name())
+            zip_filename = os.path.join(path, os.path.basename(f.name()))
+            zip_file = open(zip_filename, 'wb')
+            while True:
+                buf = f.read(2**20)
+                if len(buf) == 0:
+                    break
+                zip_file.write(buf)
+            zip_file.close()
+            
+            p = subprocess.Popen(["unzip",
+                                  "-q", "-o",
+                                  "-d", path,
+                                  zip_filename],
+                                 stdout=None,
+                                 stdin=None, stderr=sys.stderr,
+                                 shell=False, close_fds=True)
+            p.wait()
+            if p.returncode != 0:
+                lockfile.close()
+                raise errors.CommandFailedError(
+                    "unzip exited %d" % p.returncode)
+            os.unlink(zip_filename)
+        os.symlink(zipball, os.path.join(path, '.locator'))
+    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
+    lockfile.close()
+    if len(tld_extracts) == 1:
+        return os.path.join(path, tld_extracts[0])
+    return path
+
+def collection_extract(collection, path, files=[], decompress=True):
+    """Retrieve a collection from Keep and extract it to a local
+    directory.  Return the absolute path where the collection was
+    extracted.
+
+    collection -- collection locator
+    path -- where to extract: absolute, or relative to job tmp
+    """
+    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
+    if matches:
+        collection_hash = matches.group(1)
+    else:
+        collection_hash = hashlib.md5(collection).hexdigest()
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+    already_have_it = False
+    try:
+        if os.readlink(os.path.join(path, '.locator')) == collection_hash:
+            already_have_it = True
+    except OSError:
+        pass
+
+    # emulate "rm -f" (i.e., if the file does not exist, we win)
+    try:
+        os.unlink(os.path.join(path, '.locator'))
+    except OSError:
+        if os.path.exists(os.path.join(path, '.locator')):
+            os.unlink(os.path.join(path, '.locator'))
+
+    files_got = []
+    for s in CollectionReader(collection).all_streams():
+        stream_name = s.name()
+        for f in s.all_files():
+            if (files == [] or
+                ((f.name() not in files_got) and
+                 (f.name() in files or
+                  (decompress and f.decompressed_name() in files)))):
+                outname = f.decompressed_name() if decompress else f.name()
+                files_got += [outname]
+                if os.path.exists(os.path.join(path, stream_name, outname)):
+                    continue
+                util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
+                outfile = open(os.path.join(path, stream_name, outname), 'wb')
+                for buf in (f.readall_decompressed() if decompress
+                            else f.readall()):
+                    outfile.write(buf)
+                outfile.close()
+    if len(files_got) < len(files):
+        raise errors.AssertionError(
+            "Wanted files %s but only got %s from %s" %
+            (files, files_got,
+             [z.name() for z in CollectionReader(collection).all_files()]))
+    os.symlink(collection_hash, os.path.join(path, '.locator'))
+
+    lockfile.close()
+    return path
+
+def mkdir_dash_p(path):
+    if not os.path.exists(path):
+        util.mkdir_dash_p(os.path.dirname(path))
+        try:
+            os.mkdir(path)
+        except OSError:
+            if not os.path.exists(path):
+                os.mkdir(path)
+
+def stream_extract(stream, path, files=[], decompress=True):
+    """Retrieve a stream from Keep and extract it to a local
+    directory.  Return the absolute path where the stream was
+    extracted.
+
+    stream -- StreamReader object
+    path -- where to extract: absolute, or relative to job tmp
+    """
+    if not re.search('^/', path):
+        path = os.path.join(current_job().tmpdir, path)
+    lockfile = open(path + '.lock', 'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    try:
+        os.stat(path)
+    except OSError:
+        os.mkdir(path)
+
+    files_got = []
+    for f in stream.all_files():
+        if (files == [] or
+            ((f.name() not in files_got) and
+             (f.name() in files or
+              (decompress and f.decompressed_name() in files)))):
+            outname = f.decompressed_name() if decompress else f.name()
+            files_got += [outname]
+            if os.path.exists(os.path.join(path, outname)):
+                os.unlink(os.path.join(path, outname))
+            util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
+            outfile = open(os.path.join(path, outname), 'wb')
+            for buf in (f.readall_decompressed() if decompress
+                        else f.readall()):
+                outfile.write(buf)
+            outfile.close()
+    if len(files_got) < len(files):
+        raise errors.AssertionError(
+            "Wanted files %s but only got %s from %s" %
+            (files, files_got, [z.name() for z in stream.all_files()]))
+    lockfile.close()
+    return path
+
+def listdir_recursive(dirname, base=None):
+    allfiles = []
+    for ent in sorted(os.listdir(dirname)):
+        ent_path = os.path.join(dirname, ent)
+        ent_base = os.path.join(base, ent) if base else ent
+        if os.path.isdir(ent_path):
+            allfiles += util.listdir_recursive(ent_path, ent_base)
+        else:
+            allfiles += [ent_base]
+    return allfiles

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list