OARec: add distributed search functionality #919

Merged
merged 4 commits into from
Nov 29, 2023
28 changes: 25 additions & 3 deletions docs/distributedsearching.rst
Expand Up @@ -5,17 +5,20 @@ Distributed Searching

.. note::

Distributed search is supported for CSW 2/3 APIs. OARec support will be implemented
following guidance from the OARec specification once available.
- in CSW mode, distributed search must be configured against remote CSW services
- in OGC API - Records mode, distributed search must be configured against remote OGC API - Records services

.. note::

Your server must be able to make outgoing HTTP requests for this functionality.

CSW 2 / 3
---------

pycsw has the ability to perform distributed searching against other CSW servers. Distributed searching is disabled by default; to enable, ``server.federatedcatalogues`` must be set. A CSW client must issue a GetRecords request with ``csw:DistributedSearch`` specified, along with an optional ``hopCount`` attribute (see subclause 10.8.4.13 of the CSW specification). When enabled, pycsw will search all specified catalogues and return a unified set of search results to the client. Due to the distributed nature of this functionality, requests will take extra time to process compared to queries against the local repository.
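As an illustration, a client request of this kind could be assembled as follows. This is a minimal sketch, assuming the CSW 2.0.2 namespace and a hop count of 2; the helper name ``build_distributed_getrecords`` and the query contents (``typeNames``, ``ElementSetName``) are placeholders chosen for the example, not values prescribed above.

```python
import xml.etree.ElementTree as ET

CSW_NS = 'http://www.opengis.net/cat/csw/2.0.2'  # CSW 2.0.2 namespace


def build_distributed_getrecords(hop_count: int = 2) -> bytes:
    """Build a GetRecords request body that asks for a distributed search."""
    ET.register_namespace('csw', CSW_NS)

    root = ET.Element(f'{{{CSW_NS}}}GetRecords', {
        'service': 'CSW',
        'version': '2.0.2',
        'resultType': 'results',
    })

    # csw:DistributedSearch requests federation; hopCount is optional
    ET.SubElement(root, f'{{{CSW_NS}}}DistributedSearch',
                  {'hopCount': str(hop_count)})

    # placeholder query: brief records of type csw:Record
    query = ET.SubElement(root, f'{{{CSW_NS}}}Query',
                          {'typeNames': 'csw:Record'})
    element_set = ET.SubElement(query, f'{{{CSW_NS}}}ElementSetName')
    element_set.text = 'brief'

    return ET.tostring(root, encoding='utf-8')


request_body = build_distributed_getrecords()
```

The resulting body would be sent as an HTTP POST (``Content-Type: application/xml``) to the pycsw CSW endpoint.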

Scenario: Federated Search
--------------------------
^^^^^^^^^^^^^^^^^^^^^^^^^^

A pycsw deployment with three configurations (CSW-1, CSW-2, CSW-3) provides three endpoints. Each endpoint is backed by an opaque metadata repository (organized by theme, place, discipline, etc.). The goal is to perform a single search against all endpoints.

Expand Down Expand Up @@ -80,3 +83,22 @@ As a result, a pycsw deployment in this scenario may be approached on a per 'the
All interaction in this scenario is local to the pycsw installation, so network performance would not be problematic.

An important consideration for distributed search is covered in Annex B of OGC:CSW 2.0.2. Given that all the CSW endpoints are managed locally, duplicates and infinite looping are not deemed to present an issue.

OGC API - Records
-----------------

Experimental support for distributed searching is available in pycsw's OGC API - Records implementation, allowing remote services to be searched. The implementation uses the same approach as described above, operating in OGC API - Records mode.

.. note::

The ``federatedcatalogues`` directives must point to an OGC API - Records **collections** endpoint.

.. code-block:: none

[server]
...
federatedcatalogues=https://example.org/collections/collection1,https://example.org/collections/collection2

With the above configured, a distributed search can be invoked as follows:

http://localhost/collections/metadata:main/items?distributed=true
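A client could also assemble this request programmatically. The sketch below only builds the URL with the standard library; the ``q`` search term and the helper name ``distributed_items_url`` are illustrative assumptions.

```python
from urllib.parse import quote, urlencode


def distributed_items_url(base_url: str, collection: str, **params) -> str:
    """Return an items URL with distributed mode switched on."""
    query = {**params, 'distributed': 'true'}
    # keep ':' readable in the collection id (e.g. metadata:main)
    collection_path = quote(collection, safe=':')
    path = f"{base_url.rstrip('/')}/collections/{collection_path}/items"
    return f'{path}?{urlencode(query)}'


url = distributed_items_url('http://localhost', 'metadata:main', q='water')
print(url)
```

In the implementation shown in this change, matches from remote catalogues are returned under a separate ``distributedFeatures`` member of the response rather than being merged into ``features``.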
2 changes: 2 additions & 0 deletions pycsw/core/util.py
Expand Up @@ -534,7 +534,9 @@ def str2bool(value: typing.Union[bool, str]) -> bool:
"""
helper function to return Python boolean
type (source: https://stackoverflow.com/a/715468)

:param value: value to be evaluated

:returns: `bool` of whether the value is boolean-ish
"""

Expand Down
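For context, a helper of this kind is usually only a few lines long. The body of `str2bool` is not shown in this diff, so the sketch below is an assumption modelled on the linked Stack Overflow answer; the name `str2bool_sketch` and the list of accepted spellings are illustrative.

```python
import typing


def str2bool_sketch(value: typing.Union[bool, str]) -> bool:
    """Return booleans unchanged and True for truthy-looking strings."""
    if isinstance(value, bool):
        # already a boolean, e.g. the default False from args.get(...)
        return value
    # assumption: these spellings count as true, everything else as false
    return value.lower() in ('yes', 'true', 't', '1')
```

This is why the handlers can call `str2bool(args.get('distributed', False))` safely: the default is a real boolean, while a value taken from the query string arrives as text.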
16 changes: 15 additions & 1 deletion pycsw/ogc/api/oapi.py
Expand Up @@ -198,6 +198,18 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'):
'style': 'form',
'explode': False
}
oapi['components']['parameters']['distributed'] = {
'name': 'distributed',
'in': 'query',
'description': 'Whether to invoke distributed mode',
'schema': {
'type': 'boolean',
'default': False
},
'style': 'form',
'explode': False
}

# TODO: remove local definition of ids once implemented
# in OGC API - Records
oapi['components']['parameters']['ids'] = {
Expand Down Expand Up @@ -396,8 +408,9 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'):
{'$ref': '#/components/parameters/filter-lang'},
{'$ref': '#/components/parameters/f'},
{'$ref': '#/components/parameters/offset'},
{'$ref': '#/components/parameters/vendorSpecificParameters'},
{'$ref': '#/components/parameters/facets'},
{'$ref': '#/components/parameters/distributed'},
{'$ref': '#/components/parameters/vendorSpecificParameters'}
],
'responses': {
'200': {
Expand Down Expand Up @@ -467,6 +480,7 @@ def gen_oapi(config, oapi_filepath, mode='ogcapi-records'):
'parameters': [
{'$ref': '#/components/parameters/collectionId'},
{'$ref': '#/components/parameters/recordId'},
{'$ref': '#/components/parameters/distributed'},
f
],
'responses': {
Expand Down
52 changes: 49 additions & 3 deletions pycsw/ogc/api/records.py
Expand Up @@ -35,6 +35,7 @@
import os
from urllib.parse import urlencode, quote

from owslib.ogcapi.records import Records
from pygeofilter.parsers.ecql import parse as parse_ecql
from pygeofilter.parsers.cql2_json import parse as parse_cql2_json

Expand Down Expand Up @@ -511,6 +512,7 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'):
headers_['Content-Type'] = self.get_content_type(headers_, args)

reserved_query_params = [
'distributed',
'f',
'facets',
'filter',
Expand Down Expand Up @@ -725,6 +727,22 @@ def items(self, headers_, json_post_data, args, collection='metadata:main'):
        for record in records:
            response['features'].append(record2json(record, self.config['server']['url'], collection, self.mode))

        response['distributedFeatures'] = []

        distributed = str2bool(args.get('distributed', False))

        if distributed and 'federatedcatalogues' in self.config['server']:
            for fc in self.config['server']['federatedcatalogues'].split(','):
                LOGGER.debug(f'Running distributed search against {fc}')
                fc_url, _, fc_collection = fc.rsplit('/', 2)
                try:
                    w = Records(fc_url)
                    fc_results = w.collection_items(fc_collection, **args)
                    for feature in fc_results['features']:
                        response['distributedFeatures'].append(feature)
                except Exception as err:
                    LOGGER.warning(err)

        LOGGER.debug('Creating links')

        link_args = {**args}
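
The loop in this hunk can be read as: split each configured entry into a service URL and a collection id, query it, and collect whatever features come back, logging failures instead of aborting. A minimal, network-free sketch of that pattern follows. `fetch_items` is a hypothetical stand-in for the OWSLib `Records(...).collection_items(...)` call, `fake_fetch` exists only to make the example runnable, and the `.strip()` on each entry is an addition not present in the diff.

```python
import logging
from typing import Callable, Dict, List, Tuple

LOGGER = logging.getLogger(__name__)


def split_catalogue(entry: str) -> Tuple[str, str]:
    """Split '<service>/collections/<id>' into (service URL, collection id)."""
    service_url, _, collection_id = entry.strip().rsplit('/', 2)
    return service_url, collection_id


def gather_distributed(catalogues: str,
                       fetch_items: Callable[[str, str], Dict]) -> List[dict]:
    """Collect features from every catalogue, skipping ones that fail."""
    features = []
    for entry in catalogues.split(','):
        service_url, collection_id = split_catalogue(entry)
        try:
            result = fetch_items(service_url, collection_id)
            features.extend(result.get('features', []))
        except Exception as err:  # one bad remote must not break the search
            LOGGER.warning('Skipping %s: %s', entry, err)
    return features


def fake_fetch(service_url: str, collection_id: str) -> Dict:
    """Hypothetical remote: collection2 is unreachable."""
    if collection_id == 'collection2':
        raise RuntimeError('remote unavailable')
    return {'features': [{'id': f'{collection_id}-1'}]}


found = gather_distributed(
    'https://example.org/collections/collection1,'
    'https://example.org/collections/collection2',
    fake_fetch)
```

Here `found` holds only the feature from `collection1`; the failure of `collection2` is logged and otherwise ignored, matching the broad `except Exception` in the hunk.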
Expand Down Expand Up @@ -811,6 +829,7 @@ def item(self, headers_, args, collection, item):
:returns: tuple of headers, status code, content
"""

        record = None
        headers_['Content-Type'] = self.get_content_type(headers_, args)

        if collection not in self.get_all_collections():
Expand All @@ -821,15 +840,30 @@ def item(self, headers_, args, collection, item):
        LOGGER.debug(f'Querying repository for item {item}')
        try:
            record = self.repository.query_ids([item])[0]
            response = record2json(record, self.config['server']['url'],
                                   collection, self.mode)
        except IndexError:
            distributed = str2bool(args.get('distributed', False))

            if distributed and 'federatedcatalogues' in self.config['server']:
                for fc in self.config['server']['federatedcatalogues'].split(','):
                    LOGGER.debug(f'Running distributed item search against {fc}')
                    fc_url, _, fc_collection = fc.rsplit('/', 2)
                    try:
                        w = Records(fc_url)
                        response = record = w.collection_item(fc_collection, item)
                        LOGGER.debug(f'Found item from {fc}')
                        break
                    except RuntimeError:
                        continue

        if record is None:
            return self.get_exception(
                404, headers_, 'InvalidParameterValue', 'item not found')

        if headers_['Content-Type'] == 'application/xml':
            return headers_, 200, record.xml

        response = record2json(record, self.config['server']['url'], collection, self.mode)

        if headers_['Content-Type'] == 'text/html':
            response['title'] = self.config['metadata:main']['identification_title']
            response['collection'] = collection
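
The lookup order in this hunk is: local repository first, then each federated catalogue in turn until one returns the item, and a 404 if none does. A small sketch of that first-match pattern follows, with hypothetical `local_lookup` and `remote_lookup` callables standing in for `self.repository.query_ids` and OWSLib's `collection_item`.

```python
from typing import Callable, Iterable, Optional


def find_item(item_id: str,
              local_lookup: Callable[[str], list],
              catalogues: Iterable[str],
              remote_lookup: Callable[[str, str], dict]) -> Optional[object]:
    """Return the local record if present, else the first remote match."""
    try:
        return local_lookup(item_id)[0]
    except IndexError:
        pass  # not held locally; fall through to the remote catalogues

    for catalogue in catalogues:
        try:
            return remote_lookup(catalogue, item_id)
        except RuntimeError:
            continue  # this catalogue does not have the item; try the next

    return None  # the caller would turn this into a 404


def remote(catalogue: str, item_id: str) -> dict:
    """Hypothetical remote lookup: only catalogue 'b' holds 'xyz'."""
    if catalogue == 'b' and item_id == 'xyz':
        return {'id': item_id, 'source': catalogue}
    raise RuntimeError('not found')


hit = find_item('xyz', lambda _id: [], ['a', 'b'], remote)
miss = find_item('nope', lambda _id: [], ['a', 'b'], remote)
```

One design point worth keeping in mind: a remote hit is already a JSON-like dict, whereas a local hit is a repository record, so code that runs after the lookup may need to treat the two cases differently.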
Expand Down Expand Up @@ -959,7 +993,7 @@ def get_collection_info(self, collection_name: str = 'metadata:main',
title = collection_info.get('title')
description = collection_info.get('description')

return {
collection_info = {
'id': id_,
'title': title,
'description': description,
Expand All @@ -986,6 +1020,18 @@ def get_collection_info(self, collection_name: str = 'metadata:main',
}]
}

        if collection_name == 'metadata:main':
            if self.config['server'].get('federatedcatalogues') is not None:
                LOGGER.debug('Adding federated catalogues')
                collection_info['federatedCatalogues'] = []
                for fc in self.config['server']['federatedcatalogues'].split(','):
                    collection_info['federatedCatalogues'].append({
                        'type': 'OGC API - Records',
                        'url': fc
                    })

        return collection_info

    def get_all_collections(self) -> list:
        """
        Get all collections
Expand Down
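
The `federatedCatalogues` member added to the collection description is a direct projection of the comma-separated configuration value. A compact sketch of that mapping, using a hypothetical helper name and a plain dict in place of the pycsw configuration object:

```python
def federated_catalogue_links(server_config: dict) -> list:
    """Describe each configured federated catalogue, or return [] if none."""
    configured = server_config.get('federatedcatalogues')
    if configured is None:
        return []
    return [{'type': 'OGC API - Records', 'url': url}
            for url in configured.split(',')]


links = federated_catalogue_links({
    'federatedcatalogues': 'https://example.org/collections/collection1,'
                           'https://example.org/collections/collection2'
})
```

Unlike this sketch, the hunk above only adds the member when the setting is present, so clients should treat `federatedCatalogues` as optional.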