Skip to content

Commit

Permalink
Merge branch 'feature/0.7.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderdean committed Aug 7, 2015
2 parents 849f8c6 + 4851bc5 commit a1e4333
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 78 deletions.
13 changes: 13 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
Version 0.7.0 (2015-08-07)
--------------------------
Added SelfDescribingJson class (#140)
Added support for Python 2 unicode strings using six library, thanks @mthomas! (#138)
Started handling RequestExceptions (#134)
Started treating all 2xx and 3xx status codes as successful (#133)
Made Emitter and AsyncEmitter thread-safe (#130)
Made synchronous flush wait until buffer is empty (#139)
Made the number of worker threads used by the AsyncEmitter configurable (#136)
Fixed on_failure implementation for POST requests (#135)
Fixed to latest Peru version (#132)
Fixed code formatting in README (#129)

Version 0.6.0.post1 (2015-02-14)
--------------------------------
Improved logging (#109)
Expand Down
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Contributing quickstart
Assuming Git, Vagrant_ and VirtualBox_ are installed:

::

host$ git clone [email protected]:snowplow/snowplow-python-tracker.git
host$ vagrant up && vagrant ssh
guest$ cd /vagrant
Expand All @@ -64,6 +65,7 @@ Publishing
##########

::

host$ vagrant push

Copyright and license
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ requests==2.2.1
celery==3.1.11
gevent==1.0.0
redis==2.9.1
six==1.9.0
1 change: 1 addition & 0 deletions snowplow_tracker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from snowplow_tracker._version import __version__
from snowplow_tracker.subject import Subject
from snowplow_tracker.emitters import logger, Emitter, AsyncEmitter, CeleryEmitter, RedisEmitter
from snowplow_tracker.self_describing_json import SelfDescribingJson
from snowplow_tracker.tracker import Tracker
from contracts import disable_all as disable_contracts, enable_all as enable_contracts
2 changes: 1 addition & 1 deletion snowplow_tracker/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
"""


__version_info__ = (0, 6, 0, "post1")
__version_info__ = (0, 7, 0)
__version__ = ".".join(str(x) for x in __version_info__)
186 changes: 131 additions & 55 deletions snowplow_tracker/emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@
import redis
import logging
from contracts import contract, new_contract
try:
# Python 2
from Queue import Queue
except ImportError:
# Python 3
from queue import Queue

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

DEFAULT_MAX_LENGTH = 10
THREAD_TIMEOUT = 10
PAYLOAD_DATA_SCHEMA = "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-2"

new_contract("protocol", lambda x: x == "http" or x == "https")
Expand All @@ -54,7 +59,6 @@
# Otherwise configure Celery with default settings
app = Celery("Snowplow", broker="redis://guest@localhost//")


class Emitter(object):
"""
Synchronously send Snowplow events to a Snowplow collector
Expand Down Expand Up @@ -99,7 +103,7 @@ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_si
self.on_success = on_success
self.on_failure = on_failure

self.threads = []
self.lock = threading.RLock()

logger.info("Emitter initialized with endpoint " + self.endpoint)

Expand Down Expand Up @@ -133,56 +137,23 @@ def input(self, payload):
:param payload: The name-value pairs for the event
:type payload: dict(string:*)
"""
if self.method == "post":
self.buffer.append({key: str(payload[key]) for key in payload})
else:
self.buffer.append(payload)
with self.lock:
if self.method == "post":
self.buffer.append({key: str(payload[key]) for key in payload})
else:
self.buffer.append(payload)

if len(self.buffer) >= self.buffer_size:
self.flush()
if len(self.buffer) >= self.buffer_size:
self.flush()

@task(name="Flush")
def flush(self):
"""
Sends all events in the buffer to the collector.
"""
logger.info("Attempting to send %s requests" % len(self.buffer))
if self.method == "post":
if self.buffer:
data = json.dumps({
"schema": PAYLOAD_DATA_SCHEMA,
"data": self.buffer
}, separators=(',', ':'))
temp_buffer = self.buffer
self.buffer = []
status_code = self.http_post(data).status_code
if status_code == 200 and self.on_success is not None:
self.on_success(len(temp_buffer))
elif self.on_failure is not None:
self.on_failure(0, temp_buffer)

elif self.method == "get":
success_count = 0
unsent_requests = []
status_code = None

while len(self.buffer) > 0:
payload = self.buffer.pop()
status_code = self.http_get(payload).status_code
if status_code == 200:
success_count += 1
else:
unsent_requests.append(payload)

if len(unsent_requests) == 0:
if self.on_success is not None:
self.on_success(success_count)

elif self.on_failure is not None:
self.on_failure(success_count, unsent_requests)

else:
logger.warn(self.method + ' is not a recognised HTTP method. Use "get" or "post".')
with self.lock:
self.send_events(self.buffer)
self.buffer = []

@contract
def http_post(self, data):
Expand All @@ -193,7 +164,7 @@ def http_post(self, data):
logger.info("Sending POST request to %s..." % self.endpoint)
logger.debug("Payload: %s" % data)
r = requests.post(self.endpoint, data=data, headers={'content-type': 'application/json; charset=utf-8'})
getattr(logger, "info" if r.status_code == 200 else "warn")("POST request finished with status code: " + str(r.status_code))
getattr(logger, "info" if self.is_good_status_code(r.status_code) else "warn")("POST request finished with status code: " + str(r.status_code))
return r

@contract
Expand All @@ -205,7 +176,7 @@ def http_get(self, payload):
logger.info("Sending GET request to %s..." % self.endpoint)
logger.debug("Payload: %s" % payload)
r = requests.get(self.endpoint, params=payload)
getattr(logger, "info" if r.status_code == 200 else "warn")("GET request finished with status code: " + str(r.status_code))
getattr(logger, "info" if self.is_good_status_code(r.status_code) else "warn")("GET request finished with status code: " + str(r.status_code))
return r

def sync_flush(self):
Expand All @@ -215,25 +186,130 @@ def sync_flush(self):
"""
logger.debug("Starting synchronous flush...")
result = Emitter.flush(self)
for t in self.threads:
t.join(THREAD_TIMEOUT)
logger.info("Finished synchrous flush")

@staticmethod
@contract
def is_good_status_code(status_code):
    """
    Report whether an HTTP response code counts as a successful send.

    :param status_code: HTTP status code
    :type status_code: int
    :rtype: bool
    """
    # Any 2xx or 3xx response from the collector is treated as success.
    return status_code // 100 in (2, 3)

@contract
def send_events(self, evts):
    """
    Attempt to deliver a batch of events to the collector using the
    configured HTTP method, invoking on_success/on_failure callbacks.

    :param evts: Array of events to be sent
    :type evts: list(dict(string:*))
    """
    if len(evts) > 0:
        logger.info("Attempting to send %s requests" % len(evts))
        if self.method == 'post':
            # POST sends the whole batch as one self-describing payload_data JSON.
            data = json.dumps({
                "schema": PAYLOAD_DATA_SCHEMA,
                "data": evts
            }, separators=(',', ':'))
            post_succeeded = False
            try:
                status_code = self.http_post(data).status_code
                post_succeeded = self.is_good_status_code(status_code)
            except requests.RequestException as e:
                # Network-level failure (timeout, DNS, refused connection, ...):
                # treated the same as a bad status code.
                logger.warn(e)
            if post_succeeded:
                if self.on_success is not None:
                    self.on_success(len(evts))
            elif self.on_failure is not None:
                # POST is all-or-nothing: report 0 successes and the full batch.
                self.on_failure(0, evts)

        elif self.method == 'get':
            # GET sends one request per event; failed events are collected
            # so on_failure can report exactly which payloads went unsent.
            success_count = 0
            unsent_requests = []
            for evt in evts:
                get_succeeded = False
                try:
                    status_code = self.http_get(evt).status_code
                    get_succeeded = self.is_good_status_code(status_code)
                except requests.RequestException as e:
                    logger.warn(e)
                if get_succeeded:
                    success_count += 1
                else:
                    unsent_requests.append(evt)
            if len(unsent_requests) == 0:
                if self.on_success is not None:
                    self.on_success(success_count)
            elif self.on_failure is not None:
                self.on_failure(success_count, unsent_requests)
    else:
        logger.info("Skipping flush since buffer is empty")

class AsyncEmitter(Emitter):
"""
Uses threads to send HTTP requests asynchronously
"""

@contract
def __init__(
        self,
        endpoint,
        protocol="http",
        port=None,
        method="get",
        buffer_size=None,
        on_success=None,
        on_failure=None,
        thread_count=1):
    """
    Build an AsyncEmitter and start its pool of daemon worker threads.

    :param endpoint: The collector URL. Don't include "http://" - this is done automatically.
    :type endpoint: string
    :param protocol: The protocol to use - http or https. Defaults to http.
    :type protocol: protocol
    :param port: The collector port to connect to
    :type port: int | None
    :param method: The HTTP request method
    :type method: method
    :param buffer_size: The maximum number of queued events before the buffer is flushed. Default is 10.
    :type buffer_size: int | None
    :param on_success: Callback executed when every HTTP request in a flush succeeds
                       (returns a 2xx or 3xx status code).
                       Gets passed the number of events flushed.
    :type on_success: function | None
    :param on_failure: Callback executed if at least one HTTP request in a flush doesn't succeed.
                       Gets passed two arguments:
                       1) The number of events which were successfully sent
                       2) If method is "post": The unsent data in string form;
                          If method is "get": An array of dictionaries corresponding to the unsent events' payloads
    :type on_failure: function | None
    :param thread_count: Number of worker threads to use for HTTP requests
    :type thread_count: int
    """
    super(AsyncEmitter, self).__init__(endpoint, protocol, port, method, buffer_size, on_success, on_failure)
    self.queue = Queue()
    # Daemon workers so an unfinished queue never blocks interpreter exit.
    for i in range(thread_count):
        t = threading.Thread(target=self.consume)
        t.daemon = True
        t.start()

def sync_flush(self):
    """
    Flush repeatedly until the buffer is empty, then block until the
    worker threads have drained and sent every queued batch.
    """
    while self.buffer:
        self.flush()
    self.queue.join()

def flush(self):
"""
Removes all dead threads, then creates a new thread which
executes the flush method of the base Emitter class
"""
self.threads = [t for t in self.threads if t.isAlive()]
logger.debug("Flushing thread running...")
t = threading.Thread(target=super(AsyncEmitter, self).flush)
self.threads.append(t)
t.start()
with self.lock:
self.queue.put(self.buffer)
self.buffer = []

def consume(self):
    """
    Worker-thread loop: pull event batches off the queue and send them,
    marking each batch done so sync_flush's queue.join() can return.
    """
    while True:
        batch = self.queue.get()
        self.send_events(batch)
        self.queue.task_done()


class CeleryEmitter(Emitter):
Expand Down
32 changes: 32 additions & 0 deletions snowplow_tracker/self_describing_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
self_describing_json.py
Copyright (c) 2013-2014 Snowplow Analytics Ltd. All rights reserved.
This program is licensed to you under the Apache License Version 2.0,
and you may not use this file except in compliance with the Apache License
Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
http://www.apache.org/licenses/LICENSE-2.0.
Unless required by applicable law or agreed to in writing,
software distributed under the Apache License Version 2.0 is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the Apache License Version 2.0 for the specific
language governing permissions and limitations there under.
Authors: Anuj More, Alex Dean, Fred Blundun
Copyright: Copyright (c) 2013-2014 Snowplow Analytics Ltd
License: Apache License Version 2.0
"""

class SelfDescribingJson(object):
    """
    Pairs an Iglu schema URI with the data it describes, producing the
    standard self-describing JSON envelope.
    """

    def __init__(self, schema, data):
        # Schema URI string and arbitrary payload, stored as given.
        self.schema = schema
        self.data = data

    def to_json(self):
        """Return the envelope as a dict with "schema" and "data" keys."""
        return {"schema": self.schema, "data": self.data}
9 changes: 5 additions & 4 deletions snowplow_tracker/test/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import json
import base64
from snowplow_tracker import tracker, _version, emitters, subject
from snowplow_tracker.self_describing_json import SelfDescribingJson
from httmock import all_requests, HTTMock

try:
Expand Down Expand Up @@ -149,7 +150,7 @@ def test_integration_struct_event(self):
def test_integration_unstruct_event_non_base64(self):
t = tracker.Tracker([default_emitter], default_subject, encode_base64=False)
with HTTMock(pass_response_content):
t.track_unstruct_event({"schema": "iglu:com.acme/viewed_product/jsonschema/2-0-2", "data": {"product_id": "ASO01043", "price$flt": 49.95, "walrus$tms": 1000}})
t.track_unstruct_event(SelfDescribingJson("iglu:com.acme/viewed_product/jsonschema/2-0-2", {"product_id": "ASO01043", "price$flt": 49.95, "walrus$tms": 1000}))
expected_fields = {"e": "ue"}
for key in expected_fields:
self.assertEquals(from_querystring(key, querystrings[-1]), expected_fields[key])
Expand All @@ -163,7 +164,7 @@ def test_integration_unstruct_event_non_base64(self):
def test_integration_unstruct_event_base64(self):
t = tracker.Tracker([default_emitter], default_subject, encode_base64=True)
with HTTMock(pass_response_content):
t.track_unstruct_event({"schema": "iglu:com.acme/viewed_product/jsonschema/2-0-2", "data": {"product_id": "ASO01043", "price$flt": 49.95, "walrus$tms": 1000}})
t.track_unstruct_event(SelfDescribingJson("iglu:com.acme/viewed_product/jsonschema/2-0-2", {"product_id": "ASO01043", "price$flt": 49.95, "walrus$tms": 1000}))
expected_fields = {"e": "ue"}
for key in expected_fields:
self.assertEquals(from_querystring(key, querystrings[-1]), expected_fields[key])
Expand All @@ -177,7 +178,7 @@ def test_integration_unstruct_event_base64(self):
def test_integration_context_non_base64(self):
t = tracker.Tracker([default_emitter], default_subject, encode_base64=False)
with HTTMock(pass_response_content):
t.track_page_view("localhost", "local host", None, [{"schema": "iglu:com.example/user/jsonschema/2-0-3", "data": {"user_type": "tester"}}])
t.track_page_view("localhost", "local host", None, [SelfDescribingJson("iglu:com.example/user/jsonschema/2-0-3", {"user_type": "tester"})])
envelope_string = from_querystring("co", querystrings[-1])
envelope = json.loads(unquote_plus(envelope_string))
self.assertEquals(envelope, {
Expand All @@ -188,7 +189,7 @@ def test_integration_context_non_base64(self):
def test_integration_context_base64(self):
t = tracker.Tracker([default_emitter], default_subject, encode_base64=True)
with HTTMock(pass_response_content):
t.track_page_view("localhost", "local host", None, [{"schema": "iglu:com.example/user/jsonschema/2-0-3", "data": {"user_type": "tester"}}])
t.track_page_view("localhost", "local host", None, [SelfDescribingJson("iglu:com.example/user/jsonschema/2-0-3", {"user_type": "tester"})])
envelope_string = unquote_plus(from_querystring("cx", querystrings[-1]))
envelope = json.loads((base64.urlsafe_b64decode(bytearray(envelope_string, "utf-8"))).decode("utf-8"))
self.assertEquals(envelope, {
Expand Down
Loading

0 comments on commit a1e4333

Please sign in to comment.