Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uStreamer Documentation #64

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions up-l2/dispatchers/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ SPDX-License-Identifier: Apache-2.0
----


Like IP packets, uProtocol messages (CE) have a source attribute (originator of the message) and sink attribute (where should the message be sent to). These attributes are used to route the CEs from one uE to the next if the destination is not the receiving uE.
Like IP packets, uProtocol messages (UMessage) have a source attribute (originator of the message) and sink attribute (where should the message be sent to). These attributes are used to route the CEs from one uE to the next if the destination is not the receiving uE.

NOTE: Dispatcher and router shall be used For the remainder of this document we will use the term dispatcher and router interchangeably.

The header contains information for routing as well as metadata (of the data). One of the core principles of uProtocol is that the data portion of CE *MUST* be untouched by message routers, this is very similar to how most Internet standards work today. Only the source who generated the CE and the sink who will consume the CE needs to understand/analyze the payload of the CE.
The header contains information for routing as well as metadata (of the data). One of the core principles of uProtocol is that the data portion of UMessage *MUST* be untouched by message routers, this is very similar to how most Internet standards work today. Only the source who generated the UMessage and the sink who will consume the UMessage needs to understand/analyze the payload of the UMessage.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be good to clarify we're talking specifically about a UPayload here? I.e. we shall not touch / read / modify UPayload.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can read, but also should not be modifying the UUri and UAttributes in order to perform routing. Not sure if we have to explicitly state that in the spec. Feels like better to be clear on that point tho. WDYT?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd agree that we shouldn't modify source/destination UURis and existing uAttributes. We might need to add an attribute in the chain of transport.

I was going to say in accepting a SOME/IP message into the uProtocol universe there is the need to add an attribute saying whether any MAC protection checks happened and passed. But that happens at the protocol boundary, so really isn't the same thing. This might be useful between devices inside the same domain if MAC checks are included and messages not passing the MAC testing on ingestion aren't just dropped.

Is there any transport special "events" that might want us to add a hint to a message as an attribute? Perhaps messages there were queued from the previous power cycle would benefit from a hint to the destination (if they care to look for it) saying "this message was delayed in being delivered". This might trigger special handling at the sink.


To be able to forward/dispatch/route CEs through the network, we must define specific purpose built uEs to perform these tasks (ex. Ethernet switches, IP routers, etc...). Platform uEs that are responsible for event dispatching and implementing the communication layer are described in the sections below. We will elaborate on these specific uEs in the Platform uEs section below.

Expand All @@ -47,7 +47,7 @@ To be able to forward/dispatch/route CEs through the network, we must define spe
|Message bus that dispatches CEs between uEs over a common transport. It provides multicast and forwarding functionality (works like a network switch)

|*uStreamer*
|Provides Device-2-Device CE routing either using the same or different transport protocols , i.e. when events need to move form one transport to the next it flows through the streamer (can be equated to an IP router)
|Provides Device-2-Device UMessage routing either using the same or different transport protocols , i.e. when events need to move form one transport to the next it flows through the streamer (can be equated to an IP router)

|*Cloud Gateway*
|A uE that sits at the edge of the cloud to connect non-cloud devices (ex. vehicles, phones, etc...) to the cloud
Expand All @@ -64,16 +64,16 @@ In this section we will elaborate on the requirements of the platform Dispatcher

NOTE: These communication layer requirements are still for point-2-point uE communication to and from a dispatcher

* *MUST* support At-least-once delivery policy, this means that the dispatcher will make every attempt to dispatch the CE to the intended Receiver
* *MUST* support At-least-once delivery policy, this means that the dispatcher will make every attempt to dispatch the UMessage to the intended Receiver
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got really hung up on this section a while ago. Maybe others reading the spec can get this but I couldn't.

Would suggest adding some verbiage like:

  • If a transport level protocol being used has an at-least-once delivery policy, this requirement is satisfied for that transport protocol.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like: This requirement can be met with a combination of uProtocol layers and the transport protocol used.

Also, while the uProtocol design and implementation MUST have design elements to do this, there are edge cases such as uProtocol stack or host device unexpected shutting down or crashing without warning that result in data loss. These are beyond this guarantee.

** *MUST* queue CEs not successfully acknowledged (transport level at-least-once delivery confirmation described above)
** *MUST* attempt to retry transmission of the CE. Retry policy is specific to the dispatcher implementation
** Dispatcher *MUST NOT* discard CEs unless either CE has expired (CE.ttl), or the egress queue is full. CEs that cannot be delivered are sent to a Dead Letter Office Topic
** *MUST* attempt to retry transmission of the UMessage. Retry policy is specific to the dispatcher implementation
** Dispatcher *MUST NOT* discard CEs unless either UMessage has expired (UMessage.ttl), or the egress queue is full. CEs that cannot be delivered are sent to a Dead Letter Office Topic

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two more CEs -> UMessage


* *MAY* support additional CE delivery policies in general or per topic in the future
* *SHOULD* dispatch in order that it received the CE
* *MAY* support additional UMessage delivery policies in general or per topic in the future
* *SHOULD* dispatch in order that it received the UMessage
* *MAY* batch CEs when delivering to the Receiver
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CEs => UMessages

* CEs that cannot be delivered *MUST* be sent to the Dead Letter topic (DLT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CEs => UMessages

** DLT *MUST* include at least the CE header, SHOULD contain the full CE
** DLT *MUST* include at least the UMessage header, SHOULD contain the full UMessage
** DLT *MUST* include the reason for the failed delivery attempt using error codes defined in google.rpc.Code
** uEs MUST be able to subscribe to the DLT to be notified of message deliver failures

Expand All @@ -93,10 +93,10 @@ When a dispatcher is unable to dispatch an event for a given reason (queue full,
|*google.rpc.Code* |*Reason*

|`*UNAVAILABLE*`
|The req.v1 has expired due to the downstream uE was unavailable (ex. uDevice was disconnected). uE that issued the req.v1 MAY retry with back-off
|The Request message has expired due to the downstream uE was unavailable (ex. uDevice was disconnected). uE that issued the request *MAY* retry with back-off

|`*DEADLINE_EXCEEDED*`
|CE has timed out per the ttl attribute specifications defined in req.v1 event
|UMessage has timed out per the ttl attribute specifications defined in req.v1 event

|`*PERMISSION_DENIED*`
|source is not permitted to access sink
Expand All @@ -108,7 +108,7 @@ When a dispatcher is unable to dispatch an event for a given reason (queue full,
|The dispatcher ran out of resources (buffer full)

|`*INVALID_ARGUMENT*`
|Invalid CE header attributes not covered above (ex. any mal-formatted attributes)
|Invalid UMessage header attributes not covered above (ex. any mal-formatted attributes)

|`*UNKNOWN*`
|An unknown (but not critical) error has occurred
Expand All @@ -117,8 +117,3 @@ When a dispatcher is unable to dispatch an event for a given reason (queue full,
|There is a serious error has occurred not described by error codes mentioned above
!===

<<rpc-error-flow>> figure below illustrates the sequence of messages for RPC flows and the role dispatchers play in error handling.

.RPC Error Flow
[#rpc-error-flow]
image::rpc_flow.png[RPC Error Handling]
Binary file added up-l2/dispatchers/legend.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 14 additions & 0 deletions up-l2/dispatchers/legend.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
@startuml
'https://plantuml.com/sequence-diagram

title Legend

participant foo
participant bar


foo <-->bar: RpcClient & RpcServer APIs
foo <-[#green]->bar: <font color=green>UTransport & UListener APIs (uT)</font>
foo <-[#blue]> bar: <font color=blue>Communication Middleware</font>

@enduml
Binary file modified up-l2/dispatchers/rpc_flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
143 changes: 96 additions & 47 deletions up-l2/dispatchers/rpc_flow.puml
Original file line number Diff line number Diff line change
@@ -1,59 +1,108 @@
@startuml
'https://plantuml.com/sequence-diagram


autonumber

actor uApp #red
collections dispatchers as UB
entity uService as US #blue
title Direct RPC Flow (not through Gateway/DPR)

box uP-Foo (UAuthority="Device1") #white
actor uApp #red
boundary uPClientFoo as upc1 #black
boundary uPClientFoo as upc2 #black
entity uStreamer as uS1 #green
boundary uPClientBar as upc3 #black
end box

box uP-Bar (UAuthority="Device2") #white
boundary uPClientBar as upc4 #black
entity uService #blue
end box

uApp --\ UB: req.v1

== Setup uTransport Listeners ==
uS1 -[#green]>upc2: registerListener(\n\tUUri,\n\tUListener)
note right
<size:12>**req.v1:**</size>
"id" : "cf8b1bcd-30bd-43be-a8d3-ad1cde652e10"
"source": "up://Device1/uApp/1/rpc.response"
"sink": "up://Device1/uService/1/rpc.Method
"specversion": "1.0"
"type": "req.v1"
"dataschema": //url of Method schema//
"data": //Protobuf serialized Request Message//
UUri = "//Device2/*"
end note
upc2 -[#green]->uS1: UStatus

alt Delivery Error
UB -> UB: Create res.v1
note right
<size:12>**res.v1:**</size>
"id" : "f8c48bcf-2b54-4d64-83df-ad1cde652e10"
"source": "up://Device1/dispatcher/1/rpc.Method
"sink": "up://Device1/uApp/1/rpc.response"
"specversion": "1.0"
"type": "res.v1"
"request_id": "cf8b1bcd-30bd-43be-a8d3-ad1cde652e10"
"commstatus": /* google.rpc.Code for the error */
end note
else Delivery Success
UB --\ US: req.v1
US -> US: Process req.v1
note right
<size:12>**res.v1:**</size>
"id" : "f8c48bcf-2b54-4d64-83df-ad1cde652e10"
"source": "up://Device1/dispatcher/1/rpc.Method
"sink": "up://Device1/uApp/1/rpc.response"
"specversion": "1.0"
"type": "res.v1"
"request_id": "cf8b1bcd-30bd-43be-a8d3-ad1cde652e10"
"dataschema": //url of Method Response Message//
"data": // Protobuf serialized Response Message//
end note
US --/ UB: res.v1
end alt
UB --/ uApp: res.v1
uApp -> uApp: Process res.v1
uS1 -[#green]>upc3: registerListener(\n\tUUri,\n\tUListener)
note right
<size:12>**Process res.v1:**</size>
if (commstatus)
// Process L2 Error//
else
//handle returned L3 message//
UUri = "//Device1/*"
end note
upc3 -[#green]->uS1: UStatus
uService -> upc4: registerRpcListener(\n\tUUri,\n\tURpcListener)
note left
UUri = "/uService/rpc.myMethod"
end note
upc4 --> uService: UStatus

== Request ==
uApp -> upc1 : invokeMethod(\n\tUUri, \n\tUPayload, \n\tCallOptions)
note right
<size:12>**UUri:**</size>
authority:{name: "Device2", ip: "192.168.1.100"}
entity: {name: "uService", version_major: 1, id: 99}
resource: {name: "rpc", instance: "myMethod", id: 1}

end note
upc1 -> upc1: Build request
note right
<size:12>**Request UMessage:**</size>
source: {
\tauthority:{name: "Device1", ip: "192.168.1.101"}
\tentity: {name: "uApp", version_major: 1, id: 300}
\tresource: {name: "rpc", instance: "response", id: 0}
}
payload: { /* Request Payload */ }
attributes: {
\tid: /* UUID */,
\ttype: UMESSAGE_TYPE_REQUEST
\tpriority: UPRIORITY_CS4
\tttl: 10000
\tsink: {
\t\tauthority:{name: "Device2", ip: "192.168.1.100"}
\t\tentity: {name: "uService", version_major: 1, id: 99}
\t\tresource: {name: "rpc", instance: "myMethod", id: 1}
\t}
}
end note
upc1 -[#green]> upc1:send(request)
upc1 <-[#blue]> upc2: Foo
upc1 --> uApp: Future

upc2 -[#green]> uS1: onReceive(request)
uS1 -> uS1: Queuing & routing
uS1 -[#green]> upc3:send(request)
upc3 <-[#blue]> upc4: Bar

upc4 -[#green]> upc4: onReceive(request)
upc4 ->uService: onReceive(request)


== Response ==
uService -> uService: Process req,\nbuild res
note left
<size:12>**Response UMessage:**</size>
source: //Device2/uService/rpc.myMethod"" }
""payload: { /* Request Payload */ }
""attributes: {
\tid: /* UUID */,
\ttype: UMESSAGE_TYPE_RESPONSE
\tpriority: UPRIORITY_CS4
\tttl: 10000
\tsink: //Device1/uApp/1/rpc.response
\treqid: /* Request attributes.id */
}
end note

uService -[#green]> upc4:send(response)
upc4 <-[#blue]> upc3: Bar
upc3 -[#green]> uS1: onReceive(response)
uS1 -[#green]>upc2:send(response)
upc2 <-[#blue]> upc1: Foo
upc2 -[#green]->uS1: UStatus
upc1 --\uApp: Future<response>\n\t.complete()

@enduml
72 changes: 72 additions & 0 deletions up-l2/dispatchers/streamer.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
= uStreamer
:toc:
:sectnums:

The key words "*MUST*", "*MUST NOT*", "*REQUIRED*", "*SHALL*", "*SHALL NOT*", "*SHOULD*", "*SHOULD NOT*", "*RECOMMENDED*", "*MAY*", and "*OPTIONAL*" in this document are to be interpreted as described in https://www.rfc-editor.org/info/bcp14[IETF BCP14 (RFC2119 & RFC8174)]

----
Copyright (c) 2023 General Motors GTO LLC

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

SPDX-FileType: DOCUMENTATION
SPDX-FileCopyrightText: 2023 General Motors GTO LLC
SPDX-License-Identifier: Apache-2.0
----

== Overview

The streamer is a uProtocol dispatcher responsible to route messages to/from the local link:../../up-l1/README.adoc[uTransport] to one or more remote uTransports. Local uTransport refers to the transport implementation that the streamer is running on (the transport for the device that the streamer is deployed to). Remote UTransports are any non-local transports that messages need to be forwarded to.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think something got lost here?

Local uTransport refers to the transport implementation that the streamer is running on (the transport for the device that the streamer is deployed to).

Maybe:

Local uTransport refers to the transport implementation that the streamer is using to communicate messages to uEs within the device.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the proposed alternate text. "running on" can mean the OS and/or the actual IPC transport.


NOTE: The term _local transport_ could also be commonly referred as _downstream transport_ vs _remote transport_ referred as _upstream transport_, we use the term local vs remote as they are also commonly used for our addressing scheme in uProtocol.

* locally addressed messages *MUST NOT* be routed to remote transports

Streamer use the link:../../up-l1/README.adoc[uTransport APIs] to configure the flow of messages between the local and remote transports.


== Routing Policies
Streamer dispatches messages by declaring simple routing policies to either route or forward messages to/from a given transport to another using the UTransport API.

The Transport boundaries match up with UUri link:../../basics/uri.adoc[UAuthority] to make the declaration of the policies simpler.

NOTE: UDevices that have connected through the same transport do not require defining routing policies as the underlining transport takes care of the dispatching of messages.

=== Routing Examples

Below is an example of a simple routing configuration to connect a single local transport with a remote transport. for the purpose of this example, the local device will have UAuthority `uLocal` and the remote device will be have the UAuthority `URemote`.

```
yes one sec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is the @stevenhartley equivalent of a TODO 🙂, otherwise I'm unsure of what this is.

```


.Legend
image::legend.png[#legend]

=== Building uStreamers using uTransport APIs
<<streamer-flow>> diagram below illustrates how the uTransport APIs are used to build a uStreamer. The uTransport APIs are used to send/receive UMessages over the underlining communication middleware. The uStreamer is responsible for routing the UMessages to the appropriate uE based on the UMessage header attributes.

.Streamer Flows using uTransport
image:streamer_flow.png[#streamer_flow]

=== RPC Flows
<<rpc-flow>> diagram below illustrates how the uPClient RpcClient & RpcServer interfaces are then connected to the uTransport such that a transport can send/receive the UMessages over the underlining communication middleware
.Rpc Flows
image:rpc_flow.png[#rpc_flow]

49 changes: 49 additions & 0 deletions up-l2/dispatchers/streamer.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added up-l2/dispatchers/streamer_flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading