Skip to content

Commit

Permalink
Merge pull request bird-house#43 in OGC/testbed14-twitcher from deimo…
Browse files Browse the repository at this point in the history
…s_opensearch_support to dynamic-wps-processes

* commit '2ff9f62cedc75876405cdc3158da04bf4ca6b9ad':
  Add status message explaining the remote ADES choice
  Fix typo in comment
  Add comments to improve fct understanding
  Add support for Deimos OpenSearch, which returns links in Atom XML format only
  • Loading branch information
dbyrns committed Nov 29, 2018
2 parents 1004f19 + 2ff9f62 commit ad9e9bf
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
42 changes: 37 additions & 5 deletions twitcher/processes/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pyramid.httpexceptions import HTTPGatewayTimeout, HTTPOk
from pyramid.settings import asbool
from six.moves.urllib.parse import urlparse, parse_qsl
from typing import Iterable, Dict, Tuple, List, Deque
from typing import Iterable, Dict, Tuple, List, Deque, AnyStr
from twitcher.processes.sources import fetch_data_sources
from twitcher.processes.constants import WPS_LITERAL
from twitcher.utils import get_any_id
Expand Down Expand Up @@ -121,6 +121,14 @@ def is_eoimage_parameter(param):
new_input = deepcopy(queue[0])
new_input.data = replace_with_opensearch_scheme(link)
eoimages_queue.append(new_input)
if len(eoimages_queue) >= max_occurs:
break
if len(eoimages_queue) < queue[0].min_occurs:
message = "Could not find enough images ({}/{}) matching accepted mimetype ({})"
message = message.format(len(eoimages_queue),
queue[0].min_occurs,
", ".join(mime_types))
raise ValueError(message)
new_inputs[input_id] = eoimages_queue

return new_inputs
Expand Down Expand Up @@ -223,6 +231,21 @@ def _prepare_query_url(self, template_url, params):

return base_url, query_params

def _fetch_datatsets_from_alternates_links(self, alternate_links):
# Try loading from atom alternate link
for link in alternate_links:
if link["type"] == "application/atom+xml":
r = requests.get(link["href"])
r.raise_for_status()

et = lxml.etree.fromstring(r.content)
xpath = "//*[local-name() = 'entry']/*[local-name() = 'link']"

# noinspection PyProtectedMember
links = et.xpath(xpath) # type: List[lxml.etree._Element]
return [link.attrib for link in links]
return []

# noinspection PyMethodMayBeStatic
def requests_get_retry(self, *args, **kwargs):
"""Retry a requests.get call
Expand Down Expand Up @@ -270,20 +293,27 @@ def _query_features_paginated(self, params):
start_index += n_received_features

def query_datasets(self, params, accept_schemes, accept_mime_types):
# type: (Dict, Tuple, List) -> Iterable
# type: (Dict, Tuple, List) -> Iterable[AnyStr]
"""
Loop on every opensearch result feature and yield url mathching required mimetype and scheme.
Log a warning if a feature cannot yield a valid url (either no compatible mimetype or scheme)
:param params: query parameters
:param accept_schemes: only return links of this scheme
:param accept_mime_types: list of accepted mime types, ordered by preference
:raise KeyError: If the feature doesn't contain a json data section or an atom alternative link
"""
if params is None:
params = {}

for feature, url in self._query_features_paginated(params):
try:
data_links = feature["properties"]["links"]["data"]
try:
data_links = feature["properties"]["links"]["data"]
except KeyError:
# Try loading from atom alternate link
data_links = self._fetch_datatsets_from_alternates_links(
feature["properties"]["links"]["alternates"])
data_links_mime_types = [d["type"] for d in data_links]
except KeyError:
LOGGER.exception("Badly formatted json at: {}".format(url))
Expand All @@ -300,7 +330,9 @@ def query_datasets(self, params, accept_schemes, accept_mime_types):
message = "Could not match any accepted mimetype ({}) to received mimetype ({})"
message = message.format(", ".join(accept_mime_types),
", ".join(data_links_mime_types))
raise ValueError(message)
# Do not raise an error right now, just loop until we reach the number of inputs we want
# Raise only if that number isn't reach
LOGGER.warning(message)


def get_additional_parameters(input_data):
Expand Down
11 changes: 10 additions & 1 deletion twitcher/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ def get_job_process_definition(self, jobname, joborder):
eodata_inputs = opensearch.get_eo_images_ids_from_payload(step_payload)

data_url = "" # data_source will be set to the default ADES if no EOImages

if eodata_inputs:
step_payload = opensearch.alter_payload_after_query(step_payload)
value = joborder[eodata_inputs[0]]
Expand All @@ -1150,14 +1151,22 @@ def get_job_process_definition(self, jobname, joborder):
value = value[0]

data_url = value['location']
data_source_reason = '(ADES based on {0})'.format(data_url)
else:
data_source_reason = '(No EOImage -> Default ADES)'

data_source = get_data_source_from_url(data_url)

# Progress made with steps presumes that they are done sequentially and have the same progress weight
start_step_progress = self.map_step_progress(len(self.step_launched), max(1, len(self.step_packages)))
end_step_progress = self.map_step_progress(len(self.step_launched) + 1, max(1, len(self.step_packages)))
url = retrieve_data_source_url(data_source)

self.update_status("Launching {type} {name} on {src}.".format(type=jobtype, name=jobname, src=data_source),
self.update_status("Launching {type} {name} on {src}.".format(
type=jobtype,
name=jobname,
src=data_source,
reason=data_source_reason),
start_step_progress, ts.STATUS_RUNNING)

except (IndexError, KeyError) as exc:
Expand Down

0 comments on commit ad9e9bf

Please sign in to comment.