Skip to content

Commit

Permalink
add scripts to pull the anchor cids down
Browse files Browse the repository at this point in the history
  • Loading branch information
gvelez17 committed Dec 8, 2023
1 parent b0e35b1 commit 7541760
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 0 deletions.
61 changes: 61 additions & 0 deletions logs-to-tsdb/backfill/anchors/get-anchor-cids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import subprocess
from datetime import datetime, timedelta
import time
from warnings import warn

def start_query_for_period(log_group, start_date, end_date):
    """Kick off a CloudWatch Logs Insights query for one time window.

    Submits an asynchronous `start-query` via the AWS CLI and returns the
    query id so the caller can fetch results later.
    """
    insights_query = (
        "fields @timestamp, @message, @logStream, @log "
        "| filter @message LIKE 'Created anchor commit'"
    )

    def to_millis(dt):
        # CloudWatch expects epoch timestamps in milliseconds.
        return str(int(dt.timestamp() * 1000))

    args = [
        "aws", "logs", "start-query",
        "--log-group-name", log_group,
        "--start-time", to_millis(start_date),
        "--end-time", to_millis(end_date),
        "--query-string", insights_query,
        "--query", "queryId",
        "--output", "text",
    ]

    query_id = subprocess.check_output(args).decode().strip()
    warn("queued query " + query_id)
    return query_id

def get_query_results(query_id, filename):
    """Fetch the results of a finished Insights query and save them to a file.

    Writes the raw JSON emitted by `aws logs get-query-results` to *filename*.
    Uses a real file handle for the redirect instead of the original
    shell=True string with a ">" token, which was shell-injection-prone
    (filename interpolated into a shell command) and harder to read.
    """
    cmd = ["aws", "logs", "get-query-results", "--query-id", query_id]
    warn(" ".join(cmd) + " > " + filename)
    with open(filename, "w") as out:
        subprocess.call(cmd, stdout=out)

def main():
    """Backfill anchor-commit log lines from CloudWatch.

    Queues one Insights query per 30-minute slice of the backfill window,
    waits briefly, then downloads each query's results into a
    ``results_<slice-start>.txt`` file.
    """
    log_group = "/ecs/ceramic-prod-cas"

    # Backfill window (naive datetimes; presumably UTC to match CloudWatch —
    # TODO confirm).
    start_date = datetime.strptime("2023-11-08", "%Y-%m-%d")
    end_date = datetime.strptime("2023-11-30 16", "%Y-%m-%d %H")

    query_ids = []

    # Start one query per 30-minute slice of the window.
    while start_date < end_date:
        next_date = start_date + timedelta(minutes=30)
        query_id = start_query_for_period(log_group, start_date, next_date)
        # Remember the slice's start time — it becomes the results filename.
        query_ids.append((query_id, start_date.strftime('%Y-%m-%d-%H-%M')))
        start_date = next_date

    # Give CloudWatch time to complete the queries.
    # NOTE(review): a single fixed 20s wait may be too short once many
    # queries are queued — consider polling each query's status instead.
    time.sleep(20)

    # Fetch the results for each query and save to a file.
    for query_id, date in query_ids:
        get_query_results(query_id, f"results_{date}.txt")


if __name__ == "__main__":
    main()

5 changes: 5 additions & 0 deletions logs-to-tsdb/backfill/anchors/make-anchor-list.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

for fil in `ls *.txt`
do
python3 munge-anchors.py $fil
done
54 changes: 54 additions & 0 deletions logs-to-tsdb/backfill/anchors/munge-anchors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import re
import os
import sys
import json
import subprocess
from datetime import datetime

"""
{
"results": [
[
{
"field": "@timestamp",
"value": "2023-11-08 09:43:07.657"
},
{
"field": "@message",
"value": "[2023-11-08T09:43:07.656Z] INFO: 'Created anchor commit with CID bafyreiajie4ozkhqzicwygaruxgsit3k4c7grnkppnwx3x55olbb6dofsi for stream k2t6wyfsu4pfyqbahubx5p3ztoxk9phoymxso9ly3rc0bcqk32zxo85v1wor1a'"
},
{
"field": "@logStream",
"value": "cas_anchor/cas_anchor/acfbd097255044a2a8c0de094bc07aea"
},
{
"field": "@log",
"value": "967314784947:/ecs/ceramic-prod-cas"
},
{
"field": "@ptr",
"value": "CmcKJgoiOTY3MzE0Nzg0OTQ3Oi9lY3MvY2VyYW1pYy1wcm9kLWNhcxABEjkaGAIGU2iuegAAAAA44TaoAAZUtX8QAAAAoiABKKbxvvK6MTCV1cTyujE45BJA+6QsSPm4BFCbtQQYACABENYHGAE="
}
],
"""

# Matches the CAS log line fragment
# "Created anchor commit with CID <cid> for stream <streamId>'"
# (see the sample JSON documented at the top of this file).
PATT = re.compile(r'Created anchor commit with CID (\S+) for stream ([^\']+)')


def extract_row(result):
    """Parse one Insights result into a ``(timestamp, stream_id, cid)`` tuple.

    *result* is a list of ``{"field": ..., "value": ...}`` pairs as returned
    by ``aws logs get-query-results``.  Any field that is absent, or a
    ``@message`` that does not match ``PATT``, yields an empty string in the
    corresponding slot.
    """
    cid = ''
    stream = ''
    dtstr = ''
    for pair in result:
        if pair['field'] == '@message':
            match = PATT.search(pair['value'])
            if match:
                cid, stream = match.groups()
        elif pair['field'] == '@timestamp':
            dtstr = pair['value']
    return dtstr, stream, cid


def main():
    """Read the results JSON named on the command line and print CSV rows."""
    backfile = sys.argv[1]
    with open(backfile, 'r') as f:
        logs = json.load(f)
    for res in logs['results']:
        dtstr, stream, cid = extract_row(res)
        print("{},{},{}".format(dtstr, stream, cid))


# Guarded entry point: the original read sys.argv[1] at import time, which
# crashed on import and made the parser untestable. CLI behavior unchanged.
if __name__ == "__main__":
    main()

0 comments on commit 7541760

Please sign in to comment.