Skip to content

Commit

Permalink
Fixes to handling chunked messages from twitter's Streaming API (#104)
Browse files Browse the repository at this point in the history
- HTTP Chunked encoding always uses line feed as a delimiter.
- Multiple tweets can be in a single chunk.

Chunking seems to not split tweets at the right place, this fixes errors seen in my application
  • Loading branch information
pjskennedy authored Nov 11, 2021
1 parent 64c9e87 commit 7a87f09
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 15 deletions.
23 changes: 10 additions & 13 deletions lib/extwitter/api/streaming.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ defmodule ExTwitter.API.Streaming do
end
end

@crlf "\r\n"

@doc false
def process_stream(processor, request_id, configs, acc \\ []) do
receive do
Expand All @@ -139,19 +141,16 @@ defmodule ExTwitter.API.Streaming do

{:http, {request_id, :stream, part}} ->
cond do
is_empty_message(part) ->
send processor, :keepalive
process_stream(processor, request_id, configs, acc)

is_end_of_message(part) ->
message = Enum.reverse([part|acc])
|> Enum.join("")
|> __MODULE__.parse_tweet_message(configs)
if message != nil do
send processor, message
end
process_stream(processor, request_id, configs, [])
# There is a chance of multiple tweets here.
Enum.reverse([part | acc])
|> Enum.join("")
|> String.split(@crlf, trim: true)
|> Enum.map(& __MODULE__.parse_tweet_message(&1, configs))
|> Enum.filter(& !is_nil(&1))
|> Enum.map(& send processor, &1)

process_stream(processor, request_id, configs, [])
true ->
process_stream(processor, request_id, configs, [part|acc])
end
Expand All @@ -171,8 +170,6 @@ defmodule ExTwitter.API.Streaming do
end
end

@crlf "\r\n"
def is_empty_message(part), do: part == @crlf
def is_end_of_message(part), do: part |> String.ends_with?(@crlf)

defp parse_message_type(%{friends: friends}, _) do
Expand Down
81 changes: 79 additions & 2 deletions test/extwitter_stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,77 @@ defmodule ExTwitterStreamTest do
:ok
end


test_with_mock "gets Twitter sample stream with multi-chunk message response", ExTwitter.OAuth,
[request_async: fn(_method, _url, _params, _consumer_key, _consumer_secret, _access_token, _access_token_secret) ->
request_id = make_ref()
TestHelper.TestStore.set({self(), request_id})
{:ok, request_id}
end] do

# Process stream on different process.
parent = self()
spawn(fn() ->
stream = ExTwitter.stream_sample
tweet = Enum.take(stream, 1) |> List.first
send parent, {:ok, tweet}
end)

# Send mock data after short wait.
wait_async_request_initialization()
store = TestHelper.TestStore.get
start_stream(store)

# Split tweet into separate parts
middle_index = @mock_tweet_json
|> String.length
|> Kernel./(2)
|> trunc

{first_message, second_message} = String.split_at(@mock_tweet_json, middle_index)

# Send separate messages in two halves
send_part(store, first_message)
send_part(store, second_message)

# Verify result.
receive do
{:ok, tweet} ->
assert tweet.text =~ ~r/sample tweet text/
end
end

test_with_mock "gets Twitter sample stream with line feed in second message", ExTwitter.OAuth,
[request_async: fn(_method, _url, _params, _consumer_key, _consumer_secret, _access_token, _access_token_secret) ->
request_id = make_ref()
TestHelper.TestStore.set({self(), request_id})
{:ok, request_id}
end] do

# Process stream on different process.
parent = self()
spawn(fn() ->
stream = ExTwitter.stream_sample
tweet = Enum.take(stream, 1) |> List.first
send parent, {:ok, tweet}
end)

# Send mock data after short wait.
wait_async_request_initialization()
store = TestHelper.TestStore.get
start_stream(store)

# Send first messages without a line feed, followed by a line feed in a separate message
send_part(store, @mock_tweet_json |> String.replace("\r\n", ""))
send_part(store, "\r\n")

# Verify result.
receive do
{:ok, tweet} ->
assert tweet.text =~ ~r/sample tweet text/
end
end

test_with_mock "gets Twitter sample stream", ExTwitter.OAuth,
[request_async: fn(_method, _url, _params, _consumer_key, _consumer_secret, _access_token, _access_token_secret) ->
request_id = make_ref()
Expand Down Expand Up @@ -131,13 +202,19 @@ defmodule ExTwitterStreamTest do
:timer.sleep(100) # Put small wait for mocking library to become ready.
end

defp send_mock_data({pid, request_id}, json) do
defp start_stream({pid, request_id}) do
headers = [{'connection', 'close'}, {'date', 'Sun, 06 Jul 2014 14:48:13 UTC'},
{'transfer-encoding', 'chunked'}, {'content-type', 'application/json'},
{'x-connection-hash', '4be738c31e867bd602893fb3a320e55e'}]


send pid, {:http, {request_id, :stream_start, headers}}
end

defp send_part({pid, request_id}, json), do:
send pid, {:http, {request_id, :stream, json}}

defp send_mock_data(store, json) do
start_stream(store)
send_part(store, json)
end
end

0 comments on commit 7a87f09

Please sign in to comment.