[ARVADOS] created: 1.1.4-303-gef66d2f
Git user
git at public.curoverse.com
Tue May 22 20:46:52 EDT 2018
at ef66d2fec4bddabfe678f58e5f21a2a8d9471b6f (commit)
commit ef66d2fec4bddabfe678f58e5f21a2a8d9471b6f
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue May 22 20:45:58 2018 -0400
11162: Add progress for http data download.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
index de7bfe6..064aa8d 100644
--- a/sdk/cwl/arvados_cwl/http.py
+++ b/sdk/cwl/arvados_cwl/http.py
@@ -6,6 +6,9 @@ import re
import arvados
import arvados.collection
import urlparse
+import logging
+
+logger = logging.getLogger('arvados.cwl-runner')
def my_formatdate(dt):
return email.utils.formatdate(timeval=time.mktime(now.timetuple()), localtime=False, usegmt=True)
@@ -35,7 +38,7 @@ def fresh_cache(url, properties):
def remember_headers(url, properties, headers):
properties.setdefault(url, {})
- for h in ("Cache-Control", "ETag", "Expires", "Date"):
+ for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
if h in headers:
properties[url][h] = headers[h]
if "Date" not in headers:
@@ -74,17 +77,31 @@ def http_to_keep(api, project_uuid, url):
req = requests.get(url, stream=True)
if req.status_code != 200:
- raise Exception("Got status %s" % req.status_code)
+ raise Exception("Failed to download '%s' got status %s " % (req.status_code, url))
remember_headers(url, properties, req.headers)
+ logger.info("Downloading %s (%s bytes)", url, properties[url]["Content-Length"])
+
c = arvados.collection.Collection()
+ count = 0
+ start = time.time()
+ checkpoint = start
with c.open(name, "w") as f:
- for chunk in req.iter_content(chunk_size=128):
+ for chunk in req.iter_content(chunk_size=1024):
+ count += len(chunk)
f.write(chunk)
-
- c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid)
+ now = time.time()
+ if (now - checkpoint) > 20:
+ bps = (float(count)/float(now - start))
+ logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
+ float(count * 100) / float(properties[url]["Content-Length"]),
+ bps/(1024*1024),
+ (int(properties[url]["Content-Length"])-count)/bps)
+ checkpoint = now
+
+ c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)
api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
commit fca26aaba220b8215f44832c5fac46761338c14a
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue May 22 16:53:16 2018 -0400
11162: Add --submit-request-uuid
Allows the container request to be created separately from the initialization
that occurs before the container is actually committed to run.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 4eda886..5c60f7d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -401,6 +401,9 @@ class ArvCwlRunner(object):
if self.intermediate_output_ttl < 0:
raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+ if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+ raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
@@ -706,6 +709,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
default=None)
+ parser.add_argument("--submit-request-uuid", type=str,
+ default=None,
+ help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
default=None)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4e7811d..0bec692 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -51,7 +51,6 @@ class ArvadosContainer(object):
container_request = {
"command": self.command_line,
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": self.outdir,
"cwd": self.outdir,
@@ -61,6 +60,9 @@ class ArvadosContainer(object):
}
runtime_constraints = {}
+ if self.arvrunner.project_uuid:
+ container_request["owner_uuid"] = self.arvrunner.project_uuid
+
if self.arvrunner.secret_store.has_secret(self.command_line):
raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
@@ -251,9 +253,15 @@ class ArvadosContainer(object):
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- response = self.arvrunner.api.container_requests().create(
- body=container_request
- ).execute(num_retries=self.arvrunner.num_retries)
+ if kwargs.get("submit_request_uuid"):
+ response = self.arvrunner.api.container_requests().update(
+ uuid=kwargs["submit_request_uuid"],
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
@@ -343,7 +351,6 @@ class RunnerContainer(Runner):
self.job_order[param] = {"$include": mnt}
container_req = {
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": "/var/spool/cwl",
"cwd": "/var/spool/cwl",
@@ -442,11 +449,18 @@ class RunnerContainer(Runner):
def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
job_spec = self.arvados_job_spec(**kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+ if self.arvrunner.project_uuid:
+ job_spec["owner_uuid"] = self.arvrunner.project_uuid
- response = self.arvrunner.api.container_requests().create(
- body=job_spec
- ).execute(num_retries=self.arvrunner.num_retries)
+ if kwargs.get("submit_request_uuid"):
+ response = self.arvrunner.api.container_requests().update(
+ uuid=kwargs["submit_request_uuid"],
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
commit 25c6e5a357bad34fb9744c58ba822646fe26690b
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon May 14 12:02:51 2018 -0400
11162: Handle missing git
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_version.py b/sdk/cwl/arvados_version.py
index a24d53d..13e6d36 100644
--- a/sdk/cwl/arvados_version.py
+++ b/sdk/cwl/arvados_version.py
@@ -34,7 +34,7 @@ def get_version(setup_dir, module):
else:
try:
save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
- except subprocess.CalledProcessError:
+ except (subprocess.CalledProcessError, OSError):
pass
return read_version(setup_dir, module)
commit c169bd1d62d97d95547b4dd4235b2d4efa3a4819
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri May 11 16:36:54 2018 -0400
11162: Support public http and https file references
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index d509f40..4eda886 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -37,7 +37,7 @@ import arvados.commands._util as arv_cmd
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
new file mode 100644
index 0000000..de7bfe6
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/http.py
@@ -0,0 +1,91 @@
+import requests
+import email.utils
+import time
+import datetime
+import re
+import arvados
+import arvados.collection
+import urlparse
+
+def my_formatdate(dt):
+ return email.utils.formatdate(timeval=time.mktime(now.timetuple()), localtime=False, usegmt=True)
+
+def my_parsedate(text):
+ return datetime.datetime(*email.utils.parsedate(text)[:6])
+
+def fresh_cache(url, properties):
+ pr = properties[url]
+ expires = None
+
+ if "Cache-Control" in pr:
+ if re.match(r"immutable", pr["Cache-Control"]):
+ return True
+
+ g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
+ if g:
+ expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
+
+ if expires is None and "Expires" in pr:
+ expires = my_parsedate(pr["Expires"])
+
+ if not expires:
+ return False
+
+ return (datetime.datetime.utcnow() < expires)
+
+def remember_headers(url, properties, headers):
+ properties.setdefault(url, {})
+ for h in ("Cache-Control", "ETag", "Expires", "Date"):
+ if h in headers:
+ properties[url][h] = headers[h]
+ if "Date" not in headers:
+ properties[url]["Date"] = my_formatdate(datetime.datetime.utcnow())
+
+
+def changed(url, properties):
+ req = requests.head(url)
+ remember_headers(url, properties, req.headers)
+
+ if req.status_code != 200:
+ raise Exception("Got status %s" % req.status_code)
+
+ pr = properties[url]
+ if "ETag" in pr and "ETag" in req.headers:
+ if pr["ETag"] == req.headers["ETag"]:
+ return False
+ return True
+
+def http_to_keep(api, project_uuid, url):
+ r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+ name = urlparse.urlparse(url).path.split("/")[-1]
+
+ for item in r["items"]:
+ properties = item["properties"]
+ if fresh_cache(url, properties):
+ # Do nothing
+ return "keep:%s/%s" % (item["portable_data_hash"], name)
+
+ if not changed(url, properties):
+ # ETag didn't change, same content, just update headers
+ api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
+ return "keep:%s/%s" % (item["portable_data_hash"], name)
+
+ properties = {}
+ req = requests.get(url, stream=True)
+
+ if req.status_code != 200:
+ raise Exception("Got status %s" % req.status_code)
+
+ remember_headers(url, properties, req.headers)
+
+ c = arvados.collection.Collection()
+
+ with c.open(name, "w") as f:
+ for chunk in req.iter_content(chunk_size=128):
+ f.write(chunk)
+
+ c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid)
+
+ api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
+
+ return "keep:%s/%s" % (c.portable_data_hash(), name)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 6fedb12..bd4b528 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -16,6 +16,8 @@ from schema_salad.sourceline import SourceLine
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
+from .http import http_to_keep
+
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
@@ -81,6 +83,10 @@ class ArvPathMapper(PathMapper):
raise WorkflowException("File literal '%s' is missing `contents`" % src)
if srcobj["class"] == "Directory" and "listing" not in srcobj:
raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+ elif src.startswith("http:") or src.startswith("https:"):
+ keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
+ logger.info("%s is %s", src, keepref)
+ self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
else:
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 4df89ee..05ff8a2 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -35,7 +35,7 @@ setup(name='arvados-cwl-runner',
install_requires=[
'cwltool==1.0.20180522135731',
'schema-salad==2.7.20180501211602',
- 'typing==3.5.3.0',
+ 'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
'arvados-python-client>=1.1.4.20180507184611',
'setuptools',
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list