Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion docker/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,42 @@ def launch_pycsw(pycsw_config, workers=2, reload=False):
except Exception as err:
LOGGER.debug(err)

repo = Repository(database, StaticContext(), table=table)
if 'source' in pycsw_config['repository']: # load custom repository
rs = pycsw_config['repository']['source']
rs_modname, rs_clsname = rs.rsplit('.', 1)

rs_mod = __import__(rs_modname, globals(), locals(), [rs_clsname])
rs_cls = getattr(rs_mod, rs_clsname)

try:
connection_done = False
max_attempts = 0
max_retries = pycsw_config['repository'].get('maxretries', 5)
while not connection_done and max_attempts <= max_retries:
try:
repo = rs_cls(pycsw_config['repository'], StaticContext())
LOGGER.debug('Custom repository %s loaded' % pycsw_config['repository']['source'])
connection_done = True
except Exception as err:
LOGGER.debug(f'Repository not loaded retry connection {max_attempts}: {err}')
max_attempts += 1
except Exception as err:
msg = 'Could not load custom repository %s: %s' % (rs, err)
LOGGER.exception(msg)
error = 1
code = 'NoApplicableCode'
locator = 'service'
text = 'Could not initialize repository. Check server logs'

else:
try:
LOGGER.info('Loading default repository')
repo = repository.Repository(pycsw_config, StaticContext())
LOGGER.debug(f'Repository loaded {repo.dbtype}')
except Exception as err:
msg = f'Could not load repository {err}'
LOGGER.exception(msg)
raise

repo.ping()

Expand Down
4 changes: 2 additions & 2 deletions docs/repofilters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ The same CSW `GetRecords` filter as per above then yields the following results:

Another example:

.. code-block:: text
.. code-block:: yaml

repository:0
repository:
database: sqlite:///records.db
filter: "pycsw:ParentIdentifier != '33'"

Expand Down
41 changes: 33 additions & 8 deletions pycsw/core/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
def load_records(context, database, table, xml_dirpath, recursive=False, force_update=False):
"""Load metadata records from directory of files to database"""

repo = repository.Repository(database, context, table=table)
repo_config = {
'database': database,
'table': table
}
repo = repository.Repository(repo_config, context)

file_list = []

