Skip to content

Commit

Permalink
OARec: add distributed search functionality (#919)
Browse files Browse the repository at this point in the history
* OARec: add distributed search functionality
  • Loading branch information
tomkralidis authored Nov 29, 2023
1 parent 22773cc commit 964911b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 7 deletions.
28 changes: 25 additions & 3 deletions docs/distributedsearching.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ Distributed Searching

.. note::

Distributed search is supported for CSW 2/3 APIs. OARec support will be implemented
following guidance from the OARec specification once available.
- in CSW mode, distributed search must be configured against remote CSW services
- in OGC API - Records mode, distributed search must be configured against remote OGC API - Records services

.. note::

Your server must be able to make outgoing HTTP requests for this functionality.

CSW 2 / 3
---------

pycsw has the ability to perform distributed searching against other CSW servers. Distributed searching is disabled by default; to enable, ``server.federatedcatalogues`` must be set. A CSW client must issue a GetRecords request with ``csw:DistributedSearch`` specified, along with an optional ``hopCount`` attribute (see subclause 10.8.4.13 of the CSW specification). When enabled, pycsw will search all specified catalogues and return a unified set of search results to the client. Due to the distributed nature of this functionality, requests will take extra time to process compared to queries against the local repository.

Scenario: Federated Search
--------------------------
^^^^^^^^^^^^^^^^^^^^^^^^^^

pycsw deployment with 3 configurations (CSW-1, CSW-2, CSW-3), subsequently providing three (3) endpoints. Each endpoint is based on an opaque metadata repository (based on theme/place/discipline, etc.). Goal is to perform a single search against all endpoints.

Expand Down Expand Up @@ -80,3 +83,22 @@ As a result, a pycsw deployment in this scenario may be approached on a per 'the
All interaction in this scenario is local to the pycsw installation, so network performance would not be problematic.

A very important facet of distributed search is as per Annex B of OGC:CSW 2.0.2. Given that all the CSW endpoints are managed locally, duplicates and infinite looping are not deemed to present an issue.

OGC API - Records
-----------------

Experimental support for distibuted searching is available in pycsw's OGC API - Records support to allow for searching remote services. The implementation uses the same approach as described above, operating in OGC API - Records mode.

.. note::

The ``federatedcatalogues`` directives must point to an OGC API - Records **collections** endpoint.

.. code-block:: none
[server]
...
federatedcatalogues=https://example.org/collections/collection1,https://example.org/collections/collection2
With the above configured, a distributed search can be invoked as follows:

http://localhost/collections/metadata:main/items?distributed=true
2 changes: 2 additions & 0 deletions pycsw/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ def str2bool(value: typing.Union[bool, str]) -> bool:
"""
helper function to return Python boolean
type (source: https://stackoverflow.com/a/715468)
:param value: value to be evaluated
:returns: `bool` of whether the value is boolean-ish
"""

Expand Down
16 changes: 15 additions & 1 deletion pycsw/ogc/api/oapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'):
'style': 'form',
'explode': False
}
oapi['components']['parameters']['distributed'] = {
'name': 'distributed',
'in': 'query',
'description': 'Whether to invoke distributed mode',
'schema': {
'type': 'boolean',
'default': False
},
'style': 'form',
'explode': False
}

