Skip to content

twitter_comments: new stream to extract comments #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions source-twitter-fetcher/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
FROM airbyte/python-connector-base:1.1.0@sha256:dd17e347fbda94f7c3abff539be298a65af2d7fc27a307d89297df1081a45c27

#FROM airbyte/python-connector-base:1.1.0@sha256:dd17e347fbda94f7c3abff539be298a65af2d7fc27a307d89297df1081a45c27
FROM --platform=linux/amd64 airbyte/python-connector-base:1.1.0
COPY . ./airbyte/integration_code

# Force reinstall the correct airbyte-cdk version to avoid conflicts
RUN pip uninstall -y airbyte-cdk
RUN pip install ./airbyte/integration_code

# The entrypoint and default env vars are already set in the base image
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

15 changes: 14 additions & 1 deletion source-twitter-fetcher/sample_files/config-example.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,18 @@
"token_expiry_date": ""
},
"account_id": "123456789",
"start_time": "2024-01-01T00:00:00Z"
"start_time": "2024-01-01T00:00:00Z",
"comment_days_limit": 2,
"filtered_author_ids": [
"1417373828544487426",
"1527270456658632706",
"1573349900905054212",
"1636287274961829888",
"1101033576454340608",
"1151831284110385152",
"1083104775825252353",
"774689518767181828",
"1783824207631077376",
"18904639"
]
}
14 changes: 14 additions & 0 deletions source-twitter-fetcher/sample_files/configured_catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "tweet_comments",
"json_schema": {
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object"
},
"supported_sync_modes": [
"full_refresh", "incremental"
]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "promoted_tweet_active",
Expand Down
2 changes: 1 addition & 1 deletion source-twitter-fetcher/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.2",
"airbyte-cdk>=0.50.0,<0.60.0",
]

TEST_REQUIREMENTS = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"text": {
"type": ["null", "string"]
},
"created_at": {
"type": ["null", "string"]
},
"author_id": {
"type": ["null", "string"]
},
"account_id": {
"type": ["null", "string"]
},
"conversation_id": {
"type": ["null", "string"]
},
"in_reply_to_user_id": {
"type": ["null", "string"]
},
"source": {
"type": ["null", "string"]
},
"referenced_tweets": {
"type": ["null", "array"],
"items": {
"type": ["object"],
"properties":{
"type": {
"type": ["null", "string"]
},
"id": {
"type": ["null", "string"]
}
}
}
},
"public_metrics": {
"type": ["null", "object"],
"properties": {
"retweet_count": {
"type": ["null", "number"]
},
"reply_count": {
"type": ["null", "number"]
},
"like_count": {
"type": ["null", "number"]
},
"quote_count": {
"type": ["null", "number"]
},
"impression_count": {
"type": ["null", "number"]
},
"bookmark_count": {
"type": ["null", "number"]
}
}
},
"entities": {
"type": ["null", "object"],
"properties": {
"urls": {
"type": ["null", "array"],
"items": {
"type": ["object"],
"properties": {
"start": {
"type": ["null", "number"]
},
"end": {
"type": ["null", "number"]
},
"url": {
"type": ["null", "string"]
},
"expanded_url": {
"type": ["null", "string"]
},
"display_url": {
"type": ["null", "string"]
}
}
}
},
"hashtags": {
"type": ["null", "array"],
"items": {
"type": ["object"],
"properties": {
"start": {
"type": ["null", "number"]
},
"end": {
"type": ["null", "number"]
},
"tag": {
"type": ["null", "string"]
}
}
}
},
"mentions": {
"type": ["null", "array"],
"items": {
"type": ["object"],
"properties": {
"start": {
"type": ["null", "number"]
},
"end": {
"type": ["null", "number"]
},
"username": {
"type": ["null", "string"]
},
"id": {
"type": ["null", "string"]
}
}
}
}
}
},
"context_annotations": {
"type": ["null", "array"],
"items": {
"type": ["object"],
"properties": {
"domain": {
"type": ["null", "object"],
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"description": {
"type": ["null", "string"]
}
}
},
"entity": {
"type": ["null", "object"],
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"description": {
"type": ["null", "string"]
}
}
}
}
}
}
}
}
10 changes: 10 additions & 0 deletions source-twitter-fetcher/source_twitter_fetcher/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .tweets_stream import Account, Tweet, TweetMetrics, TweetPromoted
from .ads_stream import PromotedTweetActive, PromotedTweetBilling, PromotedTweetEngagement
from .spaces_stream import Space
from .tweets_comments_stream import TweetComments
from .auth import TwitterOAuth

DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Expand Down Expand Up @@ -39,6 +40,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
parent=tweet
)

tweet_comments = TweetComments(
authenticator=auth,
account_id=config['account_id'],
parent=tweet,
comment_days_limit=config.get('comment_days_limit', 2),
filtered_author_ids=config.get('filtered_author_ids', [])
)

