+build-backend = "poetry.core.masonry.api" + +[tool.ruff] +select = [ + "B", # flake8-bugbear + "C", # mccabe + "E", # pycodestyle error + # "ERA", # eradicate + "F", # pyflakes + "N", # pep8-naming + "PL", # pylint + "S", # flake8-bandit + "UP", # pyupgrade + "W", # pycodestyle warning + "I001" # isort +] + +ignore = [ + "C901", # "complexity" category + "PLR", # "refactoring" category has "too many lines in method" type stuff + "PLE1205" # saw this Too many arguments for `logging` format string give a false positive once +] + +line-length = 130 + +# target python 3.10 +target-version = "py310" + +exclude = [ + "migrations" +] + +[tool.ruff.per-file-ignores] +"migrations/versions/*.py" = ["E501"] +"tests/**/*.py" = ["PLR2004", "S101"] # PLR2004 is about magic vars, S101 allows assert + +[tool.ruff.isort] +force-single-line = true + +[tool.mypy] +strict = true +disallow_any_generics = false +warn_unreachable = true +pretty = true +show_column_numbers = true +show_error_codes = true +show_error_context = true diff --git a/src/connector_postgresql/__init__.py b/src/connector_postgresql/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/connector_postgresql/base_command.py b/src/connector_postgresql/base_command.py new file mode 100644 index 0000000..67d34ee --- /dev/null +++ b/src/connector_postgresql/base_command.py @@ -0,0 +1,93 @@ +from typing import Any +import json + +import psycopg2 # type: ignore +from spiffworkflow_connector_command.command_interface import CommandErrorDict +from spiffworkflow_connector_command.command_interface import CommandResponseDict +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + + +class ConnectionConfig: + def __init__(self, database: str, host: str, port: int, username: str, password: str): + self.database = database + self.host = host + self.port = port + self.user = username + self.password = password + +class BaseCommand: + def _execute(self, sql: str, conn_str: str, handler: Any) -> ConnectorProxyResponseDict: + conn = None + error: CommandErrorDict | None = None + command_response_body: list | dict | None = None + try: + conn = psycopg2.connect(conn_str) + with conn.cursor() as cursor: + command_response_body = handler(conn, cursor) + if command_response_body is None: + if cursor.rowcount >= 0: + command_response_body = {"result": f"{cursor.rowcount} rows affected"} + else: + command_response_body = {"result": "ok"} + except Exception as e: + error = {"error_code": e.__class__.__name__, "message": f"Error executing sql statement: {str(e)}"} + finally: + if conn is not None: + conn.close() + + command_response: CommandResponseDict = { + "body": json.dumps(command_response_body), + "mimetype": "application/json", + } + return_response: ConnectorProxyResponseDict = { + "command_response": command_response, + "error": error, + "command_response_version": 2, + } + + return return_response + + + def execute_query(self, sql: str, conn_str: str, values: list | None=None) -> ConnectorProxyResponseDict: + def handler(conn: Any, cursor: Any) -> None: + cursor.execute(sql, values) + conn.commit() + + return self._execute(sql, conn_str, handler) + + def execute_batch(self, sql: str, conn_str: str, vars_list: list) -> ConnectorProxyResponseDict: + def handler(conn: Any, cursor: Any) -> None: + cursor.executemany(sql, vars_list) + # TODO: look more into getting this to work instead + # psycopg2.extras.execute_batch(cursor, sql, vars_list) + # https://www.psycopg.org/docs/extras.html#fast-exec + conn.commit() + + return self._execute(sql, conn_str, handler) + + def fetchall(self, sql: str, conn_str: str, values: list) -> ConnectorProxyResponseDict: + def prep_results(results: dict) -> list: + return list(map(list, results)) + def handler(conn: Any, cursor: Any) -> list: + cursor.execute(sql, values) + return prep_results(cursor.fetchall()) + + return self._execute(sql, conn_str, handler) + + def build_where_clause(self, schema: dict[str, Any]) -> tuple[str, Any]: + where_configs = schema.get("where", []) + if len(where_configs) == 0: + return "", None + + operators = {"=", "!=", "<", ">"} + + def build_where_part(where_config: list) -> tuple[str, Any]: + column, operator, value = where_config + if operator not in operators: + raise Exception(f"Unsupported operator '{operator}' in where clause") + return (f"{column} {operator} %s", value) + + where_parts = map(build_where_part, where_configs) + columns, values = zip(*where_parts, strict=True) + + return f"WHERE {' AND '.join(columns)}", values diff --git a/src/connector_postgresql/commands/__init__.py b/src/connector_postgresql/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/connector_postgresql/commands/create_table.py b/src/connector_postgresql/commands/create_table.py new file mode 100644 index 0000000..09f6baf --- /dev/null +++ b/src/connector_postgresql/commands/create_table.py @@ -0,0 +1,37 @@ + +from typing import Any + +from spiffworkflow_connector_command.command_interface import ConnectorCommand +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + +from connector_postgresql.base_command import BaseCommand + + +class CreateTable(BaseCommand, ConnectorCommand): + + def __init__(self, + database_connection_str: str, + table_name: str, + schema: dict[str, Any] + ): + """__init__.""" + self.database_connection_str = database_connection_str + self.table_name = table_name + self.schema = schema + + def execute(self, _config: Any, _task_data: Any) -> ConnectorProxyResponseDict: + + columns = self._column_definitions(self.schema) + # TODO: build properly with SQL().format(Identifier(name)) + # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries + sql = f"CREATE TABLE IF NOT EXISTS {self.table_name} ({columns});" + + return self.execute_query(sql, self.database_connection_str) + + def _column_definitions(self, schema: dict[str, Any]) -> str: + def column_definition(column: dict) -> str: + return f"{column['name']} {column['type']}" + + column_definitions = map(column_definition, schema["column_definitions"]) + + return ",".join(column_definitions) diff --git a/src/connector_postgresql/commands/delete_values.py b/src/connector_postgresql/commands/delete_values.py new file mode 100644 index 0000000..4e72439 --- /dev/null +++ b/src/connector_postgresql/commands/delete_values.py @@ -0,0 +1,29 @@ + +from typing import Any + +from spiffworkflow_connector_command.command_interface import ConnectorCommand +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + +from connector_postgresql.base_command import BaseCommand + + +class DeleteValues(BaseCommand, ConnectorCommand): + + def __init__(self, + database_connection_str: str, + table_name: str, + schema: dict[str, Any] + ): + self.database_connection_str = database_connection_str + self.table_name = table_name + self.schema = schema + + def execute(self, _config: Any, _task_data: Any) -> ConnectorProxyResponseDict: + where_clause, values = self.build_where_clause(self.schema) + + # TODO: build properly with SQL().format(Identifier(name)) + # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries + sql = f"DELETE FROM {self.table_name} {where_clause};" # noqa: S608 + + return self.execute_query(sql, self.database_connection_str, values) + diff --git a/src/connector_postgresql/commands/do_sql.py b/src/connector_postgresql/commands/do_sql.py new file mode 100644 index 0000000..70a4e7d --- /dev/null +++ b/src/connector_postgresql/commands/do_sql.py @@ -0,0 +1,32 @@ + +from typing import Any + +from psycopg2.extensions import register_adapter # type: ignore +from psycopg2.extras import Json # type: ignore +from spiffworkflow_connector_command.command_interface import ConnectorCommand +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + +from connector_postgresql.base_command import BaseCommand + +register_adapter(dict, Json) + +class DoSQL(BaseCommand, ConnectorCommand): + + def __init__(self, + database_connection_str: str, + schema: dict[str, Any] + ): + """__init__.""" + self.database_connection_str = database_connection_str + self.schema = schema + + def execute(self, _config: Any, _task_data: Any) -> ConnectorProxyResponseDict: + + sql = self.schema["sql"] + values = self.schema.get("values", []) + fetch_results = self.schema.get("fetch_results", False) + + if fetch_results: + return self.fetchall(sql, self.database_connection_str, values) + + return self.execute_query(sql, self.database_connection_str, values) diff --git a/src/connector_postgresql/commands/drop_table.py b/src/connector_postgresql/commands/drop_table.py new file mode 100644 index 0000000..72b9e51 --- /dev/null +++ b/src/connector_postgresql/commands/drop_table.py @@ -0,0 +1,26 @@ +from typing import Any + +from spiffworkflow_connector_command.command_interface import ConnectorCommand +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + +from connector_postgresql.base_command import BaseCommand + + +class DropTable(BaseCommand, ConnectorCommand): + + def __init__(self, + database_connection_str: str, + table_name: str + ): + """__init__.""" + self.database_connection_str = database_connection_str + self.table_name = table_name + + def execute(self, _config: Any, _task_data: Any) -> ConnectorProxyResponseDict: + + # TODO: build properly with SQL().format(Identifier(name)) + # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries + sql = f"DROP TABLE IF EXISTS {self.table_name};" + + return self.execute_query(sql, self.database_connection_str) + diff --git a/src/connector_postgresql/commands/insert_values.py b/src/connector_postgresql/commands/insert_values.py new file mode 100644 index 0000000..2a60a52 --- /dev/null +++ b/src/connector_postgresql/commands/insert_values.py @@ -0,0 +1,31 @@ + +from typing import Any + +from spiffworkflow_connector_command.command_interface import ConnectorCommand +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + +from connector_postgresql.base_command import BaseCommand + + +class InsertValues(BaseCommand, ConnectorCommand): + + def __init__(self, + database_connection_str: str, + table_name: str, + schema: dict[str, Any] + ): + """__init__.""" + self.database_connection_str = database_connection_str + self.table_name = table_name + self.schema = schema + + def execute(self, _config: Any, _task_data: Any) -> ConnectorProxyResponseDict: + columns = ",".join(self.schema["columns"]) + placeholders = f"({','.join(['%s'] * len(self.schema['columns']))})" + value_lists = self.schema["values"] + + # TODO: build properly with SQL().format(Identifier(name)) + # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries + sql = f"INSERT INTO {self.table_name} ({columns}) VALUES {placeholders};" # noqa: S608 + + return self.execute_batch(sql, self.database_connection_str, value_lists) diff --git a/src/connector_postgresql/commands/select_values.py b/src/connector_postgresql/commands/select_values.py new file mode 100644 index 0000000..ebe6c57 --- /dev/null +++ b/src/connector_postgresql/commands/select_values.py @@ -0,0 +1,31 @@ + +from typing import Any + +from spiffworkflow_connector_command.command_interface import ConnectorCommand +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + +from connector_postgresql.base_command import BaseCommand + + +class SelectValues(BaseCommand, ConnectorCommand): + + def __init__(self, + database_connection_str: str, + table_name: str, + schema: dict[str, Any] + ): + """__init__.""" + self.database_connection_str = database_connection_str + self.table_name = table_name + self.schema = schema + + def execute(self, _config: Any, _task_data: Any) -> ConnectorProxyResponseDict: + + columns = ",".join(self.schema["columns"]) + where_clause, values = self.build_where_clause(self.schema) + + # TODO: build properly with SQL().format(Identifier(name)) + # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries + sql = f"SELECT {columns} FROM {self.table_name} {where_clause};" # noqa: S608 + + return self.fetchall(sql, self.database_connection_str, values) diff --git a/src/connector_postgresql/commands/update_values.py b/src/connector_postgresql/commands/update_values.py new file mode 100644 index 0000000..ae1fd58 --- /dev/null +++ b/src/connector_postgresql/commands/update_values.py @@ -0,0 +1,41 @@ + +from typing import Any + +from spiffworkflow_connector_command.command_interface import ConnectorCommand +from spiffworkflow_connector_command.command_interface import ConnectorProxyResponseDict + +from connector_postgresql.base_command import BaseCommand + + +class UpdateValues(BaseCommand, ConnectorCommand): + + def __init__(self, + database_connection_str: str, + table_name: str, + schema: dict[str, Any] + ): + """__init__.""" + self.database_connection_str = database_connection_str + self.table_name = table_name + self.schema = schema + + def execute(self, _config: Any, _task_data: Any) -> ConnectorProxyResponseDict: + set_clause, values = self._build_set_clause(self.schema) + where_clause, where_values = self.build_where_clause(self.schema) + + if where_values is not None: + values += where_values + + # TODO: build properly with SQL().format(Identifier(name)) + # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries + sql = f"UPDATE {self.table_name} {set_clause} {where_clause};" + + return self.execute_query(sql, self.database_connection_str, values) + + def _build_set_clause(self, schema: dict[str, Any]) -> tuple[str, Any]: + columns_to_values = schema["set"] + columns, values = zip(*columns_to_values.items(), strict=True) + set_columns = ", ".join(f"{c} = %s" for c in columns) + + return f"SET {set_columns}", values +