# TODO: remove local definition of ids once implemented
# in OGC API - Records
oapi['components']['parameters']['ids'] = {
Expand Down Expand Up @@ -396,8 +408,9 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'):
{'$ref': '#/components/parameters/filter-lang'},
{'$ref': '#/components/parameters/f'},
{'$ref': '#/components/parameters/offset'},
{'$ref': '#/components/parameters/vendorSpecificParameters'},
{'$ref': '#/components/parameters/facets'},
{'$ref': '#/components/parameters/distributed'},
{'$ref': '#/components/parameters/vendorSpecificParameters'}
],
'responses': {
'200': {
Expand Down Expand Up @@ -467,6 +480,7 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'):
'parameters': [
{'$ref': '#/components/parameters/collectionId'},
{'$ref': '#/components/parameters/recordId'},
{'$ref': '#/components/parameters/distributed'},
f
],
'responses': {
Expand Down
52 changes: 49 additions & 3 deletions pycsw/ogc/api/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import os
from urllib.parse import urlencode, quote

from owslib.ogcapi.records import Records
from pygeofilter.parsers.ecql import parse as parse_ecql
from pygeofilter.parsers.cql2_json import parse as parse_cql2_json

Expand Down Expand Up @@ -511,6 +512,7 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'):
headers_['Content-Type'] = self.get_content_type(headers_, args)

reserved_query_params = [
'distributed',
'f',
'facets',
'filter',
Expand Down Expand Up @@ -725,6 +727,22 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'):
for record in records:
response['features'].append(record2json(record, self.config['server']['url'], collection, self.mode))

response['distributedFeatures'] = []

distributed = str2bool(args.get('distributed', False))

if distributed and 'federatedcatalogues' in self.config['server']:
for fc in self.config['server']['federatedcatalogues'].split(','):
LOGGER.debug(f'Running distributed search against {fc}')
fc_url, _, fc_collection = fc.rsplit('/', 2)
try:
w = Records(fc_url)
fc_results = w.collection_items(fc_collection, **args)
for feature in fc_results['features']:
response['distributedFeatures'].append(feature)
except Exception as err:
LOGGER.warning(err)

LOGGER.debug('Creating links')

link_args = {**args}
Expand Down Expand Up @@ -811,6 +829,7 @@ def item(self, headers_, args, collection, item):
:returns: tuple of headers, status code, content
"""

record = None
headers_['Content-Type'] = self.get_content_type(headers_, args)

if collection not in self.get_all_collections():
Expand All @@ -821,15 +840,30 @@ def item(self, headers_, args, collection, item):
LOGGER.debug(f'Querying repository for item {item}')
try:
record = self.repository.query_ids([item])[0]
response = record2json(record, self.config['server']['url'],
collection, self.mode)
except IndexError:
distributed = str2bool(args.get('distributed', False))

if distributed and 'federatedcatalogues' in self.config['server']:
for fc in self.config['server']['federatedcatalogues'].split(','):
LOGGER.debug(f'Running distributed item search against {fc}')
fc_url, _, fc_collection = fc.rsplit('/', 2)
try:
w = Records(fc_url)
response = record = w.collection_item(fc_collection, item)
LOGGER.debug(f'Found item from {fc}')
break
except RuntimeError:
continue

if record is None:
return self.get_exception(
404, headers_, 'InvalidParameterValue', 'item not found')

if headers_['Content-Type'] == 'application/xml':
return headers_, 200, record.xml

response = record2json(record, self.config['server']['url'], collection, self.mode)

if headers_['Content-Type'] == 'text/html':
response['title'] = self.config['metadata:main']['identification_title']
response['collection'] = collection
Expand Down Expand Up @@ -959,7 +993,7 @@ def get_collection_info(self, collection_name: str = 'metadata:main',
title = collection_info.get('title')
description = collection_info.get('description')

return {
collection_info = {
'id': id_,
'title': title,
'description': description,
Expand All @@ -986,6 +1020,18 @@ def get_collection_info(self, collection_name: str = 'metadata:main',
}]
}

if collection_name == 'metadata:main':
if self.config['server'].get('federatedcatalogues') is not None:
LOGGER.debug('Adding federated catalogues')
collection_info['federatedCatalogues'] = []
for fc in self.config['server']['federatedcatalogues'].split(','):
collection_info['federatedCatalogues'].append({
'type': 'OGC API - Records',
'url': fc
})

return collection_info

def get_all_collections(self) -> list:
"""
Get all collections
Expand Down

0 comments on commit 964911b

Please sign in to comment.