promoted_tweet_active = PromotedTweetActive(
authenticator=auth,
account_id=config['account_id'],
Expand Down Expand Up @@ -68,6 +77,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
tweet,
tweet_metrics,
tweet_promoted,
tweet_comments,
promoted_tweet_active,
promoted_tweet_billing,
promoted_tweet_engagement,
Expand Down
14 changes: 14 additions & 0 deletions source-twitter-fetcher/source_twitter_fetcher/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,17 @@ connectionSpecification:
type: string
description: "Start date of fetching data"
format: datetime
comment_days_limit:
type: integer
title: "Comment Days Limit"
description: "Number of days to look back for comments on tweets (default: 2)"
default: 2
minimum: 1
maximum: 7
filtered_author_ids:
type: array
title: "Filtered Author IDs"
description: "List of Twitter author IDs to filter out from comments (e.g., your own organization's account IDs)"
items:
type: string
default: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from typing import Any, Iterable, Mapping, MutableMapping, Optional, List
import logging
import requests
import time
from datetime import datetime, timedelta
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from .tweets_stream import Tweet
import json
import os

logger = logging.getLogger("airbyte")

class TweetComments(HttpSubStream, Tweet):
    """Sub-stream that fetches replies ("comments") for each tweet emitted by the
    parent ``Tweet`` stream.

    For every parent tweet it queries the Twitter v2 recent-search endpoint with a
    ``conversation_id`` filter, then drops retweets, replies authored by configured
    (own) accounts, and replies older than ``comment_days_limit`` days.
    """

    primary_key = "id"
    cursor_field = "created_at"

    # Timestamp format returned by the Twitter v2 API (UTC, e.g. "2024-01-01T00:00:00.000Z").
    TWITTER_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, comment_days_limit: int = 2, filtered_author_ids: Optional[List[str]] = None, **kwargs):
        """
        :param comment_days_limit: number of days to look back for comments; older replies are skipped.
        :param filtered_author_ids: Twitter author IDs whose replies are excluded
            (e.g. the organization's own accounts). ``None`` means no filtering.
        :param kwargs: forwarded to the parent stream classes (authenticator, account_id, parent, ...).
        """
        super().__init__(**kwargs)
        self.comment_days_limit = comment_days_limit
        # Twitter returns UTC timestamps, so the cutoff must be computed in UTC as well;
        # a naive local-time now() would skew the window by the local UTC offset.
        self.limit_date = datetime.utcnow() - timedelta(days=self.comment_days_limit)
        # Use provided filtered_author_ids or default to an empty list (never a shared mutable default).
        self.filtered_author_ids = filtered_author_ids or []

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the search API's ``next_token`` when more non-empty pages remain, else ``None``."""
        response_json = response.json()
        meta = response_json.get('meta', {})
        # Only continue paginating when the current page actually contained results.
        if 'next_token' in meta and meta.get('result_count', 0) > 0:
            return meta['next_token']
        return None

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """Return the stream state advanced to the newest ``created_at`` seen so far.

        ISO-8601 UTC timestamps compare correctly as strings, so ``max`` is safe here.
        """
        latest_created_at = latest_record.get(self.cursor_field)
        current_state = current_stream_state.get(self.cursor_field)

        # A record without a cursor value must not regress or corrupt existing state.
        if latest_created_at is None:
            return current_stream_state

        if current_state is None:
            return {self.cursor_field: latest_created_at}

        return {self.cursor_field: max(latest_created_at, current_state)}

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """All slices hit the same recent-search endpoint; the slice only varies the query params."""
        return "tweets/search/recent"

    def request_params(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Build search parameters: the conversation filter plus the tweet/user fields to expand."""
        params = {
            "query": f"conversation_id:{stream_slice['tweet_id']}",
            "tweet.fields": "author_id,created_at,conversation_id,in_reply_to_user_id,referenced_tweets,source,text,public_metrics,entities,context_annotations",
            "user.fields": "created_at,description,id,name,username",
            "expansions": "author_id,referenced_tweets.id",
        }
        if next_page_token:
            params["next_token"] = next_page_token
        return params

    def stream_slices(
        self,
        sync_mode: Any,
        cursor_field: Optional[List[str]] = None,
        stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one slice per parent tweet, carrying its id for the conversation query."""
        for parent_slice in super().stream_slices(sync_mode=sync_mode):
            tweet = parent_slice["parent"]
            yield {"tweet_id": tweet.get('id')}

    def parse_response(
        self,
        response: requests.Response,
        stream_slice: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield reply tweets from a search response, applying author/retweet/age filters.

        Records missing ``created_at`` are skipped (recency cannot be evaluated),
        rather than crashing on ``strptime(None, ...)``.
        """
        payload = response.json()  # parse once; the original parsed the body twice
        for tweet in payload.get('data', []):
            # Skip replies from filtered (own) accounts.
            if tweet.get('author_id') in self.filtered_author_ids:
                continue
            # Filter out retweets.
            if tweet.get('text', '').startswith('RT'):
                continue
            created_at = tweet.get('created_at')
            if not created_at:
                continue
            # Keep only replies within the configured look-back window.
            tweet_date = datetime.strptime(created_at, self.TWITTER_DATE_FORMAT)
            if tweet_date >= self.limit_date:
                # Tag the record with the connector's account id for downstream joins.
                tweet['account_id'] = self.account_id
                yield tweet
        # Crude rate limiting, matching the delay used by the other Twitter streams.
        time.sleep(2)
Loading