84 changes: 83 additions & 1 deletion mc_providers/onlinenews.py
@@ -33,6 +33,8 @@ class Overview(TypedDict):
toplangs: Counts # from _format_counts
dailycounts: Counts # from _format_day_counts

SourcesByDate: TypeAlias = dict[str, Counts]
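# e.g. (assuming Counts maps string keys to ints):
# {"2024-01-01": {"example.com": 123, "example.org": 45}, ...}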

class OnlineNewsAbstractProvider(ContentProvider):
"""
All these endpoints accept `domains: List[str]`
@@ -397,7 +399,7 @@ def match_formatted_search_strings(fuss: list[str]) -> str:
import json
import time
from enum import Enum
from typing import TypeAlias
from typing import TypeAlias, cast

import elasticsearch
from elasticsearch_dsl import Search, Response
@@ -1376,3 +1378,83 @@ def fields(cls, expanded: bool = False) -> list[str]:
if (f.include == Include.DEFAULT or
(expanded and f.include == Include.EXPANDED))
]

#@CachingManager.cache() # enable if used for user-facing functions!!
def sources_by_date(self, query: str,
start_date: dt.datetime, end_date: dt.datetime,
max_domains: Optional[int] = None,
interval: Optional[str] = None,
                        date_extras: Optional[dict[str, Any]] = None,
**kwargs: Any) -> SourcesByDate:
"""
created for Media Cloud internal directory management!
Returns dictionary indexed by date of dicts indexed by domain of counts
"""
AGG_DATE = 'date'
AGG_SRCS = 'srcs'
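        # names for the nested aggregations, used in both the request and response parsing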

date_delta = end_date - start_date

# if no interval supplied, pick one
# (maybe truncate date range to bucket boundaries?)
if interval is None:
if date_delta.days <= 31:
interval = "day"
elif date_delta.days <= 2*365+1:
interval = "month"
else:
interval = "year"
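        # e.g. a 90-day range gets "month" buckets; a ten-year range gets "year"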

        # num_dates: the number of date buckets implied by the date range.
        # By default (cluster setting), max total buckets is 65536.
        # NOTE!!! Assumes start date is bucket aligned
        # (first of month, start day of week, start of year)
        num_dates: int
        if interval == "day":
            date_format = "%Y-%m-%d"
            num_dates = date_delta.days + 1  # count is inclusive of both endpoints
        elif interval == "week":
            # supplied start/end dates should be first and last days of weeks!
            # NOTE! By default buckets start on Monday dates.
            # Supply date_extras={"offset": "-1d"} for Sunday.
            # Partial weeks before or after will create extra buckets!!
            date_format = "%Y-%m-%d"
            num_dates = date_delta.days // 7 + 1  # whole weeks, given aligned dates
elif interval == "month":
# supplied start/end dates should be first and last days of months!
date_format = "%Y-%m"
num_dates = ((end_date.year - start_date.year) * 12 +
(end_date.month - start_date.month + 1))
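            # e.g. 2023-11 through 2024-02: (2024-2023)*12 + (2-11+1) = 4 buckets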
elif interval == "year":
# supplied start/end dates should be first and last days of year!!
date_format = "%Y"
num_dates = end_date.year - start_date.year + 1
else:
raise ValueError(f"unknown interval {interval}")

if max_domains is None:
            # use a reduced total-bucket budget (60000 < the 65536 default)
            # to leave headroom for extra buckets from non-bucket-aligned dates
max_domains = 60000 // num_dates
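            # e.g. 36 monthly buckets allow 60000 // 36 == 1666 domains per date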

search = self._basic_search(query, start_date, end_date, **kwargs)\
.extra(size=0) # just aggs
# nested buckets!
search.aggs.bucket(AGG_DATE, "date_histogram",
field="publication_date",
calendar_interval=interval,
                           **(date_extras or {}))\
.bucket(AGG_SRCS, "terms",
field="canonical_domain",
size=max_domains)
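        # request body sketch: {"size": 0, "aggs": {"date": {"date_histogram": {...},
        #     "aggs": {"srcs": {"terms": {...}}}}}}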
res = self._search(search, "src_by_date")
ret: SourcesByDate = {}
        # the date_histogram result keeps its per-date entries under "buckets"
        date_buckets = cast(list[dict[str, Any]], res.aggregations[AGG_DATE]["buckets"])
for date in date_buckets:
# key is epoch UTC milliseconds
tm = time.gmtime(int(date['key']) // 1000)
fdate = time.strftime(date_format, tm) # formatted date
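            # e.g. key 1704067200000 -> "2024-01-01" (daily) or "2024-01" (monthly)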
ret[fdate] = {str(b["key"]): int(b["doc_count"])
for b in date[AGG_SRCS]['buckets']}
        # maybe return a top-level TypedDict with totals (if any) from the query result?
return ret
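
A minimal usage sketch (hypothetical: `provider` stands in for however an
OnlineNews provider instance is constructed; the query string is illustrative):

import datetime as dt

# 2024-01-01..2024-03-31 is a 90-day delta, so "month" buckets are inferred
by_date = provider.sources_by_date("climate",
                                   dt.datetime(2024, 1, 1),
                                   dt.datetime(2024, 3, 31))
for date, domains in sorted(by_date.items()):
    # domains maps canonical_domain -> story count for that bucket
    print(date, sorted(domains.items(), key=lambda kv: -kv[1])[:3])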