Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Add tests for CTX manager usage on PEP0249. #448

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions tests/clients/test_mysqlclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,93 @@ def test_error_capture(self):
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from blah')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_ctx_mngr(self):
result = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context manager does not create a new scope, the result = cursor.execute( statement is enough. Please remove this line, luckily this is not Pascal or ANSI C, where you have to declare the variables way ahead of assigning them.


with tracer.start_active_span('test'):
with self.db as conn:
cursor = conn.cursor()
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is for catching runtime anomalies in the production code, after it is deployed.
For unit testing we should use assertTrue, if nothing else fits better.


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_cursor_ctx_mngr(self):
result = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove.


with tracer.start_active_span('test'):
with self.db.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()
self.db.close()

assert(result >= 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is for catching runtime anomalies in the production code, after it is deployed.
For unit testing we should use assertTrue, if nothing else fits better.


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_cursor_ctx_mngr(self):
result = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove.


with tracer.start_active_span('test'):
with self.db as conn:
with conn.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is for catching runtime anomalies in the production code, after it is deployed.
For unit testing we should use assertTrue, if nothing else fits better.


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])
77 changes: 77 additions & 0 deletions tests/clients/test_psycopg2.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,80 @@ def test_register_type(self):
ext.register_type(ext.UUID, self.cursor)
ext.register_type(ext.UUIDARRAY, self.cursor)

def test_connect_ctx_mngr(self):
with tracer.start_active_span('test'):
with self.db as conn:
cursor = conn.cursor()
cursor.execute("""SELECT * from users""")
cursor.fetchone()

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv['postgresql_db'])
self.assertEqual(db_span.data["pg"]["user"], testenv['postgresql_user'])
self.assertEqual(db_span.data["pg"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["pg"]["host"], testenv['postgresql_host'])
self.assertEqual(db_span.data["pg"]["port"], testenv['postgresql_port'])

def test_cursor_ctx_mngr(self):
with tracer.start_active_span('test'):
with self.db.cursor() as cursor:
cursor.execute("""SELECT * from users""")
cursor.fetchone()
self.db.close()

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv['postgresql_db'])
self.assertEqual(db_span.data["pg"]["user"], testenv['postgresql_user'])
self.assertEqual(db_span.data["pg"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["pg"]["host"], testenv['postgresql_host'])
self.assertEqual(db_span.data["pg"]["port"], testenv['postgresql_port'])

def test_connect_cursor_ctx_mngr(self):
with tracer.start_active_span('test'):
with self.db as conn:
with conn.cursor() as cursor:
cursor.execute("""SELECT * from users""")
cursor.fetchone()

spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "postgres")
self.assertEqual(db_span.data["pg"]["db"], testenv['postgresql_db'])
self.assertEqual(db_span.data["pg"]["user"], testenv['postgresql_user'])
self.assertEqual(db_span.data["pg"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["pg"]["host"], testenv['postgresql_host'])
self.assertEqual(db_span.data["pg"]["port"], testenv['postgresql_port'])
90 changes: 90 additions & 0 deletions tests/clients/test_pymysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,93 @@ def test_error_capture(self):
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from blah')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_ctx_mngr(self):
result = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove.


with tracer.start_active_span('test'):
with self.db as conn:
cursor = conn.cursor()
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is for catching runtime anomalies in the production code, after it is deployed.
For unit testing we should use assertTrue, if nothing else fits better.


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_cursor_ctx_mngr(self):
result = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove.


with tracer.start_active_span('test'):
with self.db.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()
self.db.close()

assert(result >= 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is for catching runtime anomalies in the production code, after it is deployed.
For unit testing we should use assertTrue, if nothing else fits better.


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])

def test_connect_cursor_ctx_mngr(self):
result = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove.


with tracer.start_active_span('test'):
with self.db as conn:
with conn.cursor() as cursor:
result = cursor.execute("""SELECT * from users""")
cursor.fetchone()

assert(result >= 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is for catching runtime anomalies in the production code, after it is deployed.
For unit testing we should use assertTrue, if nothing else fits better.


spans = self.recorder.queued_spans()
self.assertEqual(2, len(spans))

db_span = spans[0]
test_span = spans[1]

self.assertEqual("test", test_span.data["sdk"]["name"])
self.assertEqual(test_span.t, db_span.t)
self.assertEqual(db_span.p, test_span.s)

self.assertEqual(None, db_span.ec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be assertIsNone


self.assertEqual(db_span.n, "mysql")
self.assertEqual(db_span.data["mysql"]["db"], testenv['mysql_db'])
self.assertEqual(db_span.data["mysql"]["user"], testenv['mysql_user'])
self.assertEqual(db_span.data["mysql"]["stmt"], 'SELECT * from users')
self.assertEqual(db_span.data["mysql"]["host"], testenv['mysql_host'])
self.assertEqual(db_span.data["mysql"]["port"], testenv['mysql_port'])