[arvados] created: 2.7.0-4980-gc8c1fd28b0
From: git repository hosting <git at public.arvados.org>
Date: Wed Oct 11 16:28:14 UTC 2023
at c8c1fd28b0a587a91041e02da13918ca34a5ce6f (commit)
commit c8c1fd28b0a587a91041e02da13918ca34a5ce6f
Author: Brett Smith <brett.smith at curii.com>
Date: Tue Oct 10 16:24:12 2023 -0400
19821: Document the arvados.collection module
* Add docstrings and type signatures to all public methods.
* Mark all deprecated classes and methods. Move them to the bottom of
the module for better organization of web documentation.
* Mark all internal classes and methods.
Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith at curii.com>
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index bfb43be5eb..a96474549a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1,6 +1,16 @@
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+"""Tools to work with Arvados collections
+
+This module provides high-level interfaces to create, read, and update
+Arvados collections. Most users will want to instantiate `Collection`
+objects, and use methods like `Collection.open` and `Collection.mkdirs` to
+read and write data in the collection. Refer to the Arvados Python SDK
+cookbook for [an introduction to using the Collection class][cookbook].
+
+[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
+"""
from __future__ import absolute_import
from future.utils import listitems, listvalues, viewkeys
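As a minimal sketch of the workflow the new module docstring describes (the UUID below is a placeholder; a configured ARVADOS_API_HOST/ARVADOS_API_TOKEN environment is assumed):

    import arvados.collection

    # Placeholder UUID -- substitute a collection UUID or portable data hash
    # from your own cluster.
    coll = arvados.collection.Collection('zzzzz-4zz18-12345abcde12345')
    with coll.open('foo.txt') as f:
        print(f.read())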
@@ -35,15 +45,65 @@ import arvados.util
import arvados.events as events
from arvados.retry import retry_method
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ IO,
+ Iterator,
+ List,
+ Mapping,
+ Optional,
+ Tuple,
+ Union,
+)
+
+if sys.version_info < (3, 8):
+ from typing_extensions import Literal
+else:
+ from typing import Literal
+
_logger = logging.getLogger('arvados.collection')
+ADD = "add"
+"""Argument value for `Collection` methods to represent an added item"""
+DEL = "del"
+"""Argument value for `Collection` methods to represent a removed item"""
+MOD = "mod"
+"""Argument value for `Collection` methods to represent a modified item"""
+TOK = "tok"
+"""Argument value for `Collection` methods to represent an item with token differences"""
+FILE = "file"
+"""`create_type` value for `Collection.find_or_create`"""
+COLLECTION = "collection"
+"""`create_type` value for `Collection.find_or_create`"""
+
+ChangeList = List[Union[
+ Tuple[Literal[ADD, DEL], str, 'Collection'],
+ Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
+]]
+ChangeType = Literal[ADD, DEL, MOD, TOK]
+CollectionItem = Union[ArvadosFile, 'Collection']
+ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
+CreateType = Literal[COLLECTION, FILE]
+Properties = Mapping[str, Any]
+StorageClasses = List[str]
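For illustration, a callback matching the `ChangeCallback` signature might look like the following sketch (`coll` stands for a writable `Collection` instance):

    def log_change(event, collection, name, item):
        # event is one of ADD, DEL, MOD, or TOK
        print(f'{event}: {name} in {collection.stream_name()}')

    coll.subscribe(log_change)
    coll.mkdirs('reports/2023')  # triggers the callback with an ADD event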
+
class CollectionBase(object):
- """Abstract base class for Collection classes."""
+ """Abstract base class for Collection classes
+
+ .. ATTENTION:: Internal
+ This class is meant to be used by other parts of the SDK. User code
+ should instantiate or subclass `Collection` or one of its subclasses
+ directly.
+ """
def __enter__(self):
+ """Enter a context block with this collection instance"""
return self
def __exit__(self, exc_type, exc_value, traceback):
+ """Exit a context block with this collection instance"""
pass
def _my_keep(self):
@@ -52,12 +112,13 @@ class CollectionBase(object):
num_retries=self.num_retries)
return self._keep_client
- def stripped_manifest(self):
- """Get the manifest with locator hints stripped.
+ def stripped_manifest(self) -> str:
+ """Create a copy of the collection manifest with only size hints
- Return the manifest for the current collection with all
- non-portable hints (i.e., permission signatures and other
- hints other than size hints) removed from the locators.
+ This method returns a string with the current collection's manifest
+ text with all non-portable locator hints like permission hints and
+ remote cluster hints removed. The only hints in the returned manifest
+ will be size hints.
"""
raw = self.manifest_text()
clean = []
@@ -96,709 +157,379 @@ class _WriterFile(_FileLikeObjectBase):
self.dest.flush_data()
-class CollectionWriter(CollectionBase):
- """Deprecated, use Collection instead."""
+class RichCollectionBase(CollectionBase):
+ """Base class for Collection classes
- @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
- def __init__(self, api_client=None, num_retries=0, replication=None):
- """Instantiate a CollectionWriter.
+ .. ATTENTION:: Internal
+ This class is meant to be used by other parts of the SDK. User code
+ should instantiate or subclass `Collection` or one of its subclasses
+ directly.
+ """
- CollectionWriter lets you build a new Arvados Collection from scratch.
- Write files to it. The CollectionWriter will upload data to Keep as
- appropriate, and provide you with the Collection manifest text when
- you're finished.
+ def __init__(self, parent=None):
+ self.parent = parent
+ self._committed = False
+ self._has_remote_blocks = False
+ self._callback = None
+ self._items = {}
- Arguments:
- * api_client: The API client to use to look up Collections. If not
- provided, CollectionReader will build one from available Arvados
- configuration.
- * num_retries: The default number of times to retry failed
- service requests. Default 0. You may change this value
- after instantiation, but note those changes may not
- propagate to related objects like the Keep client.
- * replication: The number of copies of each block to store.
- If this argument is None or not supplied, replication is
- the server-provided default if available, otherwise 2.
- """
- self._api_client = api_client
- self.num_retries = num_retries
- self.replication = (2 if replication is None else replication)
- self._keep_client = None
- self._data_buffer = []
- self._data_buffer_len = 0
- self._current_stream_files = []
- self._current_stream_length = 0
- self._current_stream_locators = []
- self._current_stream_name = '.'
- self._current_file_name = None
- self._current_file_pos = 0
- self._finished_streams = []
- self._close_file = None
- self._queued_file = None
- self._queued_dirents = deque()
- self._queued_trees = deque()
- self._last_open = None
+ def _my_api(self):
+ raise NotImplementedError()
- def __exit__(self, exc_type, exc_value, traceback):
- if exc_type is None:
- self.finish()
+ def _my_keep(self):
+ raise NotImplementedError()
- def do_queued_work(self):
- # The work queue consists of three pieces:
- # * _queued_file: The file object we're currently writing to the
- # Collection.
- # * _queued_dirents: Entries under the current directory
- # (_queued_trees[0]) that we want to write or recurse through.
- # This may contain files from subdirectories if
- # max_manifest_depth == 0 for this directory.
- # * _queued_trees: Directories that should be written as separate
- # streams to the Collection.
- # This function handles the smallest piece of work currently queued
- # (current file, then current directory, then next directory) until
- # no work remains. The _work_THING methods each do a unit of work on
- # THING. _queue_THING methods add a THING to the work queue.
- while True:
- if self._queued_file:
- self._work_file()
- elif self._queued_dirents:
- self._work_dirents()
- elif self._queued_trees:
- self._work_trees()
- else:
- break
+ def _my_block_manager(self):
+ raise NotImplementedError()
- def _work_file(self):
- while True:
- buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
- if not buf:
- break
- self.write(buf)
- self.finish_current_file()
- if self._close_file:
- self._queued_file.close()
- self._close_file = None
- self._queued_file = None
+ def writable(self) -> bool:
+ """Indicate whether this collection object can be modified
- def _work_dirents(self):
- path, stream_name, max_manifest_depth = self._queued_trees[0]
- if stream_name != self.current_stream_name():
- self.start_new_stream(stream_name)
- while self._queued_dirents:
- dirent = self._queued_dirents.popleft()
- target = os.path.join(path, dirent)
- if os.path.isdir(target):
- self._queue_tree(target,
- os.path.join(stream_name, dirent),
- max_manifest_depth - 1)
- else:
- self._queue_file(target, dirent)
- break
- if not self._queued_dirents:
- self._queued_trees.popleft()
+ This method returns `False` if this object is a `CollectionReader`,
+ else `True`.
+ """
+ raise NotImplementedError()
- def _work_trees(self):
- path, stream_name, max_manifest_depth = self._queued_trees[0]
- d = arvados.util.listdir_recursive(
- path, max_depth = (None if max_manifest_depth == 0 else 0))
- if d:
- self._queue_dirents(stream_name, d)
- else:
- self._queued_trees.popleft()
+ def root_collection(self) -> 'Collection':
+ """Get this collection's root collection object
- def _queue_file(self, source, filename=None):
- assert (self._queued_file is None), "tried to queue more than one file"
- if not hasattr(source, 'read'):
- source = open(source, 'rb')
- self._close_file = True
- else:
- self._close_file = False
- if filename is None:
- filename = os.path.basename(source.name)
- self.start_new_file(filename)
- self._queued_file = source
+ If you open a subcollection with `Collection.find`, calling this method
+ on that subcollection returns the source Collection object.
+ """
+ raise NotImplementedError()
- def _queue_dirents(self, stream_name, dirents):
- assert (not self._queued_dirents), "tried to queue more than one tree"
- self._queued_dirents = deque(sorted(dirents))
+ def stream_name(self) -> str:
+ """Get the name of the manifest stream represented by this collection
- def _queue_tree(self, path, stream_name, max_manifest_depth):
- self._queued_trees.append((path, stream_name, max_manifest_depth))
+ If you open a subcollection with `Collection.find`, calling this method
+ on that subcollection returns the name of the stream you opened.
+ """
+ raise NotImplementedError()
- def write_file(self, source, filename=None):
- self._queue_file(source, filename)
- self.do_queued_work()
+ @synchronized
+ def has_remote_blocks(self) -> bool:
+ """Indiciate whether the collection refers to remote data
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- self._queue_tree(path, stream_name, max_manifest_depth)
- self.do_queued_work()
+ Returns `True` if the collection manifest includes any Keep locators
+ with a remote hint (`+R`), else `False`.
+ """
+ if self._has_remote_blocks:
+ return True
+ for item in self:
+ if self[item].has_remote_blocks():
+ return True
+ return False
- def write(self, newdata):
- if isinstance(newdata, bytes):
- pass
- elif isinstance(newdata, str):
- newdata = newdata.encode()
- elif hasattr(newdata, '__iter__'):
- for s in newdata:
- self.write(s)
- return
- self._data_buffer.append(newdata)
- self._data_buffer_len += len(newdata)
- self._current_stream_length += len(newdata)
- while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
- self.flush_data()
+ @synchronized
+ def set_has_remote_blocks(self, val: bool) -> None:
+ """Cache whether this collection refers to remote blocks
- def open(self, streampath, filename=None):
- """open(streampath[, filename]) -> file-like object
+ .. ATTENTION:: Internal
+ This method is only meant to be used by other Collection methods.
- Pass in the path of a file to write to the Collection, either as a
- single string or as two separate stream name and file name arguments.
- This method returns a file-like object you can write to add it to the
- Collection.
+ Set this collection's cached "has remote blocks" flag to the given
+ value.
+ """
+ self._has_remote_blocks = val
+ if self.parent:
+ self.parent.set_has_remote_blocks(val)
- You may only have one file object from the Collection open at a time,
- so be sure to close the object when you're done. Using the object in
- a with statement makes that easy::
+ @must_be_writable
+ @synchronized
+ def find_or_create(
+ self,
+ path: str,
+ create_type: CreateType,
+ ) -> CollectionItem:
+ """Get the item at the given path, creating it if necessary
+
+ If `path` refers to a stream in this collection, returns a
+ corresponding `Subcollection` object. If `path` refers to a file in
+ this collection, returns a corresponding
+ `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
+ this collection, then this method creates a new object and returns
+ it, creating parent streams as needed. The type of object created is
+ determined by the value of `create_type`.
- with cwriter.open('./doc/page1.txt') as outfile:
- outfile.write(page1_data)
- with cwriter.open('./doc/page2.txt') as outfile:
- outfile.write(page2_data)
+ Arguments:
+
+ * path: str --- The path to find or create within this collection.
+
+ * create_type: Literal[COLLECTION, FILE] --- The type of object to
+ create at `path` if one does not exist. Passing `COLLECTION`
+ creates a stream and returns the corresponding
+ `Subcollection`. Passing `FILE` creates a new file and returns the
+ corresponding `arvados.arvfile.ArvadosFile`.
"""
- if filename is None:
- streampath, filename = split(streampath)
- if self._last_open and not self._last_open.closed:
- raise errors.AssertionError(
- u"can't open '{}' when '{}' is still open".format(
- filename, self._last_open.name))
- if streampath != self.current_stream_name():
- self.start_new_stream(streampath)
- self.set_current_file_name(filename)
- self._last_open = _WriterFile(self, filename)
- return self._last_open
+ pathcomponents = path.split("/", 1)
+ if pathcomponents[0]:
+ item = self._items.get(pathcomponents[0])
+ if len(pathcomponents) == 1:
+ if item is None:
+ # create new file
+ if create_type == COLLECTION:
+ item = Subcollection(self, pathcomponents[0])
+ else:
+ item = ArvadosFile(self, pathcomponents[0])
+ self._items[pathcomponents[0]] = item
+ self.set_committed(False)
+ self.notify(ADD, self, pathcomponents[0], item)
+ return item
+ else:
+ if item is None:
+ # create new collection
+ item = Subcollection(self, pathcomponents[0])
+ self._items[pathcomponents[0]] = item
+ self.set_committed(False)
+ self.notify(ADD, self, pathcomponents[0], item)
+ if isinstance(item, RichCollectionBase):
+ return item.find_or_create(pathcomponents[1], create_type)
+ else:
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+ else:
+ return self
- def flush_data(self):
- data_buffer = b''.join(self._data_buffer)
- if data_buffer:
- self._current_stream_locators.append(
- self._my_keep().put(
- data_buffer[0:config.KEEP_BLOCK_SIZE],
- copies=self.replication))
- self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
- self._data_buffer_len = len(self._data_buffer[0])
+ @synchronized
+ def find(self, path: str) -> CollectionItem:
+ """Get the item at the given path
- def start_new_file(self, newfilename=None):
- self.finish_current_file()
- self.set_current_file_name(newfilename)
+ If `path` refers to a stream in this collection, returns a
+ corresponding `Subcollection` object. If `path` refers to a file in
+ this collection, returns a corresponding
+ `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
+ this collection, this method returns `None`.
- def set_current_file_name(self, newfilename):
- if re.search(r'[\t\n]', newfilename):
- raise errors.AssertionError(
- "Manifest filenames cannot contain whitespace: %s" %
- newfilename)
- elif re.search(r'\x00', newfilename):
- raise errors.AssertionError(
- "Manifest filenames cannot contain NUL characters: %s" %
- newfilename)
- self._current_file_name = newfilename
+ Arguments:
- def current_file_name(self):
- return self._current_file_name
+ * path: str --- The path to find within this collection.
+ """
+ if not path:
+ raise errors.ArgumentError("Parameter 'path' is empty.")
- def finish_current_file(self):
- if self._current_file_name is None:
- if self._current_file_pos == self._current_stream_length:
- return
- raise errors.AssertionError(
- "Cannot finish an unnamed file " +
- "(%d bytes at offset %d in '%s' stream)" %
- (self._current_stream_length - self._current_file_pos,
- self._current_file_pos,
- self._current_stream_name))
- self._current_stream_files.append([
- self._current_file_pos,
- self._current_stream_length - self._current_file_pos,
- self._current_file_name])
- self._current_file_pos = self._current_stream_length
- self._current_file_name = None
+ pathcomponents = path.split("/", 1)
+ if pathcomponents[0] == '':
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
- def start_new_stream(self, newstreamname='.'):
- self.finish_current_stream()
- self.set_current_stream_name(newstreamname)
+ item = self._items.get(pathcomponents[0])
+ if item is None:
+ return None
+ elif len(pathcomponents) == 1:
+ return item
+ else:
+ if isinstance(item, RichCollectionBase):
+ if pathcomponents[1]:
+ return item.find(pathcomponents[1])
+ else:
+ return item
+ else:
+ raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
- def set_current_stream_name(self, newstreamname):
- if re.search(r'[\t\n]', newstreamname):
- raise errors.AssertionError(
- "Manifest stream names cannot contain whitespace: '%s'" %
- (newstreamname))
- self._current_stream_name = '.' if newstreamname=='' else newstreamname
+ @synchronized
+ def mkdirs(self, path: str) -> 'Subcollection':
+ """Create and return a subcollection at `path`
- def current_stream_name(self):
- return self._current_stream_name
+ If `path` exists within this collection, raises `FileExistsError`.
+ Otherwise, creates a stream at that path and returns the
+ corresponding `Subcollection`.
+ """
+ if self.find(path) != None:
+ raise IOError(errno.EEXIST, "Directory or file exists", path)
- def finish_current_stream(self):
- self.finish_current_file()
- self.flush_data()
- if not self._current_stream_files:
- pass
- elif self._current_stream_name is None:
- raise errors.AssertionError(
- "Cannot finish an unnamed stream (%d bytes in %d files)" %
- (self._current_stream_length, len(self._current_stream_files)))
- else:
- if not self._current_stream_locators:
- self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
- self._finished_streams.append([self._current_stream_name,
- self._current_stream_locators,
- self._current_stream_files])
- self._current_stream_files = []
- self._current_stream_length = 0
- self._current_stream_locators = []
- self._current_stream_name = None
- self._current_file_pos = 0
- self._current_file_name = None
+ return self.find_or_create(path, COLLECTION)
- def finish(self):
- """Store the manifest in Keep and return its locator.
+ def open(
+ self,
+ path: str,
+ mode: str="r",
+ encoding: Optional[str]=None,
+ ) -> IO:
+ """Open a file-like object within the collection
- This is useful for storing manifest fragments (task outputs)
- temporarily in Keep during a Crunch job.
+ This method returns a file-like object that can read and/or write the
+ file located at `path` within the collection. If you attempt to write
+ to a `path` that does not exist, the file is created with `find_or_create`.
+ If the file cannot be opened for any other reason, this method raises
+ `OSError` with an appropriate errno.
- In other cases you should make a collection instead, by
- sending manifest_text() to the API server's "create
- collection" endpoint.
- """
- return self._my_keep().put(self.manifest_text().encode(),
- copies=self.replication)
+ Arguments:
- def portable_data_hash(self):
- stripped = self.stripped_manifest().encode()
- return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
+ * path: str --- The path of the file to open within this collection
- def manifest_text(self):
- self.finish_current_stream()
- manifest = ''
+ * mode: str --- The mode to open this file. Supports all the same
+ values as `builtins.open`.
- for stream in self._finished_streams:
- if not re.search(r'^\.(/.*)?$', stream[0]):
- manifest += './'
- manifest += stream[0].replace(' ', '\\040')
- manifest += ' ' + ' '.join(stream[1])
- manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
- manifest += "\n"
+ * encoding: str | None --- The text encoding of the file. Only used
+ when the file is opened in text mode. The default is
+ platform-dependent.
+ """
+ if not re.search(r'^[rwa][bt]?\+?$', mode):
+ raise errors.ArgumentError("Invalid mode {!r}".format(mode))
- return manifest
+ if mode[0] == 'r' and '+' not in mode:
+ fclass = ArvadosFileReader
+ arvfile = self.find(path)
+ elif not self.writable():
+ raise IOError(errno.EROFS, "Collection is read only")
+ else:
+ fclass = ArvadosFileWriter
+ arvfile = self.find_or_create(path, FILE)
- def data_locators(self):
- ret = []
- for name, locators, files in self._finished_streams:
- ret += locators
- return ret
+ if arvfile is None:
+ raise IOError(errno.ENOENT, "File not found", path)
+ if not isinstance(arvfile, ArvadosFile):
+ raise IOError(errno.EISDIR, "Is a directory", path)
- def save_new(self, name=None):
- return self._api_client.collections().create(
- ensure_unique_name=True,
- body={
- 'name': name,
- 'manifest_text': self.manifest_text(),
- }).execute(num_retries=self.num_retries)
+ if mode[0] == 'w':
+ arvfile.truncate(0)
+ binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+ f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+ if 'b' not in mode:
+ bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+ f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ return f
-class ResumableCollectionWriter(CollectionWriter):
- """Deprecated, use Collection instead."""
+ def modified(self) -> bool:
+ """Indicate whether this collection has an API server record
- STATE_PROPS = ['_current_stream_files', '_current_stream_length',
- '_current_stream_locators', '_current_stream_name',
- '_current_file_name', '_current_file_pos', '_close_file',
- '_data_buffer', '_dependencies', '_finished_streams',
- '_queued_dirents', '_queued_trees']
+ Returns `False` if this collection corresponds to a record loaded from
+ the API server, `True` otherwise.
+ """
+ return not self.committed()
- @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
- def __init__(self, api_client=None, **kwargs):
- self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
+ @synchronized
+ def committed(self):
+ """Indicate whether this collection has an API server record
- @classmethod
- def from_state(cls, state, *init_args, **init_kwargs):
- # Try to build a new writer from scratch with the given state.
- # If the state is not suitable to resume (because files have changed,
- # been deleted, aren't predictable, etc.), raise a
- # StaleWriterStateError. Otherwise, return the initialized writer.
- # The caller is responsible for calling writer.do_queued_work()
- # appropriately after it's returned.
- writer = cls(*init_args, **init_kwargs)
- for attr_name in cls.STATE_PROPS:
- attr_value = state[attr_name]
- attr_class = getattr(writer, attr_name).__class__
- # Coerce the value into the same type as the initial value, if
- # needed.
- if attr_class not in (type(None), attr_value.__class__):
- attr_value = attr_class(attr_value)
- setattr(writer, attr_name, attr_value)
- # Check dependencies before we try to resume anything.
- if any(KeepLocator(ls).permission_expired()
- for ls in writer._current_stream_locators):
- raise errors.StaleWriterStateError(
- "locators include expired permission hint")
- writer.check_dependencies()
- if state['_current_file'] is not None:
- path, pos = state['_current_file']
- try:
- writer._queued_file = open(path, 'rb')
- writer._queued_file.seek(pos)
- except IOError as error:
- raise errors.StaleWriterStateError(
- u"failed to reopen active file {}: {}".format(path, error))
- return writer
+ Returns `True` if this collection corresponds to a record loaded from
+ the API server, `False` otherwise.
+ """
+ return self._committed
- def check_dependencies(self):
- for path, orig_stat in listitems(self._dependencies):
- if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError(u"{} not file".format(path))
- try:
- now_stat = tuple(os.stat(path))
- except OSError as error:
- raise errors.StaleWriterStateError(
- u"failed to stat {}: {}".format(path, error))
- if ((not S_ISREG(now_stat[ST_MODE])) or
- (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
- (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError(u"{} changed".format(path))
+ @synchronized
+ def set_committed(self, value: bool=True):
+ """Cache whether this collection has an API server record
- def dump_state(self, copy_func=lambda x: x):
- state = {attr: copy_func(getattr(self, attr))
- for attr in self.STATE_PROPS}
- if self._queued_file is None:
- state['_current_file'] = None
- else:
- state['_current_file'] = (os.path.realpath(self._queued_file.name),
- self._queued_file.tell())
- return state
+ .. ATTENTION:: Internal
+ This method is only meant to be used by other Collection methods.
- def _queue_file(self, source, filename=None):
- try:
- src_path = os.path.realpath(source)
- except Exception:
- raise errors.AssertionError(u"{} not a file path".format(source))
- try:
- path_stat = os.stat(src_path)
- except OSError as stat_error:
- path_stat = None
- super(ResumableCollectionWriter, self)._queue_file(source, filename)
- fd_stat = os.fstat(self._queued_file.fileno())
- if not S_ISREG(fd_stat.st_mode):
- # We won't be able to resume from this cache anyway, so don't
- # worry about further checks.
- self._dependencies[source] = tuple(fd_stat)
- elif path_stat is None:
- raise errors.AssertionError(
- u"could not stat {}: {}".format(source, stat_error))
- elif path_stat.st_ino != fd_stat.st_ino:
- raise errors.AssertionError(
- u"{} changed between open and stat calls".format(source))
+ Set this collection's cached "committed" flag to the given
+ value and propagate it as needed.
+ """
+ if value == self._committed:
+ return
+ if value:
+ for k,v in listitems(self._items):
+ v.set_committed(True)
+ self._committed = True
else:
- self._dependencies[src_path] = tuple(fd_stat)
+ self._committed = False
+ if self.parent is not None:
+ self.parent.set_committed(False)
- def write(self, data):
- if self._queued_file is None:
- raise errors.AssertionError(
- "resumable writer can't accept unsourced data")
- return super(ResumableCollectionWriter, self).write(data)
+ @synchronized
+ def __iter__(self) -> Iterator[str]:
+ """Iterate names of streams and files in this collection
-
-ADD = "add"
-DEL = "del"
-MOD = "mod"
-TOK = "tok"
-FILE = "file"
-COLLECTION = "collection"
-
-class RichCollectionBase(CollectionBase):
- """Base class for Collections and Subcollections.
-
- Implements the majority of functionality relating to accessing items in the
- Collection.
-
- """
-
- def __init__(self, parent=None):
- self.parent = parent
- self._committed = False
- self._has_remote_blocks = False
- self._callback = None
- self._items = {}
-
- def _my_api(self):
- raise NotImplementedError()
-
- def _my_keep(self):
- raise NotImplementedError()
-
- def _my_block_manager(self):
- raise NotImplementedError()
-
- def writable(self):
- raise NotImplementedError()
-
- def root_collection(self):
- raise NotImplementedError()
-
- def notify(self, event, collection, name, item):
- raise NotImplementedError()
-
- def stream_name(self):
- raise NotImplementedError()
-
-
- @synchronized
- def has_remote_blocks(self):
- """Recursively check for a +R segment locator signature."""
-
- if self._has_remote_blocks:
- return True
- for item in self:
- if self[item].has_remote_blocks():
- return True
- return False
+ This method does not recurse. It only iterates the contents of this
+ collection's corresponding stream.
+ """
+ return iter(viewkeys(self._items))
@synchronized
- def set_has_remote_blocks(self, val):
- self._has_remote_blocks = val
- if self.parent:
- self.parent.set_has_remote_blocks(val)
-
- @must_be_writable
- @synchronized
- def find_or_create(self, path, create_type):
- """Recursively search the specified file path.
-
- May return either a `Collection` or `ArvadosFile`. If not found, will
- create a new item at the specified path based on `create_type`. Will
- create intermediate subcollections needed to contain the final item in
- the path.
-
- :create_type:
- One of `arvados.collection.FILE` or
- `arvados.collection.COLLECTION`. If the path is not found, and value
- of create_type is FILE then create and return a new ArvadosFile for
- the last path component. If COLLECTION, then create and return a new
- Collection for the last path component.
+ def __getitem__(self, k: str) -> CollectionItem:
+ """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
+ This method does not recurse. If you want to search a path, use
+ `RichCollectionBase.find` instead.
"""
-
- pathcomponents = path.split("/", 1)
- if pathcomponents[0]:
- item = self._items.get(pathcomponents[0])
- if len(pathcomponents) == 1:
- if item is None:
- # create new file
- if create_type == COLLECTION:
- item = Subcollection(self, pathcomponents[0])
- else:
- item = ArvadosFile(self, pathcomponents[0])
- self._items[pathcomponents[0]] = item
- self.set_committed(False)
- self.notify(ADD, self, pathcomponents[0], item)
- return item
- else:
- if item is None:
- # create new collection
- item = Subcollection(self, pathcomponents[0])
- self._items[pathcomponents[0]] = item
- self.set_committed(False)
- self.notify(ADD, self, pathcomponents[0], item)
- if isinstance(item, RichCollectionBase):
- return item.find_or_create(pathcomponents[1], create_type)
- else:
- raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
- else:
- return self
+ return self._items[k]
@synchronized
- def find(self, path):
- """Recursively search the specified file path.
-
- May return either a Collection or ArvadosFile. Return None if not
- found.
- If path is invalid (ex: starts with '/'), an IOError exception will be
- raised.
+ def __contains__(self, k: str) -> bool:
+ """Indicate whether this collection has an item with this name
+ This method does not recurse. It you want to check a path, use
+ `RichCollectionBase.exists` instead.
"""
- if not path:
- raise errors.ArgumentError("Parameter 'path' is empty.")
-
- pathcomponents = path.split("/", 1)
- if pathcomponents[0] == '':
- raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
-
- item = self._items.get(pathcomponents[0])
- if item is None:
- return None
- elif len(pathcomponents) == 1:
- return item
- else:
- if isinstance(item, RichCollectionBase):
- if pathcomponents[1]:
- return item.find(pathcomponents[1])
- else:
- return item
- else:
- raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
+ return k in self._items
@synchronized
- def mkdirs(self, path):
- """Recursive subcollection create.
-
- Like `os.makedirs()`. Will create intermediate subcollections needed
- to contain the leaf subcollection path.
+ def __len__(self):
+ """Get the number of items directly contained in this collection
+ This method does not recurse. It only counts the streams and files
+ in this collection's corresponding stream.
"""
+ return len(self._items)
- if self.find(path) != None:
- raise IOError(errno.EEXIST, "Directory or file exists", path)
-
- return self.find_or_create(path, COLLECTION)
-
- def open(self, path, mode="r", encoding=None):
- """Open a file-like object for access.
-
- :path:
- path to a file in the collection
- :mode:
- a string consisting of "r", "w", or "a", optionally followed
- by "b" or "t", optionally followed by "+".
- :"b":
- binary mode: write() accepts bytes, read() returns bytes.
- :"t":
- text mode (default): write() accepts strings, read() returns strings.
- :"r":
- opens for reading
- :"r+":
- opens for reading and writing. Reads/writes share a file pointer.
- :"w", "w+":
- truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
- :"a", "a+":
- opens for reading and writing. All writes are appended to
- the end of the file. Writing does not affect the file pointer for
- reading.
+ @must_be_writable
+ @synchronized
+ def __delitem__(self, p: str) -> None:
+ """Delete an item from this collection's stream
+ This method does not recurse. If you want to remove an item by a
+ path, use `RichCollectionBase.remove` instead.
"""
-
- if not re.search(r'^[rwa][bt]?\+?$', mode):
- raise errors.ArgumentError("Invalid mode {!r}".format(mode))
-
- if mode[0] == 'r' and '+' not in mode:
- fclass = ArvadosFileReader
- arvfile = self.find(path)
- elif not self.writable():
- raise IOError(errno.EROFS, "Collection is read only")
- else:
- fclass = ArvadosFileWriter
- arvfile = self.find_or_create(path, FILE)
-
- if arvfile is None:
- raise IOError(errno.ENOENT, "File not found", path)
- if not isinstance(arvfile, ArvadosFile):
- raise IOError(errno.EISDIR, "Is a directory", path)
-
- if mode[0] == 'w':
- arvfile.truncate(0)
-
- binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
- f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
- if 'b' not in mode:
- bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
- f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
- return f
-
- def modified(self):
- """Determine if the collection has been modified since last commited."""
- return not self.committed()
-
- @synchronized
- def committed(self):
- """Determine if the collection has been committed to the API server."""
- return self._committed
+ del self._items[p]
+ self.set_committed(False)
+ self.notify(DEL, self, p, None)
@synchronized
- def set_committed(self, value=True):
- """Recursively set committed flag.
-
- If value is True, set committed to be True for this and all children.
+ def keys(self) -> Iterator[str]:
+ """Iterate names of streams and files in this collection
- If value is False, set committed to be False for this and all parents.
+ This method does not recurse. It only iterates the contents of this
+ collection's corresponding stream.
"""
- if value == self._committed:
- return
- if value:
- for k,v in listitems(self._items):
- v.set_committed(True)
- self._committed = True
- else:
- self._committed = False
- if self.parent is not None:
- self.parent.set_committed(False)
-
- @synchronized
- def __iter__(self):
- """Iterate over names of files and collections contained in this collection."""
- return iter(viewkeys(self._items))
+ return self._items.keys()
@synchronized
- def __getitem__(self, k):
- """Get a file or collection that is directly contained by this collection.
-
- If you want to search a path, use `find()` instead.
+ def values(self) -> List[CollectionItem]:
+ """Get a list of objects in this collection's stream
+ The return value includes a `Subcollection` for every stream, and an
+ `arvados.arvfile.ArvadosFile` for every file, directly within this
+ collection's stream. This method does not recurse.
"""
- return self._items[k]
-
- @synchronized
- def __contains__(self, k):
- """Test if there is a file or collection a directly contained by this collection."""
- return k in self._items
+ return listvalues(self._items)
@synchronized
- def __len__(self):
- """Get the number of items directly contained in this collection."""
- return len(self._items)
+ def items(self) -> List[Tuple[str, CollectionItem]]:
+ """Get a list of `(name, object)` tuples from this collection's stream
- @must_be_writable
- @synchronized
- def __delitem__(self, p):
- """Delete an item by name which is directly contained by this collection."""
- del self._items[p]
- self.set_committed(False)
- self.notify(DEL, self, p, None)
+ The return value includes a `Subcollection` for every stream, and an
+ `arvados.arvfile.ArvadosFile` for every file, directly within this
+ collection's stream. This method does not recurse.
+ """
+ return listitems(self._items)
- @synchronized
- def keys(self):
- """Get a list of names of files and collections directly contained in this collection."""
- return self._items.keys()
+ def exists(self, path: str) -> bool:
+ """Indicate whether this collection includes an item at `path`
- @synchronized
- def values(self):
- """Get a list of files and collection objects directly contained in this collection."""
- return listvalues(self._items)
+ This method returns `True` if `path` refers to a stream or file within
+ this collection, else `False`.
- @synchronized
- def items(self):
- """Get a list of (name, object) tuples directly contained in this collection."""
- return listitems(self._items)
+ Arguments:
- def exists(self, path):
- """Test if there is a file or collection at `path`."""
+ * path: str --- The path to check for existence within this collection
+ """
return self.find(path) is not None
@must_be_writable
@synchronized
- def remove(self, path, recursive=False):
- """Remove the file or subcollection (directory) at `path`.
+ def remove(self, path: str, recursive: bool=False) -> None:
+ """Remove the file or stream at `path`
- :recursive:
- Specify whether to remove non-empty subcollections (True), or raise an error (False).
- """
+ Arguments:
+
+ * path: str --- The path of the item to remove from the collection
+ * recursive: bool --- Controls the method's behavior if `path` refers
+ to a nonempty stream. If `False` (the default), this method raises
+ `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
+ items under the stream.
+ """
if not path:
raise errors.ArgumentError("Parameter 'path' is empty.")
@@ -825,26 +556,33 @@ class RichCollectionBase(CollectionBase):
@must_be_writable
@synchronized
- def add(self, source_obj, target_name, overwrite=False, reparent=False):
- """Copy or move a file or subcollection to this collection.
+ def add(
+ self,
+ source_obj: CollectionItem,
+ target_name: str,
+ overwrite: bool=False,
+ reparent: bool=False,
+ ) -> None:
+ """Copy or move a file or subcollection object to this collection
- :source_obj:
- An ArvadosFile, or Subcollection object
+ Arguments:
- :target_name:
- Destination item name. If the target name already exists and is a
- file, this will raise an error unless you specify `overwrite=True`.
+ * source_obj: ArvadosFile | Subcollection --- The file or subcollection
+ to add to this collection
- :overwrite:
- Whether to overwrite target file if it already exists.
+ * target_name: str --- The path inside this collection where
+ `source_obj` should be added.
- :reparent:
- If True, source_obj will be moved from its parent collection to this collection.
- If False, source_obj will be copied and the parent collection will be
- unmodified.
+ * overwrite: bool --- Controls the behavior of this method when the
+ collection already contains an object at `target_name`. If `False`
+ (the default), this method will raise `FileExistsError`. If `True`,
+ the object at `target_name` will be replaced with `source_obj`.
+ * reparent: bool --- Controls whether this method copies or moves
+ `source_obj`. If `False` (the default), `source_obj` is copied into
+ this collection. If `True`, `source_obj` is moved into this
+ collection.
"""
-
if target_name in self and not overwrite:
raise IOError(errno.EEXIST, "File already exists", target_name)
@@ -911,92 +649,115 @@ class RichCollectionBase(CollectionBase):
@must_be_writable
@synchronized
- def copy(self, source, target_path, source_collection=None, overwrite=False):
- """Copy a file or subcollection to a new path in this collection.
+ def copy(
+ self,
+ source: Union[str, CollectionItem],
+ target_path: str,
+ source_collection: Optional['RichCollectionBase']=None,
+ overwrite: bool=False,
+ ) -> None:
+ """Copy a file or subcollection object to this collection
+
+ Arguments:
- :source:
- A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
+ * source: str | ArvadosFile | Subcollection --- The file or
+ subcollection to add to this collection. If `source` is a str, the
+ object will be found by looking up this path from `source_collection`
+ (see below).
- :target_path:
- Destination file or path. If the target path already exists and is a
- subcollection, the item will be placed inside the subcollection. If
- the target path already exists and is a file, this will raise an error
- unless you specify `overwrite=True`.
+ * target_path: str --- The path inside this collection where the
+ source object should be added.
- :source_collection:
- Collection to copy `source_path` from (default `self`)
+ * source_collection: Collection | None --- The collection to find the
+ source object from when `source` is a path. Defaults to the current
+ collection (`self`).
- :overwrite:
- Whether to overwrite target file if it already exists.
+ * overwrite: bool --- Controls the behavior of this method when the
+ collection already contains an object at `target_path`. If `False`
+ (the default), this method will raise `FileExistsError`. If `True`,
+ the object at `target_path` will be replaced with `source`.
"""
-
source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
target_dir.add(source_obj, target_name, overwrite, False)
@must_be_writable
@synchronized
- def rename(self, source, target_path, source_collection=None, overwrite=False):
- """Move a file or subcollection from `source_collection` to a new path in this collection.
+ def rename(
+ self,
+ source: Union[str, CollectionItem],
+ target_path: str,
+ source_collection: Optional['RichCollectionBase']=None,
+ overwrite: bool=False,
+ ) -> None:
+ """Move a file or subcollection object to this collection
- :source:
- A string with a path to source file or subcollection.
+ Arguments:
- :target_path:
- Destination file or path. If the target path already exists and is a
- subcollection, the item will be placed inside the subcollection. If
- the target path already exists and is a file, this will raise an error
- unless you specify `overwrite=True`.
+ * source: str | ArvadosFile | Subcollection --- The file or
+ subcollection to move into this collection. If `source` is a str, the
+ object will be found by looking up this path from `source_collection`
+ (see below).
- :source_collection:
- Collection to copy `source_path` from (default `self`)
+ * target_path: str --- The path inside this collection where the
+ source object should be added.
- :overwrite:
- Whether to overwrite target file if it already exists.
- """
+ * source_collection: Collection | None --- The collection to find the
+ source object from when `source` is a path. Defaults to the current
+ collection (`self`).
+ * overwrite: bool --- Controls the behavior of this method when the
+ collection already contains an object at `target_path`. If `False`
+ (the default), this method will raise `FileExistsError`. If `True`,
+ the object at `target_path` will be replaced with `source`.
+ """
source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
if not source_obj.writable():
raise IOError(errno.EROFS, "Source collection is read only", source)
target_dir.add(source_obj, target_name, overwrite, True)
- def portable_manifest_text(self, stream_name="."):
- """Get the manifest text for this collection, sub collections and files.
+ def portable_manifest_text(self, stream_name: str=".") -> str:
+ """Get the portable manifest text for this collection
- This method does not flush outstanding blocks to Keep. It will return
- a normalized manifest with access tokens stripped.
+ The portable manifest text is normalized, and does not include access
+ tokens. This method does not flush outstanding blocks to Keep.
- :stream_name:
- Name to use for this stream (directory)
+ Arguments:
+ * stream_name: str --- The name to use for this collection's stream in
+ the generated manifest. Default `'.'`.
"""
return self._get_manifest_text(stream_name, True, True)
@synchronized
- def manifest_text(self, stream_name=".", strip=False, normalize=False,
- only_committed=False):
- """Get the manifest text for this collection, sub collections and files.
+ def manifest_text(
+ self,
+ stream_name: str=".",
+ strip: bool=False,
+ normalize: bool=False,
+ only_committed: bool=False,
+ ) -> str:
+ """Get the manifest text for this collection
- This method will flush outstanding blocks to Keep. By default, it will
- not normalize an unmodified manifest or strip access tokens.
-
- :stream_name:
- Name to use for this stream (directory)
+ Arguments:
- :strip:
- If True, remove signing tokens from block locators if present.
- If False (default), block locators are left unchanged.
+ * stream_name: str --- The name to use for this collection's stream in
+ the generated manifest. Default `'.'`.
- :normalize:
- If True, always export the manifest text in normalized form
- even if the Collection is not modified. If False (default) and the collection
- is not modified, return the original manifest text even if it is not
- in normalized form.
+ * strip: bool --- Controls whether or not the returned manifest text
+ includes access tokens. If `False` (the default), the manifest text
+ will include access tokens. If `True`, the manifest text will not
+ include access tokens.
- :only_committed:
- If True, don't commit pending blocks.
+ * normalize: bool --- Controls whether or not the returned manifest
+ text is normalized. Default `False`.
+ * only_committed: bool --- Controls whether or not this method uploads
+ pending data to Keep before building and returning the manifest text.
+ If `False` (the default), this method will finish uploading all data
+ to Keep, then return the final manifest. If `True`, this method will
+ build and return a manifest that only refers to the data that has
+ finished uploading at the time this method was called.
"""
-
if not only_committed:
self._my_block_manager().commit_all()
return self._get_manifest_text(stream_name, strip, normalize,
@@ -1075,11 +836,26 @@ class RichCollectionBase(CollectionBase):
return remote_blocks
@synchronized
- def diff(self, end_collection, prefix=".", holding_collection=None):
- """Generate list of add/modify/delete actions.
+ def diff(
+ self,
+ end_collection: 'RichCollectionBase',
+ prefix: str=".",
+ holding_collection: Optional['Collection']=None,
+ ) -> ChangeList:
+ """Build a list of differences between this collection and another
+
+ Arguments:
- When given to `apply`, will change `self` to match `end_collection`
+ * end_collection: RichCollectionBase --- A collection object with the
+ desired end state. The returned diff list will describe how to go
+ from the current collection object `self` to `end_collection`.
+ * prefix: str --- The name to use for this collection's stream in
+ the diff list. Default `'.'`.
+
+ * holding_collection: Collection | None --- A collection object used to
+ hold objects for the returned diff list. By default, a new empty
+ collection is created.
"""
changes = []
if holding_collection is None:
@@ -1101,12 +877,20 @@ class RichCollectionBase(CollectionBase):
@must_be_writable
@synchronized
- def apply(self, changes):
- """Apply changes from `diff`.
+ def apply(self, changes: ChangeList) -> None:
+ """Apply a list of changes from to this collection
+
+ This method takes a list of changes generated by
+ `RichCollectionBase.diff` and applies it to this
+ collection. Afterward, the state of this collection object will
+ match the state of `end_collection` passed to `diff`. If a change
+ conflicts with a local change, it will be saved to an alternate path
+ indicating the conflict.
- If a change conflicts with a local change, it will be saved to an
- alternate path indicating the conflict.
+ Arguments:
+ * changes: ChangeList --- The list of differences generated by
+ `RichCollectionBase.diff`.
"""
if changes:
self.set_committed(False)
@@ -1148,8 +932,8 @@ class RichCollectionBase(CollectionBase):
# else, the file is modified or already removed, in either
# case we don't want to try to remove it.
- def portable_data_hash(self):
- """Get the portable data hash for this collection's manifest."""
+ def portable_data_hash(self) -> str:
+ """Get the portable data hash for this collection's manifest"""
if self._manifest_locator and self.committed():
# If the collection is already saved on the API server, and it's committed
# then return API server's PDH response.
@@ -1159,25 +943,62 @@ class RichCollectionBase(CollectionBase):
return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
@synchronized
- def subscribe(self, callback):
+ def subscribe(self, callback: ChangeCallback) -> None:
+ """Set a notify callback for changes to this collection
+
+ Arguments:
+
+ * callback: ChangeCallback --- The callable to call each time the
+ collection is changed.
+ """
if self._callback is None:
self._callback = callback
else:
raise errors.ArgumentError("A callback is already set on this collection.")
@synchronized
- def unsubscribe(self):
+ def unsubscribe(self) -> None:
+ """Remove any notify callback set for changes to this collection"""
if self._callback is not None:
self._callback = None
@synchronized
- def notify(self, event, collection, name, item):
+ def notify(
+ self,
+ event: ChangeType,
+ collection: 'RichCollectionBase',
+ name: str,
+ item: CollectionItem,
+ ) -> None:
+ """Notify any subscribed callback about a change to this collection
+
+ .. ATTENTION:: Internal
+ This method is only meant to be used by other Collection methods.
+
+ If a callback has been registered with `RichCollectionBase.subscribe`,
+ it will be called with information about a change to this collection.
+ Then this notification will be propagated to this collection's root.
+
+ Arguments:
+
+ * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
+ the collection.
+
+ * collection: RichCollectionBase --- The collection that was modified.
+
+ * name: str --- The name of the file or stream within `collection` that
+ was modified.
+
+ * item: ArvadosFile | Subcollection --- The new contents at `name`
+ within `collection`.
+ """
if self._callback:
self._callback(event, collection, name, item)
self.root_collection().notify(event, collection, name, item)
@synchronized
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
+ """Indicate whether this collection object is equal to another"""
if other is self:
return True
if not isinstance(other, RichCollectionBase):
@@ -1191,101 +1012,97 @@ class RichCollectionBase(CollectionBase):
return False
return True
- def __ne__(self, other):
+ def __ne__(self, other: Any) -> bool:
+ """Indicate whether this collection object is not equal to another"""
return not self.__eq__(other)
@synchronized
- def flush(self):
- """Flush bufferblocks to Keep."""
+ def flush(self) -> None:
+ """Upload any pending data to Keep"""
for e in listvalues(self):
e.flush()
class Collection(RichCollectionBase):
- """Represents the root of an Arvados Collection.
-
- This class is threadsafe. The root collection object, all subcollections
- and files are protected by a single lock (i.e. each access locks the entire
- collection).
-
- Brief summary of
- useful methods:
-
- :To read an existing file:
- `c.open("myfile", "r")`
-
- :To write a new file:
- `c.open("myfile", "w")`
-
- :To determine if a file exists:
- `c.find("myfile") is not None`
-
- :To copy a file:
- `c.copy("source", "dest")`
-
- :To delete a file:
- `c.remove("myfile")`
-
- :To save to an existing collection record:
- `c.save()`
+ """Read and manipulate an Arvados collection
- :To save a new collection record:
- `c.save_new()`
-
- :To merge remote changes into this object:
- `c.update()`
-
- Must be associated with an API server Collection record (during
- initialization, or using `save_new`) to use `save` or `update`
+ This class provides a high-level interface to create, read, and update
+ Arvados collections and their contents. Refer to the Arvados Python SDK
+ cookbook for [an introduction to using the Collection class][cookbook].
+
+ [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
"""
- def __init__(self, manifest_locator_or_text=None,
- api_client=None,
- keep_client=None,
- num_retries=10,
- parent=None,
- apiconfig=None,
- block_manager=None,
- replication_desired=None,
- storage_classes_desired=None,
- put_threads=None):
- """Collection constructor.
-
- :manifest_locator_or_text:
- An Arvados collection UUID, portable data hash, raw manifest
- text, or (if creating an empty collection) None.
-
- :parent:
- the parent Collection, may be None.
-
- :apiconfig:
- A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
- Prefer this over supplying your own api_client and keep_client (except in testing).
- Will use default config settings if not specified.
-
- :api_client:
- The API client object to use for requests. If not specified, create one using `apiconfig`.
-
- :keep_client:
- the Keep client to use for requests. If not specified, create one using `apiconfig`.
-
- :num_retries:
- the number of retries for API and Keep requests.
+ def __init__(self, manifest_locator_or_text: Optional[str]=None,
+ api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
+ keep_client: Optional['arvados.keep.KeepClient']=None,
+ num_retries: int=10,
+ parent: Optional['Collection']=None,
+ apiconfig: Optional[Mapping[str, str]]=None,
+ block_manager: Optional['arvados.arvfile._BlockManager']=None,
+ replication_desired: Optional[int]=None,
+ storage_classes_desired: Optional[List[str]]=None,
+ put_threads: Optional[int]=None):
+ """Initialize a Collection object
- :block_manager:
- the block manager to use. If not specified, create one.
-
- :replication_desired:
- How many copies should Arvados maintain. If None, API server default
- configuration applies. If not None, this value will also be used
- for determining the number of block copies being written.
-
- :storage_classes_desired:
- A list of storage class names where to upload the data. If None,
- the keep client is expected to store the data into the cluster's
- default storage class(es).
+ Arguments:
+ * manifest_locator_or_text: str | None --- This string can contain a
+ collection manifest text, portable data hash, or UUID. When given a
+ portable data hash or UUID, this instance will load a collection
+ record from the API server. Otherwise, this instance will represent a
+ new collection without an API server record. The default value `None`
+ instantiates a new collection with an empty manifest.
+
+ * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
+ Arvados API client object this instance uses to make requests. If
+ none is given, this instance creates its own client using the
+ settings from `apiconfig` (see below). If your client instantiates
+ many Collection objects, you can help limit memory utilization by
+ calling `arvados.api.api` to construct an
+ `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
+ for every Collection.
+
+ * keep_client: arvados.keep.KeepClient | None --- The Keep client
+ object this instance uses to make requests. If none is given, this
+ instance creates its own client using its `api_client`.
+
+ * num_retries: int --- The number of times that client requests are
+ retried. Default 10.
+
+ * parent: arvados.collection.Collection | None --- The parent Collection
+ object of this instance, if any. This argument is primarily used by
+ other Collection methods; user client code shouldn't need to use it.
+
+ * apiconfig: Mapping[str, str] | None --- A mapping with entries for
+ `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
+ `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
+ Collection object constructs one from these settings. If no
+ mapping is provided, calls `arvados.config.settings` to get these
+ parameters from user configuration.
+
+ * block_manager: arvados.arvfile._BlockManager | None --- The
+ _BlockManager object used by this instance to coordinate reading
+ and writing Keep data blocks. If none is given, this instance
+ constructs its own. This argument is primarily used by other
+ Collection methods; user client code shouldn't need to use it.
+
+ * replication_desired: int | None --- This controls both the value of
+ the `replication_desired` field on API collection records saved by
+ this class, as well as the number of Keep services that the object
+ writes new data blocks to. If none is given, uses the default value
+ configured for the cluster.
+
+ * storage_classes_desired: list[str] | None --- This controls both
+ the value of the `storage_classes_desired` field on API collection
+ records saved by this class, as well as selecting which specific
+ Keep services the object writes new data blocks to. If none is
+ given, defaults to an empty list.
+
+ * put_threads: int | None --- The number of threads to run
+ simultaneously to upload data blocks to Keep. This value is used when
+ building a new `block_manager`. It is unused when a `block_manager`
+ is provided.
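+
+ A minimal usage sketch (the UUID below is hypothetical; assumes Arvados
+ credentials are available from the standard configuration):
+
+     import arvados.collection
+     # Load an existing collection record from the API server...
+     existing = arvados.collection.Collection('zzzzz-4zz18-0123456789abcde')
+     # ...or start a new collection with an empty manifest.
+     scratch = arvados.collection.Collection()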
"""
if storage_classes_desired and type(storage_classes_desired) is not list:
@@ -1339,19 +1156,33 @@ class Collection(RichCollectionBase):
except errors.SyntaxError as e:
raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
- def storage_classes_desired(self):
+ def storage_classes_desired(self) -> List[str]:
+ """Get this collection's `storage_classes_desired` value"""
return self._storage_classes_desired or []
- def root_collection(self):
+ def root_collection(self) -> 'Collection':
return self
- def get_properties(self):
+ def get_properties(self) -> Dict[str, Any]:
+ """Get this collection's properties
+
+ This method always returns a dict. If this collection object does not
+ have an associated API record, or that record does not have any
+ properties set, this method returns an empty dict.
+ """
if self._api_response and self._api_response["properties"]:
return self._api_response["properties"]
else:
return {}
- def get_trash_at(self):
+ def get_trash_at(self) -> Optional[datetime.datetime]:
+ """Get this collection's `trash_at` field
+
+ This method parses the `trash_at` field of the collection's API
+ record and returns a datetime from it. If that field is not set, or
+ this collection object does not have an associated API record,
+ returns None.
+ """
if self._api_response and self._api_response["trash_at"]:
try:
return ciso8601.parse_datetime(self._api_response["trash_at"])
@@ -1360,20 +1191,56 @@ class Collection(RichCollectionBase):
else:
return None
- def stream_name(self):
+ def stream_name(self) -> str:
return "."
- def writable(self):
+ def writable(self) -> bool:
return True
@synchronized
- def known_past_version(self, modified_at_and_portable_data_hash):
+ def known_past_version(
+ self,
+ modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
+ ) -> bool:
+ """Indicate whether an API record for this collection has been seen before
+
+ As this collection object loads records from the API server, it records
+ their `modified_at` and `portable_data_hash` fields. This method accepts
+ a 2-tuple with values for those fields, and returns `True` if the
+ combination was previously loaded.
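+
+ A sketch of one possible use (assumes `coll` is this collection instance,
+ loaded by UUID, and `api` is a hypothetical `arvados.api.api` client):
+
+     latest = api.collections().get(uuid=coll.manifest_locator()).execute()
+     seen = coll.known_past_version(
+         (latest['modified_at'], latest['portable_data_hash']))
+     if not seen:
+         coll.update()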
+ """
return modified_at_and_portable_data_hash in self._past_versions
@synchronized
@retry_method
- def update(self, other=None, num_retries=None):
- """Merge the latest collection on the API server with the current collection."""
+ def update(
+ self,
+ other: Optional['Collection']=None,
+ num_retries: Optional[int]=None,
+ ) -> None:
+ """Merge another collection's contents into this one
+
+ This method compares the manifest of this collection instance with
+ another, then updates this instance's manifest with changes from the
+ other, renaming files to flag conflicts where necessary.
+
+ When called without any arguments, this method reloads the collection's
+ API record, and updates this instance with any changes that have
+ appeared server-side. If this instance does not have a corresponding
+ API record, this method raises `arvados.errors.ArgumentError`.
+
+ Arguments:
+
+ * other: Collection | None --- The collection whose contents should be
+ merged into this instance. When not provided, this method reloads this
+ collection's API record and constructs a Collection object from it.
+ If this instance does not have a corresponding API record, this method
+ raises `arvados.errors.ArgumentError`.
+
+ * num_retries: int | None --- The number of times to retry reloading
+ the collection's API record from the API server. If not specified,
+ uses the `num_retries` provided when this instance was constructed.
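+
+ A minimal sketch (the collection UUID is hypothetical):
+
+     coll = Collection('zzzzz-4zz18-0123456789abcde')
+     # ... later, after other clients may have saved changes ...
+     coll.update()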
+ """
if other is None:
if self._manifest_locator is None:
@@ -1467,32 +1334,64 @@ class Collection(RichCollectionBase):
return self
def __exit__(self, exc_type, exc_value, traceback):
- """Support scoped auto-commit in a with: block."""
+ """Exit a context with this collection instance
+
+ If no exception was raised inside the context block, and this
+ collection is writable and has a corresponding API record, that
+ record will be updated to match the state of this instance at the end
+ of the block.
+ """
if exc_type is None:
if self.writable() and self._has_collection_uuid():
self.save()
self.stop_threads()
- def stop_threads(self):
+ def stop_threads(self) -> None:
+ """Stop background Keep upload/download threads"""
if self._block_manager is not None:
self._block_manager.stop_threads()
@synchronized
- def manifest_locator(self):
- """Get the manifest locator, if any.
-
- The manifest locator will be set when the collection is loaded from an
- API server record or the portable data hash of a manifest.
-
- The manifest locator will be None if the collection is newly created or
- was created directly from manifest text. The method `save_new()` will
- assign a manifest locator.
-
+ def manifest_locator(self) -> Optional[str]:
+ """Get this collection's manifest locator, if any
+
+ * If this collection instance is associated with an API record with a
+ UUID, return that.
+ * Otherwise, if this collection instance was loaded from an API record
+ by portable data hash, return that.
+ * Otherwise, return `None`.
"""
return self._manifest_locator
@synchronized
- def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
+ def clone(
+ self,
+ new_parent: Optional['Collection']=None,
+ new_name: Optional[str]=None,
+ readonly: bool=False,
+ new_config: Optional[Mapping[str, str]]=None,
+ ) -> 'Collection':
+ """Create a Collection object with the same contents as this instance
+
+ This method creates a new Collection object with contents that match
+ this instance's. The new collection will not be associated with any API
+ record.
+
+ Arguments:
+
+ * new_parent: Collection | None --- This value is passed to the new
+ Collection's constructor as the `parent` argument.
+
+ * new_name: str | None --- This value is unused.
+
+ * readonly: bool --- If this value is true, this method constructs and
+ returns a `CollectionReader`. Otherwise, it returns a mutable
+ `Collection`. Default `False`.
+
+ * new_config: Mapping[str, str] | None --- This value is passed to the
+ new Collection's constructor as `apiconfig`. If no value is provided,
+ defaults to the configuration passed to this instance's constructor.
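+
+ A minimal sketch (assumes `coll` is an existing `Collection` instance):
+
+     # Work with a read-only copy without touching this instance
+     # or its API record.
+     snapshot = coll.clone(readonly=True)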
+ """
if new_config is None:
new_config = self._config
if readonly:
@@ -1504,31 +1403,31 @@ class Collection(RichCollectionBase):
return newcollection
@synchronized
- def api_response(self):
- """Returns information about this Collection fetched from the API server.
-
- If the Collection exists in Keep but not the API server, currently
- returns None. Future versions may provide a synthetic response.
+ def api_response(self) -> Optional[Dict[str, Any]]:
+ """Get this instance's associated API record
+
+ If this Collection instance has an associated API record, return it.
+ Otherwise, return `None`.
"""
return self._api_response
- def find_or_create(self, path, create_type):
- """See `RichCollectionBase.find_or_create`"""
+ def find_or_create(
+ self,
+ path: str,
+ create_type: CreateType,
+ ) -> CollectionItem:
if path == ".":
return self
else:
return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
- def find(self, path):
- """See `RichCollectionBase.find`"""
+ def find(self, path: str) -> CollectionItem:
if path == ".":
return self
else:
return super(Collection, self).find(path[2:] if path.startswith("./") else path)
- def remove(self, path, recursive=False):
- """See `RichCollectionBase.remove`"""
+ def remove(self, path: str, recursive: bool=False) -> None:
if path == ".":
raise errors.ArgumentError("Cannot remove '.'")
else:
@@ -1537,49 +1436,52 @@ class Collection(RichCollectionBase):
@must_be_writable
@synchronized
@retry_method
- def save(self,
- properties=None,
- storage_classes=None,
- trash_at=None,
- merge=True,
- num_retries=None,
- preserve_version=False):
- """Save collection to an existing collection record.
-
- Commit pending buffer blocks to Keep, merge with remote record (if
- merge=True, the default), and update the collection record. Returns
- the current manifest text.
-
- Will raise AssertionError if not associated with a collection record on
- the API server. If you want to save a manifest to Keep only, see
- `save_new()`.
-
- :properties:
- Additional properties of collection. This value will replace any existing
- properties of collection.
-
- :storage_classes:
- Specify desirable storage classes to be used when writing data to Keep.
-
- :trash_at:
- A collection is *expiring* when it has a *trash_at* time in the future.
- An expiring collection can be accessed as normal,
- but is scheduled to be trashed automatically at the *trash_at* time.
-
- :merge:
- Update and merge remote changes before saving. Otherwise, any
- remote changes will be ignored and overwritten.
-
- :num_retries:
- Retry count on API calls (if None, use the collection default)
-
- :preserve_version:
- If True, indicate that the collection content being saved right now
- should be preserved in a version snapshot if the collection record is
- updated in the future. Requires that the API server has
- Collections.CollectionVersioning enabled, if not, setting this will
- raise an exception.
+ def save(
+ self,
+ properties: Optional[Properties]=None,
+ storage_classes: Optional[StorageClasses]=None,
+ trash_at: Optional[datetime.datetime]=None,
+ merge: bool=True,
+ num_retries: Optional[int]=None,
+ preserve_version: bool=False,
+ ) -> str:
+ """Save collection to an existing API record
+
+ This method updates the instance's corresponding API record to match
+ the instance's state. If this instance does not have a corresponding API
+ record yet, raises `AssertionError`. (To create a new API record, use
+ `Collection.save_new`.) This method returns the saved collection
+ manifest.
+
+ Arguments:
+
+ * properties: dict[str, Any] | None --- If provided, the API record will
+ be updated with these properties. Note this will completely replace
+ any existing properties.
+
+ * storage_classes: list[str] | None --- If provided, the API record will
+ be updated with this value in the `storage_classes_desired` field.
+ This value will also be saved on the instance and used for any
+ changes that follow.
+
+ * trash_at: datetime.datetime | None --- If provided, the API record
+ will be updated with this value in the `trash_at` field.
+
+ * merge: bool --- If `True` (the default), this method will first
+ reload this collection's API record, and merge any new contents into
+ this instance before saving changes. See `Collection.update` for
+ details.
+
+ * num_retries: int | None --- The number of times to retry reloading
+ the collection's API record from the API server. If not specified,
+ uses the `num_retries` provided when this instance was constructed.
+
+ * preserve_version: bool --- This value will be passed directly
+ to the underlying API call. If `True`, the Arvados API will
+ preserve the versions of this collection both immediately before
+ and after the update. If `True` when the API server is not
+ configured with collection versioning, this method raises
+ `arvados.errors.ArgumentError`.
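+
+ A minimal sketch (assumes `coll` is writable and was loaded from an
+ existing API record; the file name and property are illustrative):
+
+     with coll.open('summary.txt', 'wb') as out:
+         out.write(b'all done\n')
+     coll.save(properties={'analysis_state': 'complete'})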
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
@@ -1643,60 +1545,66 @@ class Collection(RichCollectionBase):
@must_be_writable
@synchronized
@retry_method
- def save_new(self, name=None,
- create_collection_record=True,
- owner_uuid=None,
- properties=None,
- storage_classes=None,
- trash_at=None,
- ensure_unique_name=False,
- num_retries=None,
- preserve_version=False):
- """Save collection to a new collection record.
-
- Commit pending buffer blocks to Keep and, when create_collection_record
- is True (default), create a new collection record. After creating a
- new collection record, this Collection object will be associated with
- the new record used by `save()`. Returns the current manifest text.
-
- :name:
- The collection name.
-
- :create_collection_record:
- If True, create a collection record on the API server.
- If False, only commit blocks to Keep and return the manifest text.
-
- :owner_uuid:
- the user, or project uuid that will own this collection.
- If None, defaults to the current user.
-
- :properties:
- Additional properties of collection. This value will replace any existing
- properties of collection.
-
- :storage_classes:
- Specify desirable storage classes to be used when writing data to Keep.
-
- :trash_at:
- A collection is *expiring* when it has a *trash_at* time in the future.
- An expiring collection can be accessed as normal,
- but is scheduled to be trashed automatically at the *trash_at* time.
-
- :ensure_unique_name:
- If True, ask the API server to rename the collection
- if it conflicts with a collection with the same name and owner. If
- False, a name conflict will result in an error.
-
- :num_retries:
- Retry count on API calls (if None, use the collection default)
-
- :preserve_version:
- If True, indicate that the collection content being saved right now
- should be preserved in a version snapshot if the collection record is
- updated in the future. Requires that the API server has
- Collections.CollectionVersioning enabled, if not, setting this will
- raise an exception.
+ def save_new(
+ self,
+ name: Optional[str]=None,
+ create_collection_record: bool=True,
+ owner_uuid: Optional[str]=None,
+ properties: Optional[Properties]=None,
+ storage_classes: Optional[StorageClasses]=None,
+ trash_at: Optional[datetime.datetime]=None,
+ ensure_unique_name: bool=False,
+ num_retries: Optional[int]=None,
+ preserve_version: bool=False,
+ ) -> str:
+ """Save collection to a new API record
+
+ This method finishes uploading new data blocks and (optionally)
+ creates a new API collection record with the provided data. If a new
+ record is created, this instance becomes associated with that record
+ for future updates like `save()`. This method returns the saved
+ collection manifest.
+
+ Arguments:
+
+ * name: str | None --- The `name` field to use on the new collection
+ record. If not specified, a generic default name is generated.
+
+ * create_collection_record: bool --- If `True` (the default), creates a
+ collection record on the API server. If `False`, the method finishes
+ all data uploads and only returns the resulting collection manifest
+ without sending it to the API server.
+
+ * owner_uuid: str | None --- The `owner_uuid` field to use on the
+ new collection record.
+
+ * properties: dict[str, Any] | None --- The `properties` field to use on
+ the new collection record.
+
+ * storage_classes: list[str] | None --- The
+ `storage_classes_desired` field to use on the new collection record.
+
+ * trash_at: datetime.datetime | None --- The `trash_at` field to use
+ on the new collection record.
+
+ * ensure_unique_name: bool --- This value is passed directly to the
+ Arvados API when creating the collection record. If `True`, the API
+ server may modify the submitted `name` to ensure the collection's
+ `name`+`owner_uuid` combination is unique. If `False` (the default),
+ if a collection already exists with this same `name`+`owner_uuid`
+ combination, creating a collection record will raise a validation
+ error.
+
+ * num_retries: int | None --- The number of times to retry reloading
+ the collection's API record from the API server. If not specified,
+ uses the `num_retries` provided when this instance was constructed.
+
+ * preserve_version: bool --- This value will be passed directly
+ to the underlying API call. If `True`, the Arvados API will
+ preserve the versions of this collection both immediately before
+ and after the update. If `True` when the API server is not
+ configured with collection versioning, this method raises
+ `arvados.errors.ArgumentError`.
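+
+ A minimal sketch (the owner project UUID is hypothetical):
+
+     coll = Collection()
+     with coll.open('hello.txt', 'wb') as out:
+         out.write(b'Hello, Keep!\n')
+     coll.save_new(
+         name='hello collection',
+         owner_uuid='zzzzz-j7d0g-0123456789abcde',
+         ensure_unique_name=True,
+     )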
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
@@ -1834,17 +1742,24 @@ class Collection(RichCollectionBase):
self.set_committed(True)
@synchronized
- def notify(self, event, collection, name, item):
+ def notify(
+ self,
+ event: ChangeType,
+ collection: 'RichCollectionBase',
+ name: str,
+ item: CollectionItem,
+ ) -> None:
if self._callback:
self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
- """This is a subdirectory within a collection that doesn't have its own API
- server record.
-
- Subcollection locking falls under the umbrella lock of its root collection.
+ """Read and manipulate a stream/directory within an Arvados collection
+
+ This class represents a single stream (like a directory) within an Arvados
+ `Collection`. It is returned by `Collection.find` and provides the same API.
+ Operations that work on the API collection record propagate to the parent
+ `Collection` object.
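+
+ For example (assumes `coll` is a `Collection` containing a directory
+ named `data`):
+
+     subdir = coll.find('data')
+     print(subdir.stream_name())  # "./data"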
"""
def __init__(self, parent, name):
@@ -1854,10 +1769,10 @@ class Subcollection(RichCollectionBase):
self.name = name
self.num_retries = parent.num_retries
- def root_collection(self):
+ def root_collection(self) -> 'Collection':
return self.parent.root_collection()
- def writable(self):
+ def writable(self) -> bool:
return self.root_collection().writable()
def _my_api(self):
@@ -1869,11 +1784,15 @@ class Subcollection(RichCollectionBase):
def _my_block_manager(self):
return self.root_collection()._my_block_manager()
- def stream_name(self):
+ def stream_name(self) -> str:
return os.path.join(self.parent.stream_name(), self.name)
@synchronized
- def clone(self, new_parent, new_name):
+ def clone(
+ self,
+ new_parent: Optional['Collection']=None,
+ new_name: Optional[str]=None,
+ ) -> 'Subcollection':
c = Subcollection(new_parent, new_name)
c._clonefrom(self)
return c
@@ -1900,11 +1819,11 @@ class Subcollection(RichCollectionBase):
class CollectionReader(Collection):
- """A read-only collection object.
-
- Initialize from a collection UUID or portable data hash, or raw
- manifest text. See `Collection` constructor for detailed options.
+ """Read-only `Collection` subclass
+
+ This class will never create or update any API collection records. You can
+ use this class for additional code safety when you only need to read
+ existing collections.
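+
+ A minimal sketch (the collection UUID and file name are illustrative):
+
+     from arvados.collection import CollectionReader
+     reader = CollectionReader('zzzzz-4zz18-0123456789abcde')
+     with reader.open('README.txt', 'rb') as src:
+         data = src.read()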
"""
def __init__(self, manifest_locator_or_text, *args, **kwargs):
self._in_init = True
@@ -1918,7 +1837,7 @@ class CollectionReader(Collection):
# all_streams() and all_files()
self._streams = None
- def writable(self):
+ def writable(self) -> bool:
return self._in_init
def _populate_streams(orig_func):
@@ -1935,16 +1854,10 @@ class CollectionReader(Collection):
return orig_func(self, *args, **kwargs)
return populate_streams_wrapper
+ @arvados.util._deprecated('3.0', 'Collection iteration')
@_populate_streams
def normalize(self):
- """Normalize the streams returned by `all_streams`.
-
- This method is kept for backwards compatability and only affects the
- behavior of `all_streams()` and `all_files()`
-
- """
-
- # Rearrange streams
+ """Normalize the streams returned by `all_streams`"""
streams = {}
for s in self.all_streams():
for f in s.all_files():
@@ -1971,3 +1884,423 @@ class CollectionReader(Collection):
for s in self.all_streams():
for f in s.all_files():
yield f
+
+
+class CollectionWriter(CollectionBase):
+ """Create a new collection from scratch
+
+ .. WARNING:: Deprecated
+ This class is deprecated. Prefer `arvados.collection.Collection`
+ instead.
+ """
+
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
+ def __init__(self, api_client=None, num_retries=0, replication=None):
+ """Instantiate a CollectionWriter.
+
+ CollectionWriter lets you build a new Arvados Collection from scratch.
+ Write files to it. The CollectionWriter will upload data to Keep as
+ appropriate, and provide you with the Collection manifest text when
+ you're finished.
+
+ Arguments:
+ * api_client: The API client to use to look up Collections. If not
+ provided, CollectionWriter will build one from available Arvados
+ configuration.
+ * num_retries: The default number of times to retry failed
+ service requests. Default 0. You may change this value
+ after instantiation, but note those changes may not
+ propagate to related objects like the Keep client.
+ * replication: The number of copies of each block to store.
+ If this argument is None or not supplied, replication
+ defaults to 2.
+ """
+ self._api_client = api_client
+ self.num_retries = num_retries
+ self.replication = (2 if replication is None else replication)
+ self._keep_client = None
+ self._data_buffer = []
+ self._data_buffer_len = 0
+ self._current_stream_files = []
+ self._current_stream_length = 0
+ self._current_stream_locators = []
+ self._current_stream_name = '.'
+ self._current_file_name = None
+ self._current_file_pos = 0
+ self._finished_streams = []
+ self._close_file = None
+ self._queued_file = None
+ self._queued_dirents = deque()
+ self._queued_trees = deque()
+ self._last_open = None
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is None:
+ self.finish()
+
+ def do_queued_work(self):
+ # The work queue consists of three pieces:
+ # * _queued_file: The file object we're currently writing to the
+ # Collection.
+ # * _queued_dirents: Entries under the current directory
+ # (_queued_trees[0]) that we want to write or recurse through.
+ # This may contain files from subdirectories if
+ # max_manifest_depth == 0 for this directory.
+ # * _queued_trees: Directories that should be written as separate
+ # streams to the Collection.
+ # This function handles the smallest piece of work currently queued
+ # (current file, then current directory, then next directory) until
+ # no work remains. The _work_THING methods each do a unit of work on
+ # THING. _queue_THING methods add a THING to the work queue.
+ while True:
+ if self._queued_file:
+ self._work_file()
+ elif self._queued_dirents:
+ self._work_dirents()
+ elif self._queued_trees:
+ self._work_trees()
+ else:
+ break
+
+ def _work_file(self):
+ while True:
+ buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
+ if not buf:
+ break
+ self.write(buf)
+ self.finish_current_file()
+ if self._close_file:
+ self._queued_file.close()
+ self._close_file = None
+ self._queued_file = None
+
+ def _work_dirents(self):
+ path, stream_name, max_manifest_depth = self._queued_trees[0]
+ if stream_name != self.current_stream_name():
+ self.start_new_stream(stream_name)
+ while self._queued_dirents:
+ dirent = self._queued_dirents.popleft()
+ target = os.path.join(path, dirent)
+ if os.path.isdir(target):
+ self._queue_tree(target,
+ os.path.join(stream_name, dirent),
+ max_manifest_depth - 1)
+ else:
+ self._queue_file(target, dirent)
+ break
+ if not self._queued_dirents:
+ self._queued_trees.popleft()
+
+ def _work_trees(self):
+ path, stream_name, max_manifest_depth = self._queued_trees[0]
+ d = arvados.util.listdir_recursive(
+ path, max_depth = (None if max_manifest_depth == 0 else 0))
+ if d:
+ self._queue_dirents(stream_name, d)
+ else:
+ self._queued_trees.popleft()
+
+ def _queue_file(self, source, filename=None):
+ assert (self._queued_file is None), "tried to queue more than one file"
+ if not hasattr(source, 'read'):
+ source = open(source, 'rb')
+ self._close_file = True
+ else:
+ self._close_file = False
+ if filename is None:
+ filename = os.path.basename(source.name)
+ self.start_new_file(filename)
+ self._queued_file = source
+
+ def _queue_dirents(self, stream_name, dirents):
+ assert (not self._queued_dirents), "tried to queue more than one tree"
+ self._queued_dirents = deque(sorted(dirents))
+
+ def _queue_tree(self, path, stream_name, max_manifest_depth):
+ self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+ def write_file(self, source, filename=None):
+ self._queue_file(source, filename)
+ self.do_queued_work()
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ self._queue_tree(path, stream_name, max_manifest_depth)
+ self.do_queued_work()
+
+ def write(self, newdata):
+ if isinstance(newdata, bytes):
+ pass
+ elif isinstance(newdata, str):
+ newdata = newdata.encode()
+ elif hasattr(newdata, '__iter__'):
+ for s in newdata:
+ self.write(s)
+ return
+ self._data_buffer.append(newdata)
+ self._data_buffer_len += len(newdata)
+ self._current_stream_length += len(newdata)
+ while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
+ self.flush_data()
+
+ def open(self, streampath, filename=None):
+ """open(streampath[, filename]) -> file-like object
+
+ Pass in the path of a file to write to the Collection, either as a
+ single string or as two separate stream name and file name arguments.
+ This method returns a file-like object you can write to add it to the
+ Collection.
+
+ You may only have one file object from the Collection open at a time,
+ so be sure to close the object when you're done. Using the object in
+ a with statement makes that easy::
+
+ with cwriter.open('./doc/page1.txt') as outfile:
+ outfile.write(page1_data)
+ with cwriter.open('./doc/page2.txt') as outfile:
+ outfile.write(page2_data)
+ """
+ if filename is None:
+ streampath, filename = split(streampath)
+ if self._last_open and not self._last_open.closed:
+ raise errors.AssertionError(
+ u"can't open '{}' when '{}' is still open".format(
+ filename, self._last_open.name))
+ if streampath != self.current_stream_name():
+ self.start_new_stream(streampath)
+ self.set_current_file_name(filename)
+ self._last_open = _WriterFile(self, filename)
+ return self._last_open
+
+ def flush_data(self):
+ data_buffer = b''.join(self._data_buffer)
+ if data_buffer:
+ self._current_stream_locators.append(
+ self._my_keep().put(
+ data_buffer[0:config.KEEP_BLOCK_SIZE],
+ copies=self.replication))
+ self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
+ self._data_buffer_len = len(self._data_buffer[0])
+
+ def start_new_file(self, newfilename=None):
+ self.finish_current_file()
+ self.set_current_file_name(newfilename)
+
+ def set_current_file_name(self, newfilename):
+ if re.search(r'[\t\n]', newfilename):
+ raise errors.AssertionError(
+ "Manifest filenames cannot contain whitespace: %s" %
+ newfilename)
+ elif re.search(r'\x00', newfilename):
+ raise errors.AssertionError(
+ "Manifest filenames cannot contain NUL characters: %s" %
+ newfilename)
+ self._current_file_name = newfilename
+
+ def current_file_name(self):
+ return self._current_file_name
+
+ def finish_current_file(self):
+ if self._current_file_name is None:
+ if self._current_file_pos == self._current_stream_length:
+ return
+ raise errors.AssertionError(
+ "Cannot finish an unnamed file " +
+ "(%d bytes at offset %d in '%s' stream)" %
+ (self._current_stream_length - self._current_file_pos,
+ self._current_file_pos,
+ self._current_stream_name))
+ self._current_stream_files.append([
+ self._current_file_pos,
+ self._current_stream_length - self._current_file_pos,
+ self._current_file_name])
+ self._current_file_pos = self._current_stream_length
+ self._current_file_name = None
+
+ def start_new_stream(self, newstreamname='.'):
+ self.finish_current_stream()
+ self.set_current_stream_name(newstreamname)
+
+ def set_current_stream_name(self, newstreamname):
+ if re.search(r'[\t\n]', newstreamname):
+ raise errors.AssertionError(
+ "Manifest stream names cannot contain whitespace: '%s'" %
+ (newstreamname))
+ self._current_stream_name = '.' if newstreamname=='' else newstreamname
+
+ def current_stream_name(self):
+ return self._current_stream_name
+
+ def finish_current_stream(self):
+ self.finish_current_file()
+ self.flush_data()
+ if not self._current_stream_files:
+ pass
+ elif self._current_stream_name is None:
+ raise errors.AssertionError(
+ "Cannot finish an unnamed stream (%d bytes in %d files)" %
+ (self._current_stream_length, len(self._current_stream_files)))
+ else:
+ if not self._current_stream_locators:
+ self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+ self._finished_streams.append([self._current_stream_name,
+ self._current_stream_locators,
+ self._current_stream_files])
+ self._current_stream_files = []
+ self._current_stream_length = 0
+ self._current_stream_locators = []
+ self._current_stream_name = None
+ self._current_file_pos = 0
+ self._current_file_name = None
+
+ def finish(self):
+ """Store the manifest in Keep and return its locator.
+
+ This is useful for storing manifest fragments (task outputs)
+ temporarily in Keep during a Crunch job.
+
+ In other cases you should make a collection instead, by
+ sending manifest_text() to the API server's "create
+ collection" endpoint.
+ """
+ return self._my_keep().put(self.manifest_text().encode(),
+ copies=self.replication)
+
+ def portable_data_hash(self):
+ stripped = self.stripped_manifest().encode()
+ return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
+
+ def manifest_text(self):
+ self.finish_current_stream()
+ manifest = ''
+
+ for stream in self._finished_streams:
+ if not re.search(r'^\.(/.*)?$', stream[0]):
+ manifest += './'
+ manifest += stream[0].replace(' ', '\\040')
+ manifest += ' ' + ' '.join(stream[1])
+ manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
+ manifest += "\n"
+
+ return manifest
+
+ def data_locators(self):
+ ret = []
+ for name, locators, files in self._finished_streams:
+ ret += locators
+ return ret
+
+ def save_new(self, name=None):
+ return self._api_client.collections().create(
+ ensure_unique_name=True,
+ body={
+ 'name': name,
+ 'manifest_text': self.manifest_text(),
+ }).execute(num_retries=self.num_retries)
+
+
+class ResumableCollectionWriter(CollectionWriter):
+ """CollectionWriter that can serialize internal state to disk
+
+ .. WARNING:: Deprecated
+ This class is deprecated. Prefer `arvados.collection.Collection`
+ instead.
+ """
+
+ STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+ '_current_stream_locators', '_current_stream_name',
+ '_current_file_name', '_current_file_pos', '_close_file',
+ '_data_buffer', '_dependencies', '_finished_streams',
+ '_queued_dirents', '_queued_trees']
+
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
+ def __init__(self, api_client=None, **kwargs):
+ self._dependencies = {}
+ super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
+
+ @classmethod
+ def from_state(cls, state, *init_args, **init_kwargs):
+ # Try to build a new writer from scratch with the given state.
+ # If the state is not suitable to resume (because files have changed,
+ # been deleted, aren't predictable, etc.), raise a
+ # StaleWriterStateError. Otherwise, return the initialized writer.
+ # The caller is responsible for calling writer.do_queued_work()
+ # appropriately after it's returned.
+ writer = cls(*init_args, **init_kwargs)
+ for attr_name in cls.STATE_PROPS:
+ attr_value = state[attr_name]
+ attr_class = getattr(writer, attr_name).__class__
+ # Coerce the value into the same type as the initial value, if
+ # needed.
+ if attr_class not in (type(None), attr_value.__class__):
+ attr_value = attr_class(attr_value)
+ setattr(writer, attr_name, attr_value)
+ # Check dependencies before we try to resume anything.
+ if any(KeepLocator(ls).permission_expired()
+ for ls in writer._current_stream_locators):
+ raise errors.StaleWriterStateError(
+ "locators include expired permission hint")
+ writer.check_dependencies()
+ if state['_current_file'] is not None:
+ path, pos = state['_current_file']
+ try:
+ writer._queued_file = open(path, 'rb')
+ writer._queued_file.seek(pos)
+ except IOError as error:
+ raise errors.StaleWriterStateError(
+ u"failed to reopen active file {}: {}".format(path, error))
+ return writer
+
+ def check_dependencies(self):
+ for path, orig_stat in listitems(self._dependencies):
+ if not S_ISREG(orig_stat[ST_MODE]):
+ raise errors.StaleWriterStateError(u"{} not file".format(path))
+ try:
+ now_stat = tuple(os.stat(path))
+ except OSError as error:
+ raise errors.StaleWriterStateError(
+ u"failed to stat {}: {}".format(path, error))
+ if ((not S_ISREG(now_stat[ST_MODE])) or
+ (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+ (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+ raise errors.StaleWriterStateError(u"{} changed".format(path))
+
+ def dump_state(self, copy_func=lambda x: x):
+ state = {attr: copy_func(getattr(self, attr))
+ for attr in self.STATE_PROPS}
+ if self._queued_file is None:
+ state['_current_file'] = None
+ else:
+ state['_current_file'] = (os.path.realpath(self._queued_file.name),
+ self._queued_file.tell())
+ return state
+
+ def _queue_file(self, source, filename=None):
+ try:
+ src_path = os.path.realpath(source)
+ except Exception:
+ raise errors.AssertionError(u"{} not a file path".format(source))
+ try:
+ path_stat = os.stat(src_path)
+ except OSError as error:
+ # Python 3 clears the `as` name when this block exits, so keep a
+ # reference for the error message used below.
+ stat_error = error
+ path_stat = None
+ super(ResumableCollectionWriter, self)._queue_file(source, filename)
+ fd_stat = os.fstat(self._queued_file.fileno())
+ if not S_ISREG(fd_stat.st_mode):
+ # We won't be able to resume from this cache anyway, so don't
+ # worry about further checks.
+ self._dependencies[source] = tuple(fd_stat)
+ elif path_stat is None:
+ raise errors.AssertionError(
+ u"could not stat {}: {}".format(source, stat_error))
+ elif path_stat.st_ino != fd_stat.st_ino:
+ raise errors.AssertionError(
+ u"{} changed between open and stat calls".format(source))
+ else:
+ self._dependencies[src_path] = tuple(fd_stat)
+
+ def write(self, data):
+ if self._queued_file is None:
+ raise errors.AssertionError(
+ "resumable writer can't accept unsourced data")
+ return super(ResumableCollectionWriter, self).write(data)
-----------------------------------------------------------------------
hooks/post-receive
--