Expand Down Expand Up @@ -121,7 +125,13 @@ def load_records(context, database, table, xml_dirpath, recursive=False, force_u

def export_records(context, database, table, xml_dirpath):
"""Export metadata records from database to directory of files"""
repo = repository.Repository(database, context, table=table)

repo_config = {
'database': database,
'table': table
}

repo = repository.Repository(repo_config, context)

LOGGER.info('Querying database %s, table %s ....', database, table)
records = repo.session.query(repo.dataset)
Expand Down Expand Up @@ -190,8 +200,13 @@ def refresh_harvested_records(context, database, table, url):
"""refresh / harvest all non-local records in repository"""
from owslib.csw import CatalogueServiceWeb

repo_config = {
'database': database,
'table': table
}

# get configuration and init repo connection
repos = repository.Repository(database, context, table=table)
repos = repository.Repository(repo_config, context)

# get all harvested records
count, records = repos.query(constraint={'where': "mdsource != 'local'", 'values': []})
Expand Down Expand Up @@ -228,8 +243,13 @@ def refresh_harvested_records(context, database, table, url):
def gen_sitemap(context, database, table, url, output_file):
"""generate an XML sitemap from all records in repository"""

repo_config = {
'database': database,
'table': table
}

# get configuration and init repo connection
repos = repository.Repository(database, context, table=table)
repos = repository.Repository(repo_config, context)

# write out sitemap document
urlset = etree.Element(util.nspath_eval('sitemap:urlset',
Expand Down Expand Up @@ -274,7 +294,7 @@ def post_xml(url, xml, timeout=30):
from owslib.util import http_post
try:
with open(xml) as f:
return http_post(url=url, request=f.read(), timeout=timeout)
return http_post(url=url, request=f.read(), timeout=timeout).text
except Exception as err:
LOGGER.exception('HTTP XML POST error')
raise RuntimeError(err) from err
Expand Down Expand Up @@ -351,7 +371,12 @@ def delete_records(context, database, table):

LOGGER.info('Deleting all records')

repo = repository.Repository(database, context, table=table)
repo_config = {
'database', database,
'table', table
}

repo = repository.Repository(repo_config, context)
repo.delete(constraint={'where': '', 'values': []})


Expand Down Expand Up @@ -493,7 +518,7 @@ def cli_rebuild_db_indexes(ctx, config, verbosity):

context = pconfig.StaticContext()

repo = repository.Repository(cfg['repository']['database'], context, table=cfg['repository'].get('table'))
repo = repository.Repository(cfg['repository'], context)
repo.rebuild_db_indexes()


Expand All @@ -509,7 +534,7 @@ def cli_optimize_db(ctx, config, verbosity):

context = pconfig.StaticContext()

repo = repository.Repository(cfg['repository']['database'], context, table=cfg['repository'].get('table'))
repo = repository.Repository(cfg['repository'], context)
repo.optimize_db()


Expand Down
2 changes: 1 addition & 1 deletion pycsw/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
LOGGER = logging.getLogger(__name__)


class StaticContext(object):
class StaticContext:
"""core configuration"""
def __init__(self, prefix='csw30'):
"""initializer"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# Authors: Tom Kralidis <[email protected]>
#
# Copyright (c) 2021 Tom Kralidis
# Copyright (c) 2025 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
Expand Down Expand Up @@ -35,14 +35,18 @@
from pygeofilter.backends.evaluator import handle
from pygeofilter.backends.sqlalchemy import filters
from pygeofilter.backends.sqlalchemy.evaluate import SQLAlchemyFilterEvaluator
from pygeofilter.parsers.fes.util import Element
from pygeofilter.parsers.fes.util import handle as fhandle
from pygeofilter.parsers.fes.v11 import FES11Parser

from pycsw.core.util import bbox2wktpolygon

LOGGER = logging.getLogger(__name__)


class PycswFilterEvaluator(SQLAlchemyFilterEvaluator):
def __init__(self, field_mapping=None, dbtype='sqlite', undefined_as_null=None):
def __init__(self, field_mapping=None, dbtype='sqlite',
undefined_as_null=None):
super().__init__(field_mapping, undefined_as_null=undefined_as_null)
self._pycsw_dbtype = dbtype

Expand Down Expand Up @@ -81,3 +85,17 @@ def ilike(self, node, lhs):

def to_filter(ast, dbtype, field_mapping=None):
    """Evaluate a pygeofilter AST into an SQLAlchemy filter expression.

    :param ast: pygeofilter abstract syntax tree
    :param dbtype: database type identifier (e.g. ``sqlite``, ``postgresql``)
    :param field_mapping: optional mapping of queryable names to columns

    :returns: filter expression produced by ``PycswFilterEvaluator``
    """

    evaluator = PycswFilterEvaluator(field_mapping, dbtype)
    return evaluator.evaluate(ast)


class PycswCSWFES11Parser(FES11Parser):
    """FES 1.1 parser subclass carrying a pycsw-specific BBOX handler."""

    def parse(self, input_):
        # NOTE(review): this delegates to a *fresh base* FES11Parser instance,
        # not to self/super(); if pygeofilter dispatches @handle-decorated
        # handlers per class, the geometry_bbox override below may never be
        # reached through this path — confirm against pygeofilter's parser
        # dispatch before relying on it.
        return FES11Parser().parse(input_)

    @fhandle('BBOX')
    def geometry_bbox(self, node: Element, lhs, rhs, crs=None):
        # Reads the envelope corners with axis order swapped (index [1]
        # before [0]) — presumably converting the FES/CRS lat,lon axis
        # order of the parsed geometry into x,y for ast.BBox.
        # TODO(review): confirm against pygeofilter's native BBOX handling.
        minx = rhs.geometry['coordinates'][0][0][1]
        miny = rhs.geometry['coordinates'][0][0][0]
        maxx = rhs.geometry['coordinates'][0][2][1]
        maxy = rhs.geometry['coordinates'][0][2][0]

        return ast.BBox(lhs, minx, miny, maxx, maxy, crs)
104 changes: 86 additions & 18 deletions pycsw/core/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Angelos Tzotsos <[email protected]>
# Ricardo Garcia Silva <[email protected]>
#
# Copyright (c) 2024 Tom Kralidis
# Copyright (c) 2025 Tom Kralidis
# Copyright (c) 2015 Angelos Tzotsos
# Copyright (c) 2017 Ricardo Garcia Silva
#
Expand Down Expand Up @@ -34,6 +34,7 @@

import inspect
import logging
from operator import itemgetter
import os
from time import sleep

Expand All @@ -49,11 +50,12 @@
from pycsw.core import util
from pycsw.core.etree import etree
from pycsw.core.etree import PARSER
from pycsw.core.pygeofilter_ext import to_filter

LOGGER = logging.getLogger(__name__)


class Repository(object):
class Repository:
_engines = {}

@classmethod
Expand Down Expand Up @@ -87,14 +89,15 @@ def connect(dbapi_connection, connection_rec):
return clazz._engines[url]

''' Class to interact with underlying repository '''
def __init__(self, database, context, app_root=None, table='records', repo_filter=None):
def __init__(self, repo_object, context, app_root=None):
''' Initialize repository '''

self.context = context
self.filter = repo_filter
self.filter = repo_object.get('filter')
self.fts = False
self.database = database
self.table = table
self.database = repo_object.get('database')
self.table = repo_object.get('table')
self.facets = repo_object.get('facets', [])

# Don't use relative paths, this is hack to get around
# most wsgi restriction...
Expand All @@ -110,7 +113,7 @@ def __init__(self, database, context, app_root=None, table='records', repo_filte

self.postgis_geometry_column = None

schema_name, table_name = table.rpartition(".")[::2]
schema_name, table_name = self.table.rpartition(".")[::2]

default_table_args = {
"autoload": True,
Expand Down Expand Up @@ -145,6 +148,7 @@ def __init__(self, database, context, app_root=None, table='records', repo_filte
temp_dbtype = None

self.query_mappings = {
# OGC API - Records mappings
'identifier': self.dataset.identifier,
'type': self.dataset.type,
'typename': self.dataset.typename,
Expand All @@ -167,6 +171,10 @@ def __init__(self, database, context, app_root=None, table='records', repo_filte
'off_nadir': self.dataset.illuminationelevationangle
}

LOGGER.debug('adding OGC CSW mappings')
for key, value in self.context.models['csw']['typenames']['csw:Record']['queryables']['SupportedDublinCoreQueryables'].items():
self.query_mappings[key] = util.getqattr(self.dataset, value['dbcol'])

if self.dbtype == 'postgresql':
# check if PostgreSQL is enabled with PostGIS 1.x
try:
Expand Down Expand Up @@ -410,18 +418,34 @@ def query_source(self, source):
query = self.session.query(self.dataset).filter(column == source)
return self._get_repo_filter(query).all()

def query(self, constraint, sortby=None, typenames=None,
def query(self, constraint=None, sortby=None, typenames=None,
maxrecords=10, startposition=0):
''' Query records from underlying repository '''

# run the raw query and get total
if 'where' in constraint: # GetRecords with constraint
LOGGER.debug('constraint detected')
query = self.session.query(self.dataset).filter(
text(constraint['where'])).params(self._create_values(constraint['values']))
else: # GetRecords sans constraint
LOGGER.debug('No constraint detected')
query = self.session.query(self.dataset)
if constraint.get('ast') is not None: # GetRecords with pygeofilter AST
LOGGER.debug('pygeofilter AST detected')
LOGGER.debug('Transforming AST into filters')
try:
filters = to_filter(constraint['ast'], self.dbtype, self.query_mappings)
LOGGER.debug(f'Filter: {filters}')
except Exception as err:
msg = f'AST evaluator error: {str(err)}'
LOGGER.exception(msg)
raise RuntimeError(msg)

query = self.session.query(self.dataset).filter(filters)

else: # GetRecords sans pygeofilter AST
LOGGER.debug('No pygeofilter AST detected')

# run the raw query and get total
if 'where' in constraint: # GetRecords with constraint
LOGGER.debug('constraint detected')
query = self.session.query(self.dataset).filter(
text(constraint['where'])).params(self._create_values(constraint['values']))
else: # GetRecords sans constraint
LOGGER.debug('No constraint detected')
query = self.session.query(self.dataset)

total = self._get_repo_filter(query).count()

Expand All @@ -438,7 +462,10 @@ def query(self, constraint, sortby=None, typenames=None,
if sortby is not None: # apply sorting
LOGGER.debug('sorting detected')
# TODO: Check here for dbtype so to extract wkt from postgis native to wkt
sortby_column = getattr(self.dataset, sortby['propertyname'])
try:
sortby_column = getattr(self.dataset, sortby['propertyname'])
except:
sortby_column = self.query_mappings.get(sortby['propertyname'])

if sortby['order'] == 'DESC': # descending sort
if 'spatial' in sortby and sortby['spatial']: # spatial sort
Expand All @@ -452,9 +479,50 @@ def query(self, constraint, sortby=None, typenames=None,
query = query.order_by(sortby_column)

# always apply limit and offset
return [str(total), self._get_repo_filter(query).limit(
return [total, self._get_repo_filter(query).limit(
maxrecords).offset(startposition).all()]

    def get_facets(self, ast=None) -> dict:
        """
        Gets all facets for a given query

        :param ast: optional pygeofilter AST used to constrain the facet
                    counts to the matching records

        :returns: `dict` of facets, keyed by facet name; each value holds
                  ``type``/``property`` metadata and count-sorted ``buckets``
        """

        facets_results = {}

        # self.facets comes from the repository configuration
        # (repo_object.get('facets', []) in __init__)
        for facet in self.facets:
            LOGGER.debug(f'Running facet for {facet}')
            # NOTE(review): `self.func` is not defined anywhere in the visible
            # Repository code — presumably this should be sqlalchemy's `func`
            # (i.e. func.count); confirm it resolves at runtime.
            # NOTE(review): both count(facet) and group_by(facet) receive the
            # facet *name* (a string) while the selected column comes from
            # self.query_mappings[facet] — confirm SQLAlchemy resolves the
            # string to the same column rather than a bound literal.
            facetq = self.session.query(self.query_mappings[facet], self.func.count(facet)).group_by(facet)

            if ast is not None:
                try:
                    filters = to_filter(ast, self.dbtype, self.query_mappings)
                    LOGGER.debug(f'Filter: {filters}')
                except Exception as err:
                    msg = f'AST evaluator error: {str(err)}'
                    LOGGER.exception(msg)
                    raise RuntimeError(msg)

                facetq = facetq.filter(filters)

            LOGGER.debug('Writing facet query results')
            facets_results[facet] = {
                'type': 'terms',
                'property': facet,
                'buckets': []
            }

            # each row is (facet value, count) per the two-column query above
            for fq in facetq.all():
                facets_results[facet]['buckets'].append({
                    'value': fq[0],
                    'count': fq[1]
                })

            # most frequent values first
            facets_results[facet]['buckets'].sort(key=itemgetter('count'), reverse=True)

        return facets_results

def insert(self, record, source, insert_date):
''' Insert a record into the repository '''

Expand Down
Loading