Skip to content

Commit

Permalink
Initial shadow integration (#12)
Browse files Browse the repository at this point in the history
* Initial shadow integration

* Version bump
  • Loading branch information
Octogonapus authored May 23, 2022
1 parent f0aff34 commit f68bb2c
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "AWSCRT"
uuid = "df31ea59-17a4-4ebd-9d69-4f45266dc2c7"
version = "0.1.0"
version = "0.1.1"

[deps]
AWSCRT_jll = "01db5350-6ea1-5d9a-9a47-8a31a394cb9c"
Expand Down
3 changes: 3 additions & 0 deletions src/AWSCRT.jl
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,7 @@ export unsubscribe
export resubscribe_existing_topics
export publish

include("IOTShadow.jl")
export ShadowClient

end
88 changes: 49 additions & 39 deletions src/AWSMQTT.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
const subscribe_callback_docs = "Callback invoked when message received. See [`OnMessage`](@ref) for the required signature."
const subscribe_qos_docs = "Maximum requested QoS that server may use when sending messages to the client. The server may grant a lower QoS in the SUBACK (see returned task)."
const subscribe_return_docs = """Returns a task and the ID of the SUBSCRIBE packet.
The task completes when a SUBACK is received from the server.
If successful, the task will contain a dict with the following members:
- `:packet_id (Int)`: ID of the SUBSCRIBE packet being acknowledged.
- `:topic (String)`: Topic filter of the SUBSCRIBE packet being acknowledged.
- `:qos (aws_mqtt_qos)`: Maximum QoS that was granted by the server. This may be lower than the requested QoS.
If unsuccessful, the task contains an exception."""


"""
MQTTClient(
tls_ctx::Union{ClientTLSContext,Nothing},
Expand Down Expand Up @@ -76,7 +89,7 @@ Arguments:
- `topic (String)`: Topic receiving message.
- `payload (String)`: Payload of message.
- `dup (Bool)`: DUP flag. If True, this might be re-delivery of an earlier attempt to send the message.
- `qos (aws_mqtt_qos)`: Quality of Service used to deliver the message.
- `qos (aws_mqtt_qos)`: $subscribe_qos_docs
- `retain (Bool)`: Retain flag. If `true`, the message was sent as a result of a new subscription being made by the client.
Returns `nothing`.
Expand Down Expand Up @@ -561,20 +574,12 @@ Once subscribed, `callback` is invoked each time a message matching the `topic`
possible for such messages to arrive before the SUBACK is received.
Arguments:
- `connection`: Connection to use.
- `topic`: Subscribe to this topic filter, which may include wildcards.
- `qos`: Maximum requested QoS that server may use when sending messages to the client. The server may grant a lower QoS in the SUBACK (see returned task).
- `callback`: Optional callback invoked when message received. See [`OnMessage`](@ref) for the required signature.
Returns a task and the ID of the SUBSCRIBE packet.
The task completes when a SUBACK is received from the server.
If successful, the task will contain a dict with the following members:
- `:packet_id (Int)`: ID of the SUBSCRIBE packet being acknowledged.
- `:topic (String)`: Topic filter of the SUBSCRIBE packet being acknowledged.
- `:qos (aws_mqtt_qos)`: Maximum QoS that was granted by the server. This may be lower than the requested QoS.
- `connection (MQTTConnection)`: Connection to use.
- `topic (String)`: Subscribe to this topic filter, which may include wildcards.
- `qos (aws_mqtt_qos)`: $subscribe_qos_docs
- `callback (OnMessage)`: $subscribe_callback_docs
If unsuccessful, the task contains an exception.
$subscribe_return_docs
"""
function subscribe(connection::MQTTConnection, topic::String, qos::aws_mqtt_qos, callback::OnMessage)
on_message_fcb = ForeignCallbacks.ForeignCallback{OnMessageMsg}() do msg
Expand Down Expand Up @@ -653,8 +658,8 @@ end
Set callback to be invoked when ANY message is received.
Arguments:
- `connection`: Connection to use.
- `callback`: Optional callback invoked when message received. See [`OnMessage`](@ref) for the required signature. Set to `nothing` to clear this callback.
- `connection (MQTTConnection)`: Connection to use.
- `callback (Union{OnMessage,Nothing})`: Optional callback invoked when message received. See [`OnMessage`](@ref) for the required signature. Set to `nothing` to clear this callback.
Returns nothing.
"""
Expand Down Expand Up @@ -728,23 +733,25 @@ function on_unsubscribe_complete(
return nothing
end

const unsubscribe_return_docs = """Returns a task and the ID of the UNSUBSCRIBE packet.
The task completes when an UNSUBACK is received from the server.
If successful, the task will contain a dict with the following members:
- `:packet_id (Int)`: ID of the UNSUBSCRIBE packet being acknowledged.
If unsuccessful, the task will throw an exception."""

"""
unsubscribe(connection::MQTTConnection, topic::String)
Unsubscribe from a topic filter (async).
The client sends an UNSUBSCRIBE packet, and the server responds with an UNSUBACK.
Arguments:
- `connection`: Connection to use.
- `topic`: Unsubscribe from this topic filter.
Returns a task and the ID of the UNSUBSCRIBE packet.
The task completes when an UNSUBACK is received from the server.
If successful, the task will contain a dict with the following members:
- `:packet_id (Int)`: ID of the UNSUBSCRIBE packet being acknowledged.
- `connection (MQTTConnection)`: Connection to use.
- `topic (String)`: Unsubscribe from this topic filter.
If unsuccessful, the task will throw an exception.
$unsubscribe_return_docs
"""
function unsubscribe(connection::MQTTConnection, topic::String)
ch = Channel(1)
Expand Down Expand Up @@ -978,19 +985,7 @@ function on_publish_complete(
return nothing
end

"""
publish(connection::MQTTConnection, topic::String, payload::String, qos::aws_mqtt_qos, retain::Bool = false)
Publish message (async).
If the device is offline, the PUBLISH packet will be sent once the connection resumes.
Arguments:
- `connection`: Connection to use.
- `topic`: Topic name.
- `payload`: Contents of message.
- `qos`: Quality of Service for delivering this message.
- `retain`: If `true`, the server will store the message and its QoS so that it can be delivered to future subscribers whose subscriptions match its topic name.
const publish_return_docs = """
Returns a task and the ID of the PUBLISH packet.
The QoS determines when the task completes:
- For QoS 0, completes as soon as the packet is sent.
Expand All @@ -1000,7 +995,22 @@ The QoS determines when the task completes:
If successful, the task will contain a dict with the following members:
- `:packet_id (Int)`: ID of the PUBLISH packet that is complete.
If unsuccessful, the task will throw an exception.
If unsuccessful, the task will throw an exception."""

"""
publish(connection::MQTTConnection, topic::String, payload::String, qos::aws_mqtt_qos, retain::Bool = false)
Publish message (async).
If the device is offline, the PUBLISH packet will be sent once the connection resumes.
Arguments:
- `connection (MQTTConnection)`: Connection to use.
- `topic (String)`: Topic name.
- `payload (String)`: Contents of message.
- `qos (aws_mqtt_qos)`: $subscribe_qos_docs
- `retain (Bool)`: If `true`, the server will store the message and its QoS so that it can be delivered to future subscribers whose subscriptions match its topic name.
$publish_return_docs
"""
function publish(connection::MQTTConnection, topic::String, payload::String, qos::aws_mqtt_qos, retain::Bool = false)
ch = Channel(1)
Expand Down
123 changes: 123 additions & 0 deletions src/IOTShadow.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
ShadowClient(
connection::MQTTConnection,
)
Device Shadow service client.
[AWS Documentation](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html).
Arguments:
- `connection (MQTTConnection)`: MQTT connection to publish and subscribe on.
"""
mutable struct ShadowClient
connection::MQTTConnection
shadow_topic_prefix::Union{String,Nothing}

function ShadowClient(connection::MQTTConnection)
new(connection, nothing)
end
end

"""
on_shadow_message(
shadow_client::ShadowClient,
topic::String,
payload::String,
dup::Bool,
qos::aws_mqtt_qos,
retain::Bool,
)
A callback invoked when a shadow document message is received.
Arguments:
- `shadow_client (ShadowClient)`: Shadow client that received the message.
- `topic (String)`: Topic receiving message.
- `payload (String)`: Payload of message.
- `dup (Bool)`: DUP flag. If True, this might be re-delivery of an earlier attempt to send the message.
- `qos (aws_mqtt_qos)`: $subscribe_qos_docs
- `retain (Bool)`: Retain flag. If `true`, the message was sent as a result of a new subscription being made by the client.
Returns `nothing`.
"""
const OnShadowMessage = Function

"""
subscribe(
client::ShadowClient,
thing_name::String,
shadow_name::Union{String,Nothing},
qos::aws_mqtt_qos,
callback::OnShadowMessage,
)
Subscribes to all topics under the given shadow document using a wildcard, including but not limited to:
- `/get/accepted`
- `/get/rejected`
- `/update/delta`
- `/update/accepted`
- `/update/documents`
- `/update/rejected`
- `/delete/accepted`
- `/delete/rejected`
Arguments:
- `client (ShadowClient)`: Shadow client to use.
- `thing_name (String)`: Name of the Thing in AWS IoT.
- `shadow_name (Union{String,Nothing})`: Shadow name for a named shadow document or `nothing` for an unnamed shadow document.
- `qos (aws_mqtt_qos)`: $subscribe_qos_docs
- `callback (OnShadowMessage)`: Callback invoked when message received. See [`OnShadowMessage`](@ref) for the required signature.
Returns the tasks from each subscribe call (`/get/#`, `/update/#`, and `/delete/#`).
"""
function subscribe(
client::ShadowClient,
thing_name::String,
shadow_name::Union{String,Nothing},
qos::aws_mqtt_qos,
callback::OnShadowMessage,
)
client.shadow_topic_prefix =
shadow_name === nothing ? "\$aws/things/$thing_name/shadow" :
"\$aws/things/$thing_name/shadow/name/$shadow_name"
mqtt_callback =
(topic::String, payload::String, dup::Bool, qos::aws_mqtt_qos, retain::Bool) ->
callback(client, topic, payload, dup, qos, retain)
getf = subscribe(client.connection, "$(client.shadow_topic_prefix)/get/#", qos, mqtt_callback)
updatef = subscribe(client.connection, "$(client.shadow_topic_prefix)/update/#", qos, mqtt_callback)
deletef = subscribe(client.connection, "$(client.shadow_topic_prefix)/delete/#", qos, mqtt_callback)
return getf, updatef, deletef
end

"""
unsubscribe(client::ShadowClient)
Unsubscribes from the shadow document topics.
Arguments:
- `client (ShadowClient)`: Shadow client to use.
$unsubscribe_return_docs
"""
function unsubscribe(client::ShadowClient)
topic = client.shadow_topic_prefix
client.shadow_topic_prefix = nothing
return unsubscribe(client.connection, "$topic/#")
end

"""
publish(client::ShadowClient, topic::String, payload::String, qos::aws_mqtt_qos)
Publishes the payload to the topic under the configured shadow topic.
Arguments:
- `client (ShadowClient)`: Shadow client to use.
- `topic (String)`: Topic name, not including the shadow topic prefix. E.g. `/get`.
- `payload (String)`: Message contents.
- `qos (aws_mqtt_qos)`: $subscribe_qos_docs
$publish_return_docs
"""
function publish(client::ShadowClient, topic::String, payload::String, qos::aws_mqtt_qos)
return publish(client.connection, "$(client.shadow_topic_prefix)$topic", payload, qos)
end
1 change: 1 addition & 0 deletions test/Project.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[deps]
AWSCRT_jll = "01db5350-6ea1-5d9a-9a47-8a31a394cb9c"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
Loading

2 comments on commit f68bb2c

@Octogonapus
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/60859

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.1.1 -m "<description of version>" f68bb2c3a8ec2cb6e0192ac8f24513a1742cf16b
git push origin v0.1.1

Please sign in to comment.