From e18882a8c107a67a498fa86c5aa5e71f663a273a Mon Sep 17 00:00:00 2001
From: Jude188
Date: Fri, 17 Jan 2020 18:11:27 +0000
Subject: [PATCH 1/2] Modernise discovery to use helpers

---
 tap_bing_ads/__init__.py | 113 ++++++++++++++++++---------------------
 1 file changed, 53 insertions(+), 60 deletions(-)

diff --git a/tap_bing_ads/__init__.py b/tap_bing_ads/__init__.py
index 39b522d..5c4afc2 100644
--- a/tap_bing_ads/__init__.py
+++ b/tap_bing_ads/__init__.py
@@ -166,10 +166,10 @@ def get_json_schema(element):
     if xml_type in ['dateTime', 'date']:
         _format = 'date-time'

-    schema = {'type': types}
+    schema = singer.Schema(type=types)

     if _format:
-        schema['format'] = _format
+        schema.format = _format

     return schema

@@ -180,19 +180,12 @@ def get_array_type(array_type):
         # complex type
         items = xml_type # will be filled in fill_in_nested_types
     else:
-        items = {
-            'type': json_type
-        }
+        items = json_type
+        items = singer.Schema(type=items)

-    array_obj = {
-        'type': ['null', 'object'],
-        'properties': {}
-    }
+    array_obj = singer.Schema(type=['null', 'object'], properties={})

-    array_obj['properties'][xml_type] = {
-        'type': ['null', 'array'],
-        'items': items
-    }
+    array_obj.properties[xml_type] = singer.Schema(type=['null', 'array'], items=items)

     return array_obj

@@ -233,21 +226,14 @@ def wsdl_type_to_schema(inherited_types, wsdl_type):
         else:
             properties[element.name] = get_json_schema(element)

-    return {
-        'type': ['null', 'object'],
-        'additionalProperties': False,
-        'properties': properties
-    }
+    return singer.Schema(type=['null', 'object'], additionalProperties=False, properties=properties)

 def combine_object_schemas(schemas):
     properties = {}
     for schema in schemas:
-        for prop, prop_schema in schema['properties'].items():
+        for prop, prop_schema in schema.properties.items():
             properties[prop] = prop_schema
-    return {
-        'type': ['object'],
-        'properties': properties
-    }
+    return singer.Schema(type=['object'], properties=properties)

 def normalize_abstract_types(inherited_types, type_map):
     for base_type, types in inherited_types.items():
@@ -261,14 +247,15 @@ def normalize_abstract_types(inherited_types, type_map):
             if base_type in TOP_LEVEL_CORE_OBJECTS:
                 type_map[base_type] = combine_object_schemas(schemas)
             else:
-                type_map[base_type] = {'anyOf': schemas}
+                type_map[base_type] = singer.Schema(anyOf=schemas)

 def fill_in_nested_types(type_map, schema):
-    if 'properties' in schema:
-        for prop, descriptor in schema['properties'].items():
-            schema['properties'][prop] = fill_in_nested_types(type_map, descriptor)
-    elif 'items' in schema:
-        schema['items'] = fill_in_nested_types(type_map, schema['items'])
+    if hasattr(schema, 'properties') and schema.properties:
+        for prop, descriptor in schema.properties.items():
+            schema.properties[prop] = fill_in_nested_types(type_map, descriptor)
+    elif hasattr(schema, 'items') and schema.items:
+        items = fill_in_nested_types(type_map, schema.items)
+        schema = singer.Schema(type=schema.type, items=items)
     else:
         if isinstance(schema, str) and schema in type_map:
             return type_map[schema]
@@ -292,33 +279,44 @@ def get_type_map(client):
     return type_map

-def get_stream_def(stream_name, schema, stream_metadata=None, pks=None, replication_key=None):
+
+def get_stream_def(stream_name, schema, stream_metadata=None, key_properties=None, replication_key=None):
+
     stream_def = {
         'tap_stream_id': stream_name,
         'stream': stream_name,
-        'schema': schema
+        'schema': schema,
+        'key_properties': key_properties
     }

     excluded_inclusion_fields = []
-    if pks:
-        stream_def['key_properties'] = pks
-        excluded_inclusion_fields = pks
-
+    if key_properties:
+        excluded_inclusion_fields = [("properties", field) for field in key_properties]
     if replication_key:
-        stream_def['replication_key'] = replication_key
-        stream_def['replication_method'] = 'INCREMENTAL'
-        excluded_inclusion_fields += [replication_key]
+        # replication_method = 'INCREMENTAL'
+        excluded_inclusion_fields += [("properties", replication_key)]
+        valid_replication_keys = [replication_key]
     else:
-        stream_def['replication_method'] = 'FULL_TABLE'
+        valid_replication_keys = None
+        # replication_method = 'FULL_TABLE'

     if stream_metadata:
-        stream_def['metadata'] = stream_metadata
+        mdata = stream_metadata
     else:
-        stream_def['metadata'] = list(map(
-            lambda field: {"metadata": {"inclusion": "available"}, "breadcrumb": ["properties", field]},
-            (schema['properties'].keys() - excluded_inclusion_fields)))
-
-    return stream_def
+        mdata = metadata.get_standard_metadata(schema=schema.to_dict(),
+                                               key_properties=key_properties,
+                                               valid_replication_keys=valid_replication_keys)
+                                               # Forcing a replication key does not adhere
+                                               # to Singer Best Practices
+                                               # replication_method=replication_method)
+        mdata = metadata.to_map(mdata)
+        for breadcrumb in list(mdata.keys()):
+            if breadcrumb in excluded_inclusion_fields:
+                del mdata[breadcrumb]
+        mdata = metadata.to_list(mdata)
+    stream_def['metadata'] = mdata
+
+    return singer.catalog.CatalogEntry(**stream_def)

 def get_core_schema(client, obj):
     type_map = get_type_map(client)
@@ -332,19 +330,19 @@ def discover_core_objects():
     account_schema = get_core_schema(client, 'AdvertiserAccount')

     core_object_streams.append(
-        get_stream_def('accounts', account_schema, pks=['Id'], replication_key='LastModifiedTime'))
+        get_stream_def('accounts', account_schema, key_properties=['Id'], replication_key='LastModifiedTime'))

     LOGGER.info('Initializing CampaignManagementService client - Loading WSDL')
     client = CustomServiceClient('CampaignManagementService')

     campaign_schema = get_core_schema(client, 'Campaign')
-    core_object_streams.append(get_stream_def('campaigns', campaign_schema, pks=['Id']))
+    core_object_streams.append(get_stream_def('campaigns', campaign_schema, key_properties=['Id']))

     ad_group_schema = get_core_schema(client, 'AdGroup')
-    core_object_streams.append(get_stream_def('ad_groups', ad_group_schema, pks=['Id']))
+    core_object_streams.append(get_stream_def('ad_groups', ad_group_schema, key_properties=['Id']))

     ad_schema = get_core_schema(client, 'Ad')
-    core_object_streams.append(get_stream_def('ads', ad_schema, pks=['Id']))
+    core_object_streams.append(get_stream_def('ads', ad_schema, key_properties=['Id']))

     return core_object_streams

@@ -371,18 +369,14 @@ def get_report_schema(client, report_name):
         else:
             col_schema = {'type': ['null', _type]}

-        properties[column] = col_schema
+        properties[column] = singer.Schema.from_dict(col_schema)

-    properties['_sdc_report_datetime'] = {
+    properties['_sdc_report_datetime'] = singer.Schema.from_dict({
         'type': 'string',
         'format': 'date-time'
-    }
+    })

-    return {
-        'properties': properties,
-        'additionalProperties': False,
-        'type': 'object'
-    }
+    return singer.schema.Schema(type='object', properties=properties, additionalProperties=False)

 def metadata_fn(report_name, field, required_fields):
     if field in required_fields:
@@ -408,7 +402,7 @@ def get_report_metadata(report_name, report_schema):

     return list(map(
         lambda field: metadata_fn(report_name, field, required_fields),
-        report_schema['properties']))
+        report_schema.properties))

 def discover_reports():
     report_streams = []
@@ -448,8 +442,7 @@ def do_discover(account_ids):
     LOGGER.info('Discovering reports')
     report_streams = discover_reports()

-    json.dump({'streams': core_object_streams + report_streams}, sys.stdout, indent=2)
-
+    singer.catalog.write_catalog(singer.Catalog(core_object_streams + report_streams))

 def check_for_invalid_selections(prop, mdata, invalid_selections):
     field_exclusions = metadata.get(mdata, ('properties', prop), 'fieldExclusions')
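For readers unfamiliar with the singer-python helpers adopted above: after patch 1, discovery builds each catalog entry from a singer.Schema plus metadata.get_standard_metadata, and get_standard_metadata also records table-key-properties at the stream level, which is the metadata patch 2 below reads back to allow custom primary keys. A rough, self-contained sketch of that pattern follows; the stream name and field names (example_stream, Id, Name) are invented for illustration and are not taken from the tap.

    import singer
    from singer import metadata
    from singer.catalog import Catalog, CatalogEntry

    # Hypothetical stream schema, for illustration only.
    schema = singer.Schema(
        type='object',
        properties={
            'Id': singer.Schema(type=['null', 'integer']),
            'Name': singer.Schema(type=['null', 'string']),
        },
    )

    # get_standard_metadata writes 'table-key-properties' at the stream level,
    # marks key properties as inclusion=automatic, and marks the rest available.
    mdata = metadata.get_standard_metadata(
        schema=schema.to_dict(),
        key_properties=['Id'],
        valid_replication_keys=None,
    )

    entry = CatalogEntry(
        tap_stream_id='example_stream',
        stream='example_stream',
        schema=schema,
        key_properties=['Id'],
        metadata=mdata,
    )

    catalog = Catalog([entry])
    # do_discover() in patch 1 then emits the assembled entries via
    # singer.catalog.write_catalog(...); during sync, patch 2's
    # get_table_key_properties() reads 'table-key-properties' back out of
    # metadata.to_map(entry.metadata) to decide which primary keys to write.
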
From bf336bf60612c6f15843a9c6c43414a335818347 Mon Sep 17 00:00:00 2001
From: Jude188
Date: Fri, 17 Jan 2020 18:16:31 +0000
Subject: [PATCH 2/2] Allow for setting custom primary keys

---
 tap_bing_ads/__init__.py | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/tap_bing_ads/__init__.py b/tap_bing_ads/__init__.py
index 5c4afc2..7492e77 100644
--- a/tap_bing_ads/__init__.py
+++ b/tap_bing_ads/__init__.py
@@ -57,6 +57,10 @@ def get_user_agent():
     return CONFIG.get('user_agent', DEFAULT_USER_AGENT)

+def get_table_key_properties(catalog_item):
+    mdata = metadata.to_map(catalog_item.metadata)
+    return metadata.get(mdata, (), 'table-key-properties')
+
 class InvalidDateRangeEnd(Exception):
     pass

@@ -497,7 +501,8 @@ def sync_accounts_stream(account_ids, catalog_item):
     LOGGER.info('Initializing CustomerManagementService client - Loading WSDL')
     client = CustomServiceClient('CustomerManagementService')
     account_schema = get_core_schema(client, 'AdvertiserAccount')
-    singer.write_schema('accounts', account_schema, ['Id'])
+    pks = get_table_key_properties(catalog_item) or catalog_item.key_properties
+    singer.write_schema('accounts', account_schema, pks)

     for account_id in account_ids:
         client = create_sdk_client('CustomerManagementService', account_id)
@@ -527,7 +532,9 @@ def sync_campaigns(client, account_id, selected_streams):

         if 'campaigns' in selected_streams:
             selected_fields = get_selected_fields(selected_streams['campaigns'])
-            singer.write_schema('campaigns', get_core_schema(client, 'Campaign'), ['Id'])
+            schema = get_core_schema(client, 'Campaign')
+            pks = get_table_key_properties(selected_streams['campaigns']) or selected_streams['campaigns'].key_properties
+            singer.write_schema('campaigns', schema, pks)
             with metrics.record_counter('campaigns') as counter:
                 singer.write_records('campaigns', filter_selected_fields_many(selected_fields, campaigns))
@@ -548,7 +555,9 @@ def sync_ad_groups(client, account_id, campaign_ids, selected_streams):
                 LOGGER.info('Syncing AdGroups for Account: {}, Campaign: {}'.format(
                     account_id, campaign_id))
                 selected_fields = get_selected_fields(selected_streams['ad_groups'])
-                singer.write_schema('ad_groups', get_core_schema(client, 'AdGroup'), ['Id'])
+                schema = get_core_schema(client, 'AdGroup')
+                pks = get_table_key_properties(selected_streams['ad_groups']) or selected_streams['ad_groups'].key_properties
+                singer.write_schema('ad_groups', schema, pks)
                 with metrics.record_counter('ad_groups') as counter:
                     singer.write_records('ad_groups', filter_selected_fields_many(selected_fields, ad_groups))
@@ -575,7 +584,9 @@ def sync_ads(client, selected_streams, ad_group_ids):

         if 'Ad' in response_dict:
             selected_fields = get_selected_fields(selected_streams['ads'])
-            singer.write_schema('ads', get_core_schema(client, 'Ad'), ['Id'])
+            schema = get_core_schema(client, 'Ad')
+            pks = get_table_key_properties(selected_streams['ads']) or selected_streams['ads'].key_properties
+            singer.write_schema('ads', schema, pks)
             with metrics.record_counter('ads') as counter:
                 ads = response_dict['Ad']
                 singer.write_records('ads', filter_selected_fields_many(selected_fields, ads))
@@ -742,7 +753,8 @@ async def sync_report_interval(client, account_id, report_stream,
     report_name = stringcase.pascalcase(report_stream.stream)

     report_schema = get_report_schema(client, report_name)
-    singer.write_schema(report_stream.stream, report_schema, [])
+    pks = get_table_key_properties(report_stream) or report_stream.key_properties
+    singer.write_schema(report_stream.stream, report_schema, pks)

     report_time = arrow.get().isoformat()

@@ -756,7 +768,6 @@ async def sync_report_interval(client, account_id, report_stream,
     try:
         success, download_url = await poll_report(client, account_id, report_name,
                                                   start_date, end_date, request_id)
-
     except Exception as some_error:
         LOGGER.info('The request_id %s for %s is invalid, generating a new one',
                     request_id,