diff --git a/README.md b/README.md index ce55127..841b51b 100644 --- a/README.md +++ b/README.md @@ -215,84 +215,166 @@ possible to create the container for the output table using the `CommonInterface ![TableDefinition dependencies](docs/imgs/TableDefinition_class.png) -**Table schema example:** +**Table schema examples:** + +**Example 1: Creating table with predefined schema** ```python from keboola.component import CommonInterface +from collections import OrderedDict from keboola.component.dao import ColumnDefinition, DataType, SupportedDataTypes, BaseType # init the interface -ci = CommonInterface(data_folder_path='data') - -# create container for the result -out = ci.create_out_table_definition("testDef", - schema=['foo', 'bar'], - destination='some-destination', - primary_key=['foo'], - incremental=True, - delete_where={'column': 'lilly', - 'values': ['a', 'b'], - 'operator': 'eq'}) - -# update column -out.update_column('foo', - ColumnDefinition(data_types=BaseType(dtype=SupportedDataTypes.INTEGER, length='20'))) - -# add new columns -out.add_column('note', ColumnDefinition(nullable=False)) -out.add_column('test1') -out.add_columns(['test2', 'test3', 'test4']) - -# add new typed column -out.add_column('id', ColumnDefinition(primary_key=True, - data_types={'snowflake': DataType(dtype="INTEGER", length='200')}) - ) - -out.add_columns({ - 'phone': ColumnDefinition(primary_key=True, - data_types={'snowflake': DataType(dtype="INTEGER", length='200'), - 'bigquery': DataType(dtype="BIGINT")}), - 'new2': ColumnDefinition(data_types={'snowflake': DataType(dtype="INTEGER", length='200')}), - }) - -# delete columns -out.delete_column('bar') -out.delete_columns(['test2', 'test3']) - - -# write some content -with open(out.full_path, 'w') as result: - result.write('line') +ci = CommonInterface() + +# Define complete schema upfront +schema = OrderedDict({ + "id": ColumnDefinition( + data_types=BaseType.integer(), + primary_key=True + ), + "created_at": ColumnDefinition( + data_types=BaseType(dtype=SupportedDataTypes.TIMESTAMP) + ), + "status": ColumnDefinition(), + "value": ColumnDefinition( + data_types=BaseType.numeric(length="38,2") + ) +}) + +# Create table definition with predefined schema +out_table = ci.create_out_table_definition( + name="results.csv", # File name for the output + destination="out.c-data.results", # Destination table in Storage + schema=schema, # Predefined schema + incremental=True # Enable incremental loading +) + +# Write some data to the output file +import csv +with open(out_table.full_path, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=out_table.column_names) + writer.writeheader() + writer.writerow({ + "id": "1", + "created_at": "2023-01-15T14:30:00Z", + "status": "completed", + "value": "123.45" + }) -# write manifest -ci.write_manifest(out) +# Write manifest +ci.write_manifest(out_table) ``` -**Example:** +**Example 2: Creating table with empty schema and adding columns dynamically** ```python from keboola.component import CommonInterface -from keboola.component import dao +from keboola.component.dao import ColumnDefinition, DataType, SupportedDataTypes, BaseType +import csv # init the interface ci = CommonInterface() -# create container for the result -result_table = ci.create_out_table_definition('my_new_result_table', primary_key=['id'], incremental=True) +# Create table definition with empty schema +out_table = ci.create_out_table_definition( + name="dynamic_results.csv", + destination="out.c-data.dynamic_results", + incremental=True +) + +# Add columns using different data type methods +# Method 1: Using BaseType helper +out_table.add_column("id", + ColumnDefinition( + primary_key=True, + data_types=BaseType.integer() + ) +) + +# Method 2: Using SupportedDataTypes enum +out_table.add_column("created_at", + ColumnDefinition( + data_types=BaseType(dtype=SupportedDataTypes.TIMESTAMP) + ) +) + +# Method 3: Simple column without specific data type +out_table.add_column("status", ColumnDefinition()) + +# Method 4: Using BaseType with parameters +out_table.add_column("price", + ColumnDefinition( + data_types=BaseType.numeric(length="10,2"), + description="Product price with 2 decimal places" + ) +) + +# Method 5: Backend-specific data types +out_table.add_column("metadata", + ColumnDefinition( + data_types={ + "snowflake": DataType(dtype="VARIANT"), + "bigquery": DataType(dtype="JSON"), + "base": DataType(dtype=SupportedDataTypes.STRING, length="65535") + }, + description="JSON metadata column" + ) +) + +# Update existing column (example of column modification) +out_table.update_column("price", + ColumnDefinition( + data_types={ + "snowflake": DataType(dtype="NUMBER", length="15,4"), + "bigquery": DataType(dtype="NUMERIC", length="15,4"), + "base": DataType(dtype=SupportedDataTypes.NUMERIC, length="15,4") + }, + description="Updated price with 4 decimal places for higher precision" + ) +) + +# Write some data to the output file +with open(out_table.full_path, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=out_table.column_names) + writer.writeheader() + writer.writerow({ + "id": "1", + "created_at": "2023-01-15T14:30:00Z", + "status": "active", + "price": "99.9999", + "metadata": '{"category": "electronics", "brand": "TechCorp"}' + }) + +# Write manifest +ci.write_manifest(out_table) +``` + +**Simple Example for Basic Use Cases:** -# write some content -with open(result_table.full_path, 'w') as result: - result.write('line') +```python +from keboola.component import CommonInterface +import csv -# add some metadata -result_table.table_metadata.add_table_description('My new table description') -# add column datatype -result_table.table_metadata.add_column_data_type('id', dao.SupportedDataTypes.STRING, - source_data_type='VARCHAR(100)', - nullable=True, - length=100) +# Initialize the component +ci = CommonInterface() -# write manifest +# Create output table +result_table = ci.create_out_table_definition( + 'output.csv', + primary_key=['id'], + incremental=True, + description='Data processed by my component' +) + +# Write data to CSV +with open(result_table.full_path, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=['id', 'name', 'value']) + writer.writeheader() + writer.writerow({"id": "1", "name": "Test", "value": "100"}) + writer.writerow({"id": "2", "name": "Example", "value": "200"}) + +# Write manifest file ci.write_manifest(result_table) ``` @@ -307,35 +389,133 @@ table_def = ci.get_input_table_definition_by_name('input.csv') ``` -### Initializing TableDefinition object from the manifest file +## Working with Input/Output Mapping + +Keboola Connection provides input/output mappings that define which tables are loaded into your component and which tables should be stored back. These mappings are defined in the configuration file and can be accessed programmatically. + +### Accessing Input Tables from Mapping ```python -from keboola.component import dao +from keboola.component import CommonInterface +import csv + +# Initialize the component +ci = CommonInterface() + +# Access input mapping configuration +input_tables = ci.configuration.tables_input_mapping + +# Process each input table +for table in input_tables: + # Get the destination (filename in the /data/in/tables directory) + table_name = table.destination + + # Load table definition from manifest + table_def = ci.get_input_table_definition_by_name(table_name) + + # Print information about the table + print(f"Processing table: {table_name}") + print(f" - Source: {table.source}") + print(f" - Full path: {table_def.full_path}") + print(f" - Columns: {table_def.column_names}") + + # Read data from the CSV file + with open(table_def.full_path, 'r') as input_file: + csv_reader = csv.DictReader(input_file) + for row in csv_reader: + # Process each row + print(f" - Row: {row}") +``` -table_def = dao.TableDefinition.build_from_manifest('data/in/tables/table.csv.manifest') +### Creating Output Tables based on Output Mapping -# print table.csv full-path if present: +```python +from keboola.component import CommonInterface +import csv -print(table_def.full_path) +# Initialize the component +ci = CommonInterface() -# rows count +# Access output mapping configuration +output_tables = ci.configuration.tables_output_mapping -print(table_def.rows_count) +# Process each output table mapping +for i, table_mapping in enumerate(output_tables): + # Get source (filename that should be created) and destination (where it will be stored in KBC) + source = table_mapping.source + destination = table_mapping.destination + + # Create output table definition + out_table = ci.create_out_table_definition( + name=source, + destination=destination, + incremental=table_mapping.incremental + ) + + # Add some sample data (in a real component, this would be your processed data) + with open(out_table.full_path, 'w', newline='') as out_file: + writer = csv.DictWriter(out_file, fieldnames=['id', 'data']) + writer.writeheader() + writer.writerow({'id': f'{i+1}', 'data': f'Data for {destination}'}) + + # Write manifest file + ci.write_manifest(out_table) ``` -### Retrieve raw manifest file definition (CommonInterface compatible) +### Combining Input and Output Mapping -To retrieve the manifest file representation that is compliant with Keboola Connection Common Interface use -the `table_def.get_manifest_dictionary()` method. +Here's a complete example that reads data from input tables and creates output tables: ```python -from keboola.component import dao +from keboola.component import CommonInterface +import csv -table_def = dao.TableDefinition.build_from_manifest('data/in/tables/table.csv.manifest') +# Initialize the component +ci = CommonInterface() -# get the manifest file representation -manifest_dict = table_def.get_manifest_dictionary() +# Get input tables +input_tables = ci.configuration.tables_input_mapping +output_tables = ci.configuration.tables_output_mapping +# Process each output table based on input +for i, out_mapping in enumerate(output_tables): + # Find corresponding input table if possible (matching by index for simplicity) + in_mapping = input_tables[i] if i < len(input_tables) else None + + # Create output table + out_table = ci.create_out_table_definition( + name=out_mapping.source, + destination=out_mapping.destination, + incremental=out_mapping.incremental + ) + + # If we have an input table, transform its data + if in_mapping: + in_table = ci.get_input_table_definition_by_name(in_mapping.destination) + + # Read input and write to output with transformation + with open(in_table.full_path, 'r') as in_file, open(out_table.full_path, 'w', newline='') as out_file: + reader = csv.DictReader(in_file) + + # Create writer with same field names + fieldnames = reader.fieldnames + writer = csv.DictWriter(out_file, fieldnames=fieldnames) + writer.writeheader() + + # Transform each row and write to output + for row in reader: + # Simple transformation example - uppercase all values + transformed_row = {k: v.upper() if isinstance(v, str) else v for k, v in row.items()} + writer.writerow(transformed_row) + else: + # No input table, create sample output + with open(out_table.full_path, 'w', newline='') as out_file: + writer = csv.DictWriter(out_file, fieldnames=['id', 'data']) + writer.writeheader() + writer.writerow({'id': f'{i+1}', 'data': f'Sample data for {out_mapping.destination}'}) + + # Write manifest + ci.write_manifest(out_table) ``` ## Processing input files @@ -352,63 +532,143 @@ files matching specific tag. By default, the method returns only the latest file from keboola.component import CommonInterface import logging -# init the interface +# Initialize the interface ci = CommonInterface() -input_files = ci.get_input_files_definitions(tags=['my_tag'], only_latest_files=True) - -# print path of the first file (random order) matching the criteria -first_file = input_files[0] -logging.info(f'The first file named: "{first_file.name}" is at path: {first_file.full_path}') - +# Get input files with specific tags (only latest versions) +input_files = ci.get_input_files_definitions(tags=['images', 'documents'], only_latest_files=True) +# Process each file +for file in input_files: + print(f"Processing file: {file.name}") + print(f" - Full path: {file.full_path}") + print(f" - Tags: {file.tags}") + + # Example: Process image files + if 'images' in file.tags: + # Process image using appropriate library + print(f" - Processing image: {file.name}") + # image = Image.open(file.full_path) + # ... process image ... + + # Example: Process document files + if 'documents' in file.tags: + print(f" - Processing document: {file.name}") + # ... process document ... ``` -When working with files it may be useful to retrieve them in a dictionary structure grouped either by name or a tag -group. For this there are convenience methods `get_input_file_definitions_grouped_by_tag_group()` -and `get_input_file_definitions_grouped_by_name()` +### Grouping Files by Tags + +When working with files it may be useful to retrieve them in a dictionary structure grouped by tag: ```python from keboola.component import CommonInterface -import logging -# init the interface +# Initialize the interface ci = CommonInterface() -# group by tag -input_files_by_tag = ci.get_input_file_definitions_grouped_by_tag_group(only_latest_files=True) +# Group files by tag +files_by_tag = ci.get_input_file_definitions_grouped_by_tag_group(only_latest_files=True) -# print list of files matching specific tag -logging.info(input_files_by_tag['my_tag']) +# Process files for each tag +for tag, files in files_by_tag.items(): + print(f"Processing tag group: {tag}") + for file in files: + print(f" - File: {file.name}") + # Process file based on its tag +``` + +### Creating Output Files + +Similar to tables, you can create output files with appropriate manifests: + +```python +from keboola.component import CommonInterface -# group by name -input_files_by_name = ci.get_input_file_definitions_grouped_by_name(only_latest_files=True) +# Initialize the interface +ci = CommonInterface() + +# Create output file definition +output_file = ci.create_out_file_definition( + name="results.json", + tags=["processed", "results"], + is_public=False, + is_permanent=True +) -# print list of files matching specific name -logging.info(input_files_by_name['image.jpg']) +# Write content to the file +with open(output_file.full_path, 'w') as f: + f.write('{"status": "success", "processed_records": 42}') +# Write manifest file +ci.write_manifest(output_file) ``` ## Processing state files -[State files](https://developers.keboola.com/extend/common-interface/config-file/#state-file) can be easily written and -loaded using the `get_state_file()` and `write_state_file()` methods: +[State files](https://developers.keboola.com/extend/common-interface/config-file/#state-file) allow your component to store and retrieve information between runs. This is especially useful for incremental processing or tracking the last processed data. ```python from keboola.component import CommonInterface from datetime import datetime -import logging +import json -# init the interface +# Initialize the interface ci = CommonInterface() -last_state = ci.get_state_file() +# Load state from previous run +state = ci.get_state_file() + +# Get the last processed timestamp (or use default if this is the first run) +last_updated = state.get("last_updated", "1970-01-01T00:00:00Z") +print(f"Last processed data up to: {last_updated}") + +# Process data (only data newer than last_updated) +# In a real component, this would involve your business logic +processed_items = [ + {"id": 1, "timestamp": "2023-05-15T10:30:00Z"}, + {"id": 2, "timestamp": "2023-05-16T14:45:00Z"} +] + +# Get the latest timestamp for the next run +if processed_items: + # Sort items by timestamp to find the latest one + processed_items.sort(key=lambda x: x["timestamp"]) + new_last_updated = processed_items[-1]["timestamp"] +else: + # No new items, keep the previous timestamp + new_last_updated = last_updated + +# Store the new state for the next run +ci.write_state_file({ + "last_updated": new_last_updated, + "processed_count": len(processed_items), + "last_run": datetime.now().isoformat() +}) + +print(f"State updated, next run will process data from: {new_last_updated}") +``` + +State files can contain any serializable JSON structure, so you can store complex information: -# print last_updated if exists -logging.info(f'Previous job stored following last_updated value: {last_state.get("last_updated", "")})') +```python +# More complex state example +state = { + "last_run": datetime.now().isoformat(), + "api_pagination": { + "next_page_token": "abc123xyz", + "page_size": 100, + "total_pages_retrieved": 5 + }, + "processed_ids": [1001, 1002, 1003, 1004], + "statistics": { + "success_count": 1000, + "error_count": 5, + "skipped_count": 10 + } +} -# store new state file -ci.write_state_file({"last_updated": datetime.now().isoformat()}) +ci.write_state_file(state) ``` ## Logging @@ -632,89 +892,105 @@ class Component(ComponentBase): # Sync Actions -From the documentation [Sync actions](https://developers.keboola.com/extend/common-interface/actions/): - -Action provide a way to execute very quick tasks in a single Component, using a single code base. The default -component’s action (`run`) executes as a background, asynchronous job. It is queued, has plenty of execution time, and -there are cases when you might not want to wait for it. Apart from the default `run`, there can be synchronous actions -with limited execution time and you must wait for them. When we refer to **actions**, we mean synchronous actions. Using -actions is fully optional. - -## Use Case - -For example, in our database extractor, the main task (`run` action) is the data extraction itself. But we also want to -be able to test the database credentials and list tables available in the database. These tasks would be very helpful in -the UI. It is not possible to do these things directly in the browser. Setting up a separate component would bring an -overhead of maintaining both the extractor’s Docker image and the new component. - -## Sync Action limitations - -Data is exchanged via `stdout` or `stderr`. - -- Sync actions need to be registered in the Developer Portal first. - -**Following are handled by the decorator automatically** - -- All success responses have to output valid JSON string. Meaning nothing can log into the stdout during the action - execution -- For success action the output needs to be always `{"status":"success"}` in stdout. +[Sync actions](https://developers.keboola.com/extend/common-interface/actions/) provide a way to execute quick, synchronous tasks within a component. Unlike the default `run` action (which executes asynchronously as a background job), sync actions execute immediately and return results directly to the UI. -## Framework Support +Common use cases for sync actions: +- Testing connections to external services +- Fetching dynamic dropdown options for UI configuration +- Validating user input +- Listing available resources (tables, schemas, etc.) -Decorator `sync_action` was added. It takes one parameter `action_name` that will create mapping between the actual -method and the sync action name registered in the Developer Portal. +## Creating Sync Actions -- Decorated methods can also be called from within the program and return values. -- They can log normally -> when run as sync action all logging within the method is muted. -- When a return value is produced, it is expected to be `dict` or `list` object. These will be printed to stdout at the - end. -- Exceptions can be thrown normally and the message will be propagated to the platform. - -### Action output & examples - -Each action has to have specific output based on type of the UI element that the action is triggered with. It can either -have no return value (success / fail type of actions) or UI element specific output. - -For convenience each output variant is represented by classes specified in `keboola.component.sync_actions` module. - -#### ValidationResult - -Result expected by validation button element. +To create a sync action, add a method to your component class and decorate it with `@sync_action('action_name')`. The framework handles all the details of proper response formatting and error handling. ```python from keboola.component.base import ComponentBase, sync_action -from keboola.component.sync_actions import ValidationResult, MessageType - +from keboola.component import UserException class Component(ComponentBase): - def run(self): + # Main component logic pass - - @sync_action('validate_example') - def validate_message(self) -> ValidationResult: - return ValidationResult('Some warning **markdown** message', MessageType.WARNING) + + @sync_action('testConnection') + def test_connection(self): + """ + Tests database connection credentials + """ + params = self.configuration.parameters + connection = params.get('connection', {}) + + # Validate connection parameters + if not connection.get('host') or not connection.get('username'): + raise UserException("Connection failed: Missing host or username") + + # If no exception is raised, the connection test is considered successful + # The framework automatically returns {"status": "success"} ``` -#### SelectElement +## Returning Data from Sync Actions -Element of a dynamic (multi)select UI element. The UI objects expects list of such elements. +Sync actions can return data that is used by the UI, such as dropdown options: ```python from keboola.component.base import ComponentBase, sync_action -from keboola.component.sync_actions import ValidationResult, MessageType, SelectElement -from typing import List - +from keboola.component.sync_actions import SelectElement class Component(ComponentBase): + @sync_action('listTables') + def list_tables(self): + """ + Returns list of available tables for configuration dropdown + """ + # In a real scenario, you would fetch this from a database or API + available_tables = [ + {"id": "customers", "name": "Customer Data"}, + {"id": "orders", "name": "Order History"}, + {"id": "products", "name": "Product Catalog"} + ] + + # Return as list of SelectElement objects for UI dropdown + return [ + SelectElement(value=table["id"], label=table["name"]) + for table in available_tables + ] +``` - def run(self): - pass +## Validation Message Action + +You can provide validation feedback to the UI: - @sync_action('validate_example') - def validate_message(self) -> List[SelectElement]: - return [SelectElement(value="value1", label="Value 1 label"), - SelectElement(value="value2", label="Value 2 label")] +```python +from keboola.component.base import ComponentBase, sync_action +from keboola.component.sync_actions import ValidationResult, MessageType + +class Component(ComponentBase): + @sync_action('validateConfiguration') + def validate_config(self): + """ + Validates the component configuration + """ + params = self.configuration.parameters + + # Check configuration parameters + if params.get('extraction_type') == 'incremental' and not params.get('incremental_key'): + # Return warning message that will be displayed in UI + return ValidationResult( + "Incremental extraction requires specifying an incremental key column.", + MessageType.WARNING + ) + + # Check for potential issues + if params.get('row_limit') and int(params.get('row_limit')) > 1000000: + # Return info message + return ValidationResult( + "Large row limit may cause performance issues.", + MessageType.INFO + ) + + # Success with no message + return None ``` #### No output