Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large messages get corrupted or never arrive #48

Open
seeseemelk opened this issue Jul 14, 2024 · 14 comments
Open

Large messages get corrupted or never arrive #48

seeseemelk opened this issue Jul 14, 2024 · 14 comments

Comments

@seeseemelk
Copy link

When large messages are received (I'm testing with a payload of 2625000 bytes), the library starts to misbehave.

Sometimes it hangs forever waiting for the actual message data to come in. Other times it'll think that the payload is part of the topic name. Sometimes it will not read the entire message, and think that a part of the payload is the header for the next message.

I noticed that increasing the socket timeout seems to largely resolve/workaround the issue.

@seeseemelk
Copy link
Author

I managed to get a more stable fix by doing chunked reads.
Instead of reading the entire 2.5 megabytes in one go, I changed the timeout version of conn.recv_func to only do reads of up to 16k. If more data is request, multiple reads are done.

This allows the function to still yield periodically, while also being able to read a large packet.

I noticed that large sends also have issues. For this I simply disable the luasocket timeout before the send, and enable it again after. Not ideal, but function for my use case.

@Tieske
Copy link
Contributor

Tieske commented Jul 14, 2024

Can you provide a test that shows the behaviour? or a minimal reproducing example?

@oktoberfest6
Copy link

Hello,

I also observed the same phenomenon. Using the script below I have three different behaviors:

  • the script receives the payload and displays "received message #msg = 200000"
  • the script hangs
  • the scripts crashes with the following stack:
    D:\temp\testmqtt>lua.exe testmqtt.lua
    lua.exe: testmqtt.lua:7: failed to send PUBLISH: connector.send failed: timeout
    stack traceback:
    [C]: in function 'assert'
    testmqtt.lua:7: in upvalue 'callback'
    D:\temp\testmqtt/modules/luarocks/share/mqtt\client.lua:347: in local 'handler'
    D:\temp\testmqtt/modules/luarocks/share/mqtt\client.lua:849: in method 'handle'
    D:\temp\testmqtt/modules/luarocks/share/mqtt\client.lua:1010: in method '_io_iteration'
    D:\temp\testmqtt/modules/luarocks/share/mqtt\client.lua:902: in method '_ioloop_iteration'
    D:\temp\testmqtt/modules/luarocks/share/mqtt\ioloop.lua:113: in method 'iteration'
    D:\temp\testmqtt/modules/luarocks/share/mqtt\ioloop.lua:139: in function <D:\temp\testmqtt/modules/luarocks/share/mqtt\ioloop.lua:136>
    (...tail calls...)
    testmqtt.lua:21: in main chunk
    [C]: in ?
local mqtt = require("mqtt")

local client = mqtt.client{ uri = os.getenv("MQTTBROKER"), clean = true }

local function onsubscribe()
	local payload = string.rep('a', 200000)
	assert(client:publish{ topic = "luamqtt", payload = payload })
end

local function onconnect()
	assert(client:subscribe{ topic="luamqtt", qos=1, callback = onsubscribe })
end

local function onmessage(msg)
	assert(client:acknowledge(msg))
	print("received message #msg =", #msg.payload)
	client:disconnect()
end

client:on{ connect = onconnect, message = onmessage }
mqtt.run_ioloop(client)

@Tieske
Copy link
Contributor

Tieske commented Jul 20, 2024

@oktoberfest6 thx for the test code. I tried to take this into account when I rewrote the connectors in this branch: #31 But your test code equally failed. I have now fixed that (last 2 commits I just added).

So if you need this, use that branch.

@xHasKx any chance we can merge that branch? I'd love to drop my fork in favour of this original library, but it needs those fixes.

@Tieske
Copy link
Contributor

Tieske commented Jul 20, 2024

@seeseemelk since you also reported the same issue when sending, would you mind giving #31 a try, it should fix that as well, since it caters for partial sends.

@seeseemelk
Copy link
Author

Just tried it, but it's still broken in interesting ways.

I've uploaded the code I'm using to test it here: https://github.com/seeseemelk/luamqtt-failure-example

test-image.lua sends a payload of the ASCII character 'A' 537726 times, get-image.lua receives it and prints out the length.

To test, start get-image.lua, then run test-image.lua repeatedly.
Sometimes test-image.lua will hang without anything ever arriving at get-image.lua, other times get-image.lua will print out with some error message. Varying the length of the payload changes the error message sometimes.

$ lua get-image.lua 
Connected
Subscribed
Wrote image 1 of length 525310
MQTT client error:	failed to receive next packet: PUBREL: unexpected flags value: 1
Connected
Subscribed

@seeseemelk
Copy link
Author

I'm using a Mosquitto configured to allowed anonymous access as a broker.

@seeseemelk
Copy link
Author

Just did a quick test, LuaSocket will sometimes return both a timeout and some data as part of a third return value containing the data that could be read within the time.
Next time one calls the receive function, the data that was partially read the last time is not returned again.

For example, take the following bit of code:

local socket = require("socket")

local conn = socket.connect("localhost", 1234)
print("connected")

conn:settimeout(3)

local data, err, part = conn:receive(3)
print("Data:", data)
print("Err:", err)
print("Part:", part)


local data, err, part = conn:receive(3)
print("Data:", data)
print("Err:", err)
print("Part:", part)

conn:close()

This can in some circumstances return the following result (tested by running netcat in a separate terminal and pressing a key followed by enter twice):

connected
Data:	nil
Err:	timeout
Part:	e

Data:	nil
Err:	timeout
Part:	f

The byte e that was returned in the first call was not returned in the second call. In other words, when a timeout occurs and some data was received, this data gets dropped.

The receive function does take a second argument, a data prefix. If you pass the partial data of the last call to this function, it will behave as expected.

local socket = require("socket")

local conn = socket.connect("localhost", 1234)
print("connected")

conn:settimeout(3)

local data, err, part = conn:receive(3)
print("Data:", data)
print("Err:", err)
print("Part:", part)


local data, err, part2 = conn:receive(3, part)
print("Data:", data)
print("Err:", err)
print("Part:", part2)

conn:close()

Will result in

connected
Data:	nil
Err:	timeout
Part:	e

Data:	e
f
Err:	nil
Part:	nil

Reading the luamqtt code, it does not seem that this behaviour is being taken into account.

@seeseemelk
Copy link
Author

seeseemelk commented Jul 20, 2024

And indeed, changing the luasocket:plain_receive in @Tieske's branch from

local data, err, partial = sock:receive(size)
if data then
    return data
end

to

local data, err, partial = sock:receive(size, self.sockPartialData)
self.sockPartialData = partial
if data then
    return data
end

seems to fix the issue for me.

@seeseemelk
Copy link
Author

I have not checked the behaviour of the send function, but reading the documentation of it and comparing it to the receive function I think @Tieske's send function is sound.

@Tieske
Copy link
Contributor

Tieske commented Jul 20, 2024

@seeseemelk

Just did a quick test, LuaSocket will sometimes return both a timeout and some data as part of a third return value containing the data that could be read within the time.

The fix I created only lives in my keepalive branch, which is in PR #31 . If you used the code from my fork, then ensure you use the keepalive branch and neither main nor master in that case.

Here's the fix I added: https://github.com/xHasKx/luamqtt/pull/31/files#diff-d0c02746691eb0a97d87327874dc1658dfb7946448270d9e298e419b12f0897dR121-R126 which is in line with your reported fix (but as mentioned that is NOT in my main and master branches)

@Tieske
Copy link
Contributor

Tieske commented Jul 20, 2024

fyi; I've merged the fix into my main branch and released version t1.0.1 of my fork, available on LuaRocks.

install via luarocks install luamqttt (note the extra t, and there can only be one of luamqttt and luamqtt be installed at the same time, since they use the same filenames).

@seeseemelk
Copy link
Author

Indeed, I was using the main branch instead of the keepalive branch. The latter branch does indeed work, as does t1.0.1.
Dankuwel!

@oktoberfest6
Copy link

@Tieske: thanks for the fix!

I also noticed that you implemented (since 1.0.0) signal_idle and signal_code, which are a nice way to detect timeouts when using the module with external connectors.

I hope all these great improvements will be merged into the original library

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants