Skip to content

Commit

Permalink
Merge branch 'feature/0.5.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderdean committed Aug 13, 2014
2 parents f7e47db + 2ed9244 commit 4c64c5a
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 86 deletions.
13 changes: 13 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
Version 0.5.0 (2014-08-13)
--------------------------
Converted payload values to strings for POST requests (#100)
Set content type to "application/json; charset=utf-8" for POST requests (#99)
Changed collector endpoint for POST to /com.snowplowanalytics.snowplow/tp2 (#98)
Stopped setting and sending tid (#94)
Started setting and sending eid (#93)
Allowed a single Tracker instance to send events to multiple Emitters (#91)
Started passing a list of dictionaries to the on_failure callback for POST requests (#104)
Made the "name" argument of track_screen_view optional (#103)
Made all tracker methods chainable (#105)
Stopped sending empty payloads (#106)

Version 0.4.0 (2014-06-10)
--------------------------
Migrated unstructured events to self-describing JSON (#87)
Expand Down
2 changes: 1 addition & 1 deletion snowplow_tracker/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
"""


__version_info__ = (0, 4, 0)
__version_info__ = (0, 5, 0)
__version__ = ".".join(str(x) for x in __version_info__)
52 changes: 28 additions & 24 deletions snowplow_tracker/emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

class Emitter(object):
"""
Synchronously send Snowplow events to a Snowplow as_collector_uri
Synchronously send Snowplow events to a Snowplow collector
Supports both GET and POST requests
"""

Expand All @@ -84,7 +84,7 @@ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_si
If method is "get": An array of dictionaries corresponding to the unsent events' payloads
:type on_failure: function | None
"""
self.endpoint = Emitter.as_collector_uri(endpoint, protocol, port)
self.endpoint = Emitter.as_collector_uri(endpoint, protocol, port, method)

self.method = method

Expand All @@ -105,7 +105,7 @@ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_si

@staticmethod
@contract
def as_collector_uri(endpoint, protocol="http", port=None):
def as_collector_uri(endpoint, protocol="http", port=None, method="get"):
"""
:param endpoint: The raw endpoint provided by the user
:type endpoint: string
Expand All @@ -115,10 +115,14 @@ def as_collector_uri(endpoint, protocol="http", port=None):
:type port: int | None
:rtype: string
"""
if method == "get":
path = "/i"
else:
path = "/com.snowplowanalytics.snowplow/tp2"
if port is None:
return protocol + "://" + endpoint + "/i"
return protocol + "://" + endpoint + path
else:
return protocol + "://" + endpoint + ":" + str(port) + "/i"
return protocol + "://" + endpoint + ":" + str(port) + path

@contract
def input(self, payload):
Expand All @@ -129,29 +133,32 @@ def input(self, payload):
:param payload: The name-value pairs for the event
:type payload: dict(string:*)
"""
self.buffer.append(payload)
if self.method == "post":
self.buffer.append({key: str(payload[key]) for key in payload})
else:
self.buffer.append(payload)

if len(self.buffer) >= self.buffer_size:
return self.flush()
self.flush()

@task(name="Flush")
def flush(self):
"""
Sends all events in the buffer to the collector.
"""
if self.method == "post":
data = json.dumps({
"schema": PAYLOAD_DATA_SCHEMA,
"data": self.buffer
})
buffer_length = len(self.buffer)
self.buffer = []
status_code = self.http_post(data).status_code
if status_code == 200 and self.on_success is not None:
self.on_success(buffer_length)
elif self.on_failure is not None:
self.on_failure(0, data)
return status_code
if self.buffer:
data = json.dumps({
"schema": PAYLOAD_DATA_SCHEMA,
"data": self.buffer
})
temp_buffer = self.buffer
self.buffer = []
status_code = self.http_post(data).status_code
if status_code == 200 and self.on_success is not None:
self.on_success(len(temp_buffer))
elif self.on_failure is not None:
self.on_failure(0, temp_buffer)

elif self.method == "get":
success_count = 0
Expand All @@ -173,8 +180,6 @@ def flush(self):
elif self.on_failure is not None:
self.on_failure(success_count, unsent_requests)

return status_code

else:
logger.warn(self.method + ' is not a recognised HTTP method. Use "get" or "post".')

Expand All @@ -185,7 +190,7 @@ def http_post(self, data):
:type data: string
"""
logger.debug("Sending POST request...")
r = requests.post(self.endpoint, data=data)
r = requests.post(self.endpoint, data=data, headers={'content-type': 'application/json; charset=utf-8'})
logger.info("POST request finished with status code: " + str(r.status_code))
return r

Expand All @@ -209,8 +214,7 @@ def sync_flush(self):
result = Emitter.flush(self)
for t in self.threads:
t.join(THREAD_TIMEOUT)
logger.debug("Finished synchrous flush")
return result
logger.info("Finished synchrous flush")


class AsyncEmitter(Emitter):
Expand Down
43 changes: 28 additions & 15 deletions snowplow_tracker/test/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,23 @@ def fail_response_content(url, request):
class IntegrationTest(unittest.TestCase):

def test_integration_page_view(self):
t = tracker.Tracker(default_emitter, default_subject)
t = tracker.Tracker([default_emitter], default_subject)
with HTTMock(pass_response_content):
t.track_page_view("http://savethearctic.org", "Save The Arctic", "http://referrer.com")
expected_fields = {"e": "pv", "page": "Save+The+Arctic", "url": "http%3A%2F%2Fsavethearctic.org", "refr": "http%3A%2F%2Freferrer.com"}
for key in expected_fields:
self.assertEquals(from_querystring(key, querystrings[-1]), expected_fields[key])

def test_integration_ecommerce_transaction_item(self):
t = tracker.Tracker(default_emitter, default_subject)
t = tracker.Tracker([default_emitter], default_subject)
with HTTMock(pass_response_content):
t.track_ecommerce_transaction_item("12345", "pbz0025", 7.99, 2, "black-tarot", "tarot", currency="GBP")
expected_fields = {"ti_ca": "tarot", "ti_id": "12345", "ti_qu": "2", "ti_sk": "pbz0025", "e": "ti", "ti_nm": "black-tarot", "ti_pr": "7.99", "ti_cu": "GBP"}
for key in expected_fields:
self.assertEquals(from_querystring(key, querystrings[-1]), expected_fields[key])

def test_integration_ecommerce_transaction(self):
t = tracker.Tracker(default_emitter, default_subject)
t = tracker.Tracker([default_emitter], default_subject)
with HTTMock(pass_response_content):
t.track_ecommerce_transaction("6a8078be", 35, city="London", currency="GBP", items=
[{
Expand Down Expand Up @@ -110,23 +110,34 @@ def test_integration_ecommerce_transaction(self):
self.assertEquals(from_querystring("dtm", querystrings[-3]), from_querystring("dtm", querystrings[-2]))

def test_integration_screen_view(self):
t = tracker.Tracker(default_emitter, default_subject)
t = tracker.Tracker([default_emitter], default_subject, encode_base64=False)
with HTTMock(pass_response_content):
t.track_screen_view("Game HUD 2", "Hello!")
t.track_screen_view("Game HUD 2", id_="534")
expected_fields = {"e": "ue"}
for key in expected_fields:
self.assertEquals(from_querystring(key, querystrings[-1]), expected_fields[key])
envelope_string = from_querystring("ue_pr", querystrings[-1])
envelope = json.loads(unquote_plus(envelope_string))
self.assertEquals(envelope, {
"schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
"data": {"schema": "iglu:com.snowplowanalytics.snowplow/screen_view/jsonschema/1-0-0",
"data": {
"name": "Game HUD 2",
"id": "534"
}
}
})

def test_integration_struct_event(self):
t = tracker.Tracker(default_emitter, default_subject)
t = tracker.Tracker([default_emitter], default_subject)
with HTTMock(pass_response_content):
t.track_struct_event("Ecomm", "add-to-basket", "dog-skateboarding-video", "hd", 13.99)
expected_fields = {"se_ca": "Ecomm", "se_pr": "hd", "se_la": "dog-skateboarding-video", "se_va": "13.99", "se_ac": "add-to-basket", "e": "se"}
for key in expected_fields:
self.assertEquals(from_querystring(key, querystrings[-1]), expected_fields[key])

def test_integration_unstruct_event_non_base64(self):
t = tracker.Tracker(default_emitter, default_subject, encode_base64=False)
t = tracker.Tracker([default_emitter], default_subject, encode_base64=False)
with HTTMock(pass_response_content):
t.track_unstruct_event({"schema": "iglu:com.acme/viewed_product/jsonschema/2-0-2", "data": {"product_id": "ASO01043", "price$flt": 49.95, "walrus$tms": 1000}})
expected_fields = {"e": "ue"}
Expand All @@ -140,7 +151,7 @@ def test_integration_unstruct_event_non_base64(self):
})

def test_integration_unstruct_event_base64(self):
t = tracker.Tracker(default_emitter, default_subject, encode_base64=True)
t = tracker.Tracker([default_emitter], default_subject, encode_base64=True)
with HTTMock(pass_response_content):
t.track_unstruct_event({"schema": "iglu:com.acme/viewed_product/jsonschema/2-0-2", "data": {"product_id": "ASO01043", "price$flt": 49.95, "walrus$tms": 1000}})
expected_fields = {"e": "ue"}
Expand All @@ -154,7 +165,7 @@ def test_integration_unstruct_event_base64(self):
})

def test_integration_context_non_base64(self):
t = tracker.Tracker(default_emitter, default_subject, encode_base64=False)
t = tracker.Tracker([default_emitter], default_subject, encode_base64=False)
with HTTMock(pass_response_content):
t.track_page_view("localhost", "local host", None, [{"schema": "iglu:com.example/user/jsonschema/2-0-3", "data": {"user_type": "tester"}}])
envelope_string = from_querystring("co", querystrings[-1])
Expand All @@ -165,7 +176,7 @@ def test_integration_context_non_base64(self):
})

def test_integration_context_base64(self):
t = tracker.Tracker(default_emitter, default_subject, encode_base64=True)
t = tracker.Tracker([default_emitter], default_subject, encode_base64=True)
with HTTMock(pass_response_content):
t.track_page_view("localhost", "local host", None, [{"schema": "iglu:com.example/user/jsonschema/2-0-3", "data": {"user_type": "tester"}}])
envelope_string = unquote_plus(from_querystring("cx", querystrings[-1]))
Expand All @@ -184,26 +195,28 @@ def test_integration_standard_nv_pairs(self):
s.set_timezone("Europe London")
s.set_lang("en")

t = tracker.Tracker(emitters.Emitter("localhost"), s, "cf", app_id="angry-birds-android")
t = tracker.Tracker([emitters.Emitter("localhost")], s, "cf", app_id="angry-birds-android")
with HTTMock(pass_response_content):
t.track_page_view("localhost", "local host")
expected_fields = {"tna": "cf", "res": "100x200",
"lang": "en", "aid": "angry-birds-android", "cd": "24", "tz": "Europe+London",
"p": "mob", "tv": "py-" + _version.__version__}
for key in expected_fields:
self.assertEquals(from_querystring(key, querystrings[-1]), expected_fields[key])
self.assertIsNotNone(from_querystring("eid", querystrings[-1]))
self.assertIsNotNone(from_querystring("dtm", querystrings[-1]))

def test_integration_redis_default(self):
r = redis.StrictRedis()
t = tracker.Tracker(emitters.RedisEmitter(), default_subject)
t = tracker.Tracker([emitters.RedisEmitter()], default_subject)
t.track_page_view("http://www.example.com")
event_string = r.rpop("snowplow")
event_dict = json.loads(event_string.decode("utf-8"))
self.assertEquals(event_dict["e"], "pv")

def test_integration_redis_custom(self):
r = redis.StrictRedis(db=1)
t = tracker.Tracker(emitters.RedisEmitter(rdb=r, key="custom_key"), default_subject)
t = tracker.Tracker([emitters.RedisEmitter(rdb=r, key="custom_key")], default_subject)
t.track_page_view("http://www.example.com")
event_string = r.rpop("custom_key")
event_dict = json.loads(event_string.decode("utf-8"))
Expand All @@ -214,7 +227,7 @@ def test_integration_success_callback(self):
callback_failure_queue = []
callback_emitter = emitters.Emitter("localhost", on_success=lambda x: callback_success_queue.append(x),
on_failure=lambda x, y:callback_failure_queue.append(x))
t = tracker.Tracker(callback_emitter, default_subject)
t = tracker.Tracker([callback_emitter], default_subject)
with HTTMock(pass_response_content):
t.track_page_view("http://www.example.com")
self.assertEquals(callback_success_queue[0], 1)
Expand All @@ -225,7 +238,7 @@ def test_integration_failure_callback(self):
callback_failure_queue = []
callback_emitter = emitters.Emitter("localhost", on_success=lambda x: callback_success_queue.append(x),
on_failure=lambda x, y:callback_failure_queue.append(x))
t = tracker.Tracker(callback_emitter, default_subject)
t = tracker.Tracker([callback_emitter], default_subject)
with HTTMock(fail_response_content):
t.track_page_view("http://www.example.com")
self.assertEquals(callback_success_queue, [])
Expand Down
1 change: 0 additions & 1 deletion snowplow_tracker/test/unit/test_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@


import unittest
import time
from snowplow_tracker import payload


Expand Down
16 changes: 12 additions & 4 deletions snowplow_tracker/test/unit/test_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


import unittest
import re
from freezegun import freeze_time
from snowplow_tracker.tracker import Tracker
from snowplow_tracker.emitters import Emitter
Expand All @@ -32,14 +33,14 @@ def setUp(self):
pass

def test_initialisation(self):
t = Tracker(Emitter("d3rkrsqld9gmqf.cloudfront.net"), namespace="cloudfront", encode_base64= False, app_id="AF003")
t = Tracker([Emitter("d3rkrsqld9gmqf.cloudfront.net")], namespace="cloudfront", encode_base64= False, app_id="AF003")
self.assertEquals(t.standard_nv_pairs["tna"], "cloudfront")
self.assertEquals(t.standard_nv_pairs["aid"], "AF003")
self.assertEquals(t.encode_base64, False)

def test_get_transaction_id(self):
tid = Tracker.get_transaction_id()
self.assertTrue(tid >= 100000 and tid <= 999999)
def test_get_uuid(self):
eid = Tracker.get_uuid()
self.assertIsNotNone(re.match('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\Z', eid))

@freeze_time("1970-01-01 00:00:01")
def test_get_timestamp(self):
Expand All @@ -53,3 +54,10 @@ def test_set_timestamp_1(self):
def test_set_timestamp_2(self):
dtm = Tracker.get_timestamp(1399021242240.0303)
self.assertEquals(dtm, 1399021242240)

def test_add_emitter(self):
e1 = Emitter("d3rkrsqld9gmqf.cloudfront.net", method="get")
e2 = Emitter("d3rkrsqld9gmqf.cloudfront.net", method="post")
t = Tracker(e1, namespace="cloudfront", encode_base64= False, app_id="AF003")
t.add_emitter(e2)
self.assertEquals(t.emitters, [e1, e2])
Loading

0 comments on commit 4c64c5a

Please sign in to comment.