pipestat is a library for computing statistics over a dataset through a pipeline, using the syntax of the MongoDB aggregation framework. See the MongoDB aggregation pipeline documentation for how pipelines work.
Here is a quick example to get a feel for pipestat. It extracts fields from each event, then computes the count and elapsed-time statistics:
>>> from pipestat import pipestat
>>> pipeline = [
... {
... "$match": {
... "_event": {"$regex": r"Collect.*timeline\s+end"},
... },
... },
... {
... "$project": {
... "app": {"$extract": ["$_event", r"app:(\w*)"]},
... "action": {"$extract": ["$_event", "(cached|refresh|locked)"]},
... "elapse": {
... "$toNumber": {
... "$extract": ["$_event", r"elapse:([\d.]*)"],
... },
... }
... },
... },
... {
... "$group": {
... "_id": {
... "app": "$app",
... "action": "$action"
... },
... "count": {"$sum": 1},
... "min_elapse": {"$min": "$elapse"},
... "max_elapse": {"$max": "$elapse"},
... "sum_elapse": {"$sum": "$elapse"},
... }
... },
... {
... "$project": {
... "app": "$_id.app",
... "action": "$_id.action",
... "count": "$count",
... "elapse": {
... "min": "$min_elapse",
... "max": "$max_elapse",
... "avg": {"$divide": ["$sum_elapse", "$count"]},
... },
... },
... },
... {
... "$sort": [
... ("app", 1),
... ("action", 1),
... ]
... },
... ]
>>> dataset = [
... {
... "_event": "[2014-01-16 16:13:49,171] DEBUG Collect app:app37 timeline end... refresh, elapse:1.0",
... },
... {
... "_event": "[2014-01-16 16:13:49,171] DEBUG Collect app:app37 timeline end... refresh, elapse:2.0",
... },
... {
... "_event": "[2014-01-16 16:13:49,171] DEBUG Collect app:app37 timeline end... cached, elapse:0.01",
... },
... {
... "_event": "[2014-01-16 16:13:49,171] DEBUG Collect app:app40 timeline end... refresh, elapse:2.0",
... },
... ]
>>> pipestat(dataset, pipeline)
[
{
"count": 1.0,
"app": "app37",
"action": "cached",
"elapse": {
"min": 0.01,
"max": 0.01,
"avg": 0.01,
}
},
{
"count": 2.0,
"app": "app37",
"action": "refresh",
"elapse": {
"min": 1.0,
"max": 2.0,
"avg": 1.5,
}
},
{
"count": 1.0,
"app": "app40",
"action": "refresh",
"elapse": {
"min": 2.0,
"max": 2.0,
"avg": 2.0,
}
}
]

$match pipes the documents that match its conditions to the next operator in the pipeline. See the MongoDB aggregation $match documentation for more.
- The $match command supports the basic operators:
- $exists, $gt, $gte, $lt, $lte, $ne, $eq, $in, $mod, $and, $or, $nor, $not, $nin, $all, $elemMatch
In addition to these, the pipestat $match command supports extra operators such as $regex and $call.
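
As a mental model only, a $match stage acts like a filter over the dataset. The sketch below is plain Python, not pipestat's implementation. It assumes that $regex uses search semantics, which the quick example suggests because its pattern matches in the middle of the event string. The helper names `match_regex` and `match_call` are hypothetical.

```python
import re


def match_regex(dataset, field, pattern):
    """Keep documents whose `field` contains a match for `pattern`."""
    compiled = re.compile(pattern)
    # Assumption: search semantics (a match anywhere in the string).
    return [doc for doc in dataset if compiled.search(str(doc.get(field, "")))]


def match_call(dataset, predicate):
    """Keep documents for which the callable returns a true value."""
    return [doc for doc in dataset if predicate(doc)]


events = [
    {"_event": "DEBUG Collect app:app37 timeline end... refresh, elapse:1.0"},
    {"_event": "DEBUG Collect app:app37 timeline start"},
]
matched = match_regex(events, "_event", r"Collect.*timeline\s+end")

traffic = [{"in": 5, "out": 2}, {"in": 1, "out": 4}]
more_in_than_out = match_call(traffic, lambda doc: doc["in"] > doc["out"])
```

Only the first event and the first traffic document pass their filters.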
The $regex operator matches the value of the specified field against a regular expression:
>>> pipeline = [
... {
... "$match": {
... "_event": {"$regex": r"Collect.*timeline\s+end"},
... },
... },
... ]

The $call operator takes a callable that receives the document as its argument and returns True or False to indicate whether the document matches:
>>> mf = lambda doc: doc["in"] > doc["out"]
>>> pipeline = [
... {
... "$match": {
... "$call": mf,
... },
... },
... ]

$project reshapes a document stream by renaming, adding, or removing fields. It can also create computed values or sub-documents. Use $project to:
- Include fields from the original document.
- Insert computed fields.
- Rename fields.
- Create and populate fields that hold sub-documents.
See the MongoDB aggregation $project documentation for more.
- The $project command supports the basic operators:
- $add, $subtract, $multiply, $divide, $mod, $toLower, $toUpper, $substr, $concat and the date operators.
In addition to these, the pipestat $project command supports extra operators such as $toNumber, $substring, $extract, $timestamp, $use, and $call.
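
To make the operator list concrete, here is a minimal sketch of how $project expressions can be evaluated recursively. It is plain Python covering only a few arithmetic operators, not pipestat's implementation. The names `BINARY_OPS`, `evaluate`, and `project` are hypothetical.

```python
import operator

# Hypothetical operator table covering a few of the arithmetic operators.
BINARY_OPS = {
    "$add": operator.add,
    "$subtract": operator.sub,
    "$multiply": operator.mul,
    "$divide": operator.truediv,
    "$mod": operator.mod,
}


def evaluate(expr, doc):
    """Evaluate a small subset of $project expressions against one document."""
    if isinstance(expr, str) and expr.startswith("$"):
        # "$a.b" is a field reference, resolved through nested dicts.
        value = doc
        for key in expr[1:].split("."):
            value = value[key]
        return value
    if isinstance(expr, dict):
        if len(expr) == 1 and next(iter(expr)).startswith("$"):
            (name, args), = expr.items()
            if name not in BINARY_OPS:
                raise ValueError("unsupported operator: %s" % name)
            left, right = (evaluate(arg, doc) for arg in args)
            return BINARY_OPS[name](left, right)
        # Any other dict is a sub-document whose values are evaluated in turn.
        return {key: evaluate(value, doc) for key, value in expr.items()}
    return expr  # a literal such as 8 or 1024


def project(doc, spec):
    """Build the output document from a field -> expression mapping."""
    return {field: evaluate(expr, doc) for field, expr in spec.items()}


doc = {"traffic": 2048, "_id": {"app": "app37"}}
spec = {
    "app": "$_id.app",
    "kbits": {"$divide": [{"$multiply": ["$traffic", 8]}, 1024]},
}
result = project(doc, spec)
```

Here `result` holds the renamed field `app` and the computed field `kbits`; nested operators are evaluated from the inside out.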
The $toNumber operator converts a string to a number.
>>> pipeline = [
... {
... "$project": {
... "elapse": {"$toNumber": "$elapse"},
... },
... },
... ]

$substring returns the part of a string between one index and another, or through the end of the string. It supports negative indexes.
>>> pipeline = [
... {
... "$project": {
... "app": {"$substring": ["$app", 3, 5]},
... },
... },
... ]
>>> pipeline = [
... {
... "$project": {
... "app": {"$substring": ["$app", 3]},
... },
... },
... ]

The $extract operator extracts a value from another field using a regular expression. The value is taken, in order of preference, from groupdict()["extract"], then group(1), then group():
>>> pipeline = [
... {
... "$project": {
... "app": {"$extract": ["$_event", r"app:(\w*)"]},
... "action": {"$extract": ["$_event", "(cached|refresh|locked)"]},
... "elapse": {
... "$toNumber": {
... "$extract": ["$_event", r"elapse:([\d.]*)"],
... },
... }
... },
... },
... ]

The $timestamp operator converts a formatted time string to a timestamp in seconds, as a float:
>>> pipeline = [
... {
... "$project": {
... "ts": {"$timestamp": ["$ts_str", "%Y-%m-%d %H:%M:%S"]},
... },
... },
... ]

The $use operator parses a value with a predefined parser or any callable:
>>> pipeline = [
... {
... "$project": {
... "name": {"$use": ["$name", "json"]}, # '{"lastName":"mike","firstName":"kitty"}' ==> {"lastName": "mike", "firstName": "kitty"}
... },
... },
... ]

The $call operator is for advanced cases that none of the operators above can handle:
>>> slot_ts = lambda document: document["ts"] // 300 * 300
>>> pipeline = [
... {
... "$project": {
... "ts": {"$call": slot_ts},
... },
... },
... ]

The pipestat $project command also supports combining operators:
>>> pipeline = [
... {
... "$project": {
... "traffic": {"$divide": [{"$multiply": ["$traffic", 8]}, 1024]}
... }
... }
... ]

$group groups documents together to calculate aggregate values over a collection of documents. In practice, $group often supports tasks such as computing the average page views for each page of a website on a daily basis.
See the MongoDB aggregation $group documentation for more.
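
As a rough illustration of what grouping computes, the following plain-Python sketch accumulates count, minimum, maximum, and sum per (app, action) key. It is not pipestat's implementation. The helper name `group_elapse` is hypothetical, and the input mirrors the four quick-example events after their fields have been extracted.

```python
from collections import OrderedDict


def group_elapse(dataset):
    """Group by (app, action) and accumulate count, min, max and sum of elapse."""
    groups = OrderedDict()
    for doc in dataset:
        key = (doc["app"], doc["action"])
        elapse = doc["elapse"]
        acc = groups.get(key)
        if acc is None:
            # First document of this group initialises every accumulator.
            groups[key] = {
                "count": 1,
                "min_elapse": elapse,
                "max_elapse": elapse,
                "sum_elapse": elapse,
            }
        else:
            acc["count"] += 1
            acc["min_elapse"] = min(acc["min_elapse"], elapse)
            acc["max_elapse"] = max(acc["max_elapse"], elapse)
            acc["sum_elapse"] += elapse
    return groups


docs = [
    {"app": "app37", "action": "refresh", "elapse": 1.0},
    {"app": "app37", "action": "refresh", "elapse": 2.0},
    {"app": "app37", "action": "cached", "elapse": 0.01},
    {"app": "app40", "action": "refresh", "elapse": 2.0},
]
groups = group_elapse(docs)
refresh = groups[("app37", "refresh")]
average = refresh["sum_elapse"] / refresh["count"]
```

The two app37 refresh events land in the same group, so their average is the sum divided by a count of two.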
Beyond the MongoDB accumulators, the pipestat $group command also supports $concatToSet, $concatToList, and $call. Here is a basic $group example:
>>> pipeline = [
... {
... "$group": {
... "_id": {
... "app": "$app",
... "action": "$action"
... },
... "count": {"$sum": 1},
... "min_elapse": {"$min": "$elapse"},
... "max_elapse": {"$max": "$elapse"},
... "sum_elapse": {"$sum": "$elapse"},
... }
... },
... ]

The $concatToSet operator merges list values or single values into one list without duplicate values.
>>> pipeline = [
... {
... "$group": {
... "_id": {
... "app": "$app",
... },
... "action": {"$concatToSet": "$action"},
... }
... },
... ]

The $concatToList operator works like $concatToSet, but the resulting list may contain duplicate values.
>>> pipeline = [
... {
... "$group": {
... "_id": {
... "app": "$app",
... },
... "action": {"$concatToList": "$action"},
... }
... },
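... ]

The $call operator is for advanced cases that none of the operators above can handle. It works much like Python's built-in reduce function: its second parameter is the accumulated result, whose initial value is the custom undefined object.
>>> from pipestat import pipestat, undefined
>>> def filter_concat(document, acc_val):
...     # acc_val starts out as `undefined` (assumed here to be a singleton
...     # sentinel), so create the list on first use
...     if acc_val is undefined:
...         acc_val = []
...     if document["action"] != "refresh":
...         acc_val.append(document["action"])
...     return acc_val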
>>> pipeline = [
... {
... "$group": {
... "_id": {
... "app": "$app",
... },
... "action": {
... "$call": filter_concat
... },
... }
... },
... ]

The $sort pipeline command sorts all input documents and returns them to the pipeline in sorted order.
See the MongoDB aggregation $sort documentation for more.
The $sort command is identical to the MongoDB aggregation $sort. Besides a dict, you can also pass a list of tuples or a collections.OrderedDict, which fixes the key order for multi-key sorts. For example:
>>> pipeline = [
... {
... "$sort": {"app": 1}
... },
... ]
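
The order of the keys matters in a multi-key sort, which is why an ordered specification is useful. The plain-Python sketch below, which is not pipestat's implementation, applies a list of (field, direction) pairs with successive stable sorts. The helper name `sort_documents` is hypothetical, and -1 for descending follows the MongoDB convention.

```python
from operator import itemgetter


def sort_documents(docs, spec):
    """Sort by (field, direction) pairs; 1 is ascending, -1 is descending."""
    result = list(docs)
    # Python's sort is stable, so sorting by the least significant key first
    # and the most significant key last yields the multi-key order.
    for field, direction in reversed(spec):
        result.sort(key=itemgetter(field), reverse=(direction == -1))
    return result


docs = [
    {"app": "app40", "action": "refresh"},
    {"app": "app37", "action": "refresh"},
    {"app": "app40", "action": "cached"},
    {"app": "app37", "action": "cached"},
]
by_app_then_action = sort_documents(docs, [("app", 1), ("action", 1)])
by_action_then_app = sort_documents(docs, [("action", 1), ("app", 1)])
```

The two results differ only because the key order differs, which a specification without a defined key order could not express.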
>>> pipeline = [
... {
... "$sort": [
... ("app", 1),
... ("action", 1),
... ]
... },
... ]

$limit restricts the number of documents that pass through it in the pipeline.
See the MongoDB aggregation $limit documentation for more.
The $limit command is identical to the MongoDB aggregation $limit. For example:
>>> pipeline = [
... {
... "$limit": 3,
... },
... ]

$skip skips over the specified number of documents that pass through it in the pipeline, then passes on all of the remaining input.
See the MongoDB aggregation $skip documentation for more.
The $skip command is identical to the MongoDB aggregation $skip. For example:
>>> pipeline = [
... {
... "$skip": 3,
... },
... ]

$unwind peels off the elements of an array individually and returns a stream of documents. It returns one document for every member of the unwound array within every source document.
See the MongoDB aggregation $unwind documentation for more.
The $unwind command is identical to the MongoDB aggregation $unwind. For example:
>>> pipeline = [
... {
... "$unwind": "$tags",
... },
... ]
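
To show what the output stream looks like, here is a minimal plain-Python sketch of the unwinding described above. It is not pipestat's implementation, and the helper name `unwind` is hypothetical. It assumes that documents with an empty or missing array produce no output, as in MongoDB's default behaviour.

```python
def unwind(docs, field_path):
    """Yield one copy of each document per element of the array at `field_path`."""
    field = field_path.lstrip("$")
    for doc in docs:
        # Assumption: an empty or missing array yields no documents.
        for element in doc.get(field, []):
            unwound = dict(doc)       # shallow copy of the source document
            unwound[field] = element  # replace the array with a single member
            yield unwound


docs = [
    {"title": "pipestat", "tags": ["stats", "pipeline"]},
    {"title": "untagged", "tags": []},
]
result = list(unwind(docs, "$tags"))
```

The first document is emitted twice, once per tag, and the second is dropped because its array is empty.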