From 69cc7dc7a4d898c2bd9a08316ab870f3a05efded Mon Sep 17 00:00:00 2001 From: Jason Lubken Date: Tue, 2 Jun 2020 13:58:17 -0400 Subject: [PATCH] Lint --- setup.cfg | 2 +- src/dsdk/mongo.py | 130 +++++++++++++++++++++++++++++++------------- src/dsdk/mssql.py | 50 ++++++++++------- src/dsdk/service.py | 5 -- 4 files changed, 121 insertions(+), 66 deletions(-) diff --git a/setup.cfg b/setup.cfg index 6bc74c7..1e7edba 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,7 +31,7 @@ test = pytest max-complexity = 10 max-line-length = 79 exclude = ci,docs -ignore = C812,C815,C816,D202,D401,E203,W503 +ignore = C812,C815,C816,C819,D202,D401,E203,W503 [matrix] diff --git a/src/dsdk/mongo.py b/src/dsdk/mongo.py index 52c1a52..1667749 100644 --- a/src/dsdk/mongo.py +++ b/src/dsdk/mongo.py @@ -66,11 +66,19 @@ def _inject_mongo_uri(mongo_uri: str) -> str: parser.add( "--mongo-uri", required=True, - help=( - "Mongo URI used to connect to a Mongo database: " - "mongodb://USER:PASS@HOST1,HOST2,.../DATABASE?" - "replicaset=REPLICASET&authsource=admin " - "Url encode all parts: PASS in particular" + help=" ".join( + ( + "Mongo URI used to connect to a Mongo database:", + ( + "mongodb://USER:PASSWORD@HOST1,HOST2,.../DATABASE?" + "replicaset=REPLICASET&authsource=admin" + ), + "Use a valid uri." + "Url encode all parts, but do not encode the entire uri.", + "No unencoded colons, ampersands, slashes,", + "question-marks, etc. in parts.", + "Specifically, check url encoding of PASSWORD.", + ) ), env_var="MONGO_URI", type=_inject_mongo_uri, @@ -86,6 +94,21 @@ def open_mongo(self) -> Generator: class EvidenceMixin(Mixin): """Evidence Mixin.""" + RESULTSET_ERROR = "".join( + ( + "{", + ", ".join( + ( + '"key": "mongo.resultset.error"', + '"collection": "%s.%s"', + '"actual": %s', + '"expected": %s', + ) + ), + "}", + ) + ) + def __init__(self, **kwargs): """__init__.""" super().__init__(**kwargs) @@ -101,21 +124,11 @@ def open_batch( doc = batch.as_insert_doc(model) # <- model dependency with self.open_mongo() as database: key = insert_one(database.batches, doc) - logger.info( - f'"action": "insert", ' - f'"database": "{database.name}", ' - f'"collection": "{database.collection.name}"' - ) yield batch key, doc = batch.as_update_doc() with self.open_mongo() as database: update_one(database.batches, key, doc) - logger.info( - f'"action": "update", ' - f'"database": "{database.name}", ' - f'"collection": "{database.collection.name}"' - ) def store_evidence(self, batch: Batch, *args, **kwargs) -> None: """Store Evidence.""" @@ -129,22 +142,23 @@ def store_evidence(self, batch: Batch, *args, **kwargs) -> None: columns = df[[c for c in df.columns if c not in exclude]] docs = columns.to_dict(orient="records") with self.open_mongo() as database: - result = insert_many(database[key], docs) - assert columns.shape[0] == len(result.inserted_ids), ( - '"action" "insert_many", "database": "%s", "collection": \ - "%s", "message": "columns.shape[0] != \ - len(results.inserted_ids)"' - % (database.name, database.collection.name) + collection = database[key] + result = insert_many(collection, docs) + actual = len(result.inserted.ids) + expected = columns.shape[0] + assert actual == expected, self.RESULTSET_ERROR % ( + database.name, + collection.name, + actual, + expected, ) # TODO: Better exception df.drop(columns=["batch_id"], inplace=True) - logger.info( - f'"action": "insert_many", ' - f'"database": "{database.name}", ' - f'"collection": "{database.collection.name}", ' - f'"count": {len(df.index)}' - ) + + +OPEN = '{"key": "mongo.open", "database": "%s", "is_master": "%s" }' +CLOSE = '{"key": "mongo.close", "database": "%s"}' @contextmanager @@ -171,31 +185,67 @@ def open_database( **kwargs, ) as client: database = client.get_database() - # is_master to force lazy connection open + # force lazy connection open is_master = client.admin.command("ismaster") - logger.debug( - '{"opened_mongo_database: {"name": "%s", "is_master": "%s"}}', - database.name, - is_master, - ) + logger.info(OPEN, database.name, is_master) try: yield database finally: - logger.debug( - '{"close_mongo_database: {"name": "%s"}}', database.name - ) + logger.info(CLOSE, database.name) + + +INSERT_ONE = "".join( + ( + "{", + ", ".join(('"key": "mongo.insert_one"', '"collection": "%s.%s"')), + "}", + ) +) @retry(AutoReconnect) def insert_one(collection: Collection, doc: Dict[str, Any]): """Insert one with retry.""" - return collection.insert_one(doc) + result = collection.insert_one(doc) + logger.info(INSERT_ONE, collection.database.name, collection.name) + return result + + +INSERT_MANY = "".join( + ( + "{", + ", ".join( + ( + '"key": "mongo.insert_many"', + '"collection": "%s.%s"', + '"value": %s', + ) + ), + "}", + ) +) @retry(AutoReconnect) def insert_many(collection: Collection, docs: Sequence[Dict[str, Any]]): """Insert many with retry.""" - return collection.insert_many(docs) + result = collection.insert_many(docs) + logger.info( + INSERT_MANY, + collection.database.name, + collection.name, + len(result.inserted.ids), + ) + return result + + +UPDATE_ONE = "".join( + ( + "{", + ", ".join(('"key": "mongo.update_one"', '"collection": "%s.%s"')), + "}", + ) +) @retry(AutoReconnect) @@ -203,4 +253,6 @@ def update_one( collection: Collection, key: Dict[str, Any], doc: Dict[str, Any] ): """Update one with retry.""" - return collection.update_one(key, doc) + result = collection.update_one(key, doc) + logger.info(UPDATE_ONE, collection.database.name, collection.name) + return result diff --git a/src/dsdk/mssql.py b/src/dsdk/mssql.py index e333d14..fbdef3f 100644 --- a/src/dsdk/mssql.py +++ b/src/dsdk/mssql.py @@ -54,10 +54,20 @@ def _inject_mssql_uri(mssql_uri: str) -> str: parser.add( "--mssql-uri", required=True, - help=( - "MSSQL URI used to connect to a MSSQL database: " - "mssql+pymssql://USER:PASS@HOST:PORT/DATABASE?timeout=TIMEOUT " - "Url encode all parts: USER (domain slash), PASS in particular" + help=" ".join( + ( + "MSSQL URI used to connect to a MSSQL database:", + ( + "mssql+pymssql://USER:PASS@HOST:PORT/DATABASE?" + "timeout=TIMEOUT" + ), + "Use a valid uri." + "Url encode all parts, but do not encode the entire uri.", + "No unencoded colons, ampersands, slashes,", + "question-marks, etc. in parts.", + "Specifically, check url encoding of USER (domain slash)," + "and PASSWORD.", + ) ), env_var="MSSQL_URI", type=_inject_mssql_uri, @@ -82,28 +92,26 @@ class CheckTablePrivileges(Task): # pylint: disable=too-few-public-methods select 1 as n where exists (select 1 as n from {table}) """ - KEY = "table_privilege_check" + KEY = "mssql.table_privilege_check" ON = "".join(("{", f'"key": "{KEY}.on"', "}")) END = "".join(("{", f'"key": "{KEY}.end"', "}")) COLUMN_PRIVILEGE = "".join( - ( - "{", - ", ".join( - (f'"key": "{KEY}.column_privilege_warning"', '"value": "%s"') - ), - "}", - ) + ("{", ", ".join((f'"key": "{KEY}.warn"', '"value": "%s"')), "}",) ) - FAILED = "".join( - ("{", ", ".join((f'"key": "{KEY}.failed"', '"value": "%s"')), "}") + ERROR = "".join( + ("{", ", ".join((f'"key": "{KEY}.table.error"', '"table": "%s"')), "}") ) - FAILURES = "".join( - ("{", ", ".join((f'"key": "{KEY}.failures"', '"value": "%s"')), "}") + ERRORS = "".join( + ( + "{", + ", ".join((f'"key": "{KEY}.tables.error"', '"tables": "%s"')), + "}", + ) ) def __init__(self, tables): @@ -118,7 +126,7 @@ def __call__(self, batch, service): cur = con.execute(self.CONNECT) for _ in cur.fetchall(): pass - failures = [] + errors = [] for table in self.tables: sql = self.EXTANT.format(table=table) try: @@ -131,8 +139,8 @@ def __call__(self, batch, service): if number == 230: logger.info(self.COLUMN_PRIVILEGE, table) continue - logger.warning(self.FAILED, table) - failures.append(table) - if bool(failures): - raise RuntimeError(self.FAILURES, failures) + logger.warning(self.ERROR, table) + errors.append(table) + if bool(errors): + raise RuntimeError(self.ERRORS, errors) logger.info(self.END) diff --git a/src/dsdk/service.py b/src/dsdk/service.py index 01a8907..a5e14ba 100644 --- a/src/dsdk/service.py +++ b/src/dsdk/service.py @@ -120,7 +120,6 @@ def __init__( def __call__(self) -> Batch: """Run.""" - self.check() with self.open_batch() as batch: for task in self.pipeline: task(batch, self) @@ -132,10 +131,6 @@ def __call__(self) -> Batch: ) return batch - def check(self) -> None: - """Check.""" - # TODO add smoke test for each database mixin. - def inject_arguments( # pylint: disable=no-self-use,protected-access self, parser: ArgumentParser ) -> None: