diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index aba454dd..d21b3ff3 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -10,7 +10,7 @@ from ckan.lib.helpers import json from ckan.plugins import toolkit -from ckanext.harvest.model import HarvestObject +from ckanext.harvest.model import HarvestObject, HarvestObjectExtra from .base import HarvesterBase import logging @@ -159,7 +159,7 @@ def validate_config(self, config): except NotFound: raise ValueError('User not found') - for key in ('read_only', 'force_all'): + for key in ('read_only', 'force_all', 'delete_missing'): if key in config_obj: if not isinstance(config_obj[key], bool): raise ValueError('%s must be boolean' % key) @@ -184,6 +184,18 @@ def gather_stage(self, harvest_job): self._set_config(harvest_job.source.config) + # If using delete_missing, get the previous package ids for this source + delete_missing = self.config.get('delete_missing', False) + if delete_missing: + query = model.Session.query(HarvestObject.package_id) \ + .filter(HarvestObject.package_id != None) \ + .filter(HarvestObject.current == True) \ + .filter(HarvestObject.harvest_source_id == harvest_job.source.id) + + package_ids_in_db = set() + for package_id, in query: + package_ids_in_db.add(package_id) + # Get source URL remote_ckan_base_url = harvest_job.source.url.rstrip('/') @@ -209,10 +221,12 @@ def gather_stage(self, harvest_job): # Ideally we can request from the remote CKAN only those datasets # modified since the last completely successful harvest. + # If using delete_missing option we have to get all datasets last_error_free_job = self.last_error_free_job(harvest_job) log.debug('Last error-free job: %r', last_error_free_job) if (last_error_free_job and - not self.config.get('force_all', False)): + not self.config.get('force_all', False) and + not delete_missing): get_all_packages = False # Request only the datasets modified since @@ -283,6 +297,25 @@ def gather_stage(self, harvest_job): obj.save() object_ids.append(obj.id) + # If using delete_missing option, check for datasets that no longer + # exist + if delete_missing: + ids_to_delete = package_ids_in_db - package_ids + for pkg_id in ids_to_delete: + log.debug('Creating HarvestObject to delete dataset %s', + pkg_id) + obj = HarvestObject( + guid=pkg_id, + package_id=pkg_id, + job=harvest_job, + content='', + extras=[ + HarvestObjectExtra(key='missing', value='true') + ] + ) + obj.save() + object_ids.append(obj.id) + return object_ids except Exception as e: self._save_gather_error('%r' % e.message, harvest_job) @@ -385,6 +418,26 @@ def import_stage(self, harvest_object): self._set_config(harvest_object.job.source.config) try: + # If using delete_missing option, check if this object tells to + # delete the dataset + delete_missing = self.config.get('delete_missing', False) + if (delete_missing and + 'missing' in [item.key for item in harvest_object.extras]): + # Delete package + toolkit.get_action('package_delete')( + base_context.copy(), {'id': harvest_object.package_id}) + log.info('Deleted package {0}'.format(harvest_object.package_id)) + + # Update previous harvest object + model.Session.query(HarvestObject).\ + filter(HarvestObject.guid == harvest_object.guid).\ + filter(HarvestObject.harvest_source_id == + harvest_object.source.id).\ + filter(HarvestObject.current == True).\ + update({'current': False}) + + return True + package_dict = json.loads(harvest_object.content) if package_dict.get('type') == 'harvest':