Skip to content

Commit

Permalink
test: Add tests for CTX manager usage on PEP0249.
Browse files Browse the repository at this point in the history
Add the test_connect_ctx_mngr UT to test the span creation when
using the psycopg2 connect with context manager, and the
test_connect_cursor_ctx_mngr UT when using the psycopg2 connect
and cursos context manager.

Signed-off-by: Paulo Vital <[email protected]>
  • Loading branch information
pvital committed Aug 31, 2023
1 parent 4b2d90b commit ccf7ce0
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 0 deletions.
90 changes: 90 additions & 0 deletions tests/clients/test_mysqlclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,93 @@ def test_error_capture(self):
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from blah')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_ctx_mngr(self):
result = None

with tracer.start_active_span('test'):
with self.db as conn:
cursor = conn.cursor()
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_cursor_ctx_mngr(self):
result = None

with tracer.start_active_span('test'):
with self.db.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()
self.db.close()

assert(result >= 0)

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_cursor_ctx_mngr(self):
result = None

with tracer.start_active_span('test'):
with self.db as conn:
with conn.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])
77 changes: 77 additions & 0 deletions tests/clients/test_psycopg2.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,80 @@ def test_register_type(self):
ext.register_type(ext.UUID, self.cursor)
ext.register_type(ext.UUIDARRAY, self.cursor)

def test_connect_ctx_mngr(self):
with tracer.start_active_span('test'):
with self.db as conn:
cursor = conn.cursor()
cursor.execute("""SELECT * from users""")
cursor.fetchone()

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv['postgresql_db'])
self.assertEqual(db_span.data["pg"]["user"], testenv['postgresql_user'])
self.assertEqual(db_span.data["pg"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["pg"]["host"], testenv['postgresql_host'])
self.assertEqual(db_span.data["pg"]["port"], testenv['postgresql_port'])

def test_cursor_ctx_mngr(self):
with tracer.start_active_span('test'):
with self.db.cursor() as cursor:
cursor.execute("""SELECT * from users""")
cursor.fetchone()
self.db.close()

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv['postgresql_db'])
self.assertEqual(db_span.data["pg"]["user"], testenv['postgresql_user'])
self.assertEqual(db_span.data["pg"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["pg"]["host"], testenv['postgresql_host'])
self.assertEqual(db_span.data["pg"]["port"], testenv['postgresql_port'])

def test_connect_cursor_ctx_mngr(self):
with tracer.start_active_span('test'):
with self.db as conn:
with conn.cursor() as cursor:
cursor.execute("""SELECT * from users""")
cursor.fetchone()

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv['postgresql_db'])
self.assertEqual(db_span.data["pg"]["user"], testenv['postgresql_user'])
self.assertEqual(db_span.data["pg"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["pg"]["host"], testenv['postgresql_host'])
self.assertEqual(db_span.data["pg"]["port"], testenv['postgresql_port'])
90 changes: 90 additions & 0 deletions tests/clients/test_pymysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,93 @@ def test_error_capture(self):
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from blah')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_ctx_mngr(self):
result = None

with tracer.start_active_span('test'):
with self.db as conn:
cursor = conn.cursor()
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_cursor_ctx_mngr(self):
result = None

with tracer.start_active_span('test'):
with self.db.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()
self.db.close()

assert(result >= 0)

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_cursor_ctx_mngr(self):
result = None

with tracer.start_active_span('test'):
with self.db as conn:
with conn.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)

self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

0 comments on commit ccf7ce0

Please sign in to comment.