Skip to content

Commit 94543aa

Browse files
committed
Add example blob data source for VSD
1 parent eb475c9 commit 94543aa

File tree

8 files changed

+259
-0
lines changed

8 files changed

+259
-0
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ if(FWE_FEATURE_VISION_SYSTEM_DATA)
301301
src/DataSenderIonWriter.cpp
302302
src/RawDataManager.cpp
303303
src/S3Sender.cpp
304+
src/MyBlobDataSource.cpp
304305
)
305306
set(TEST_FILES ${TEST_FILES}
306307
test/unit/CredentialsTest.cpp
@@ -314,6 +315,7 @@ if(FWE_FEATURE_VISION_SYSTEM_DATA)
314315
src/RawDataManager.h
315316
src/S3Sender.h
316317
src/TransferManagerWrapper.h
318+
src/MyBlobDataSource.h
317319
)
318320
endif()
319321
if(FWE_FEATURE_ROS2)

src/IoTFleetWiseEngine.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,15 @@ IoTFleetWiseEngine::connect( const Json::Value &jsonConfig )
684684
}
685685
}
686686

687+
#ifdef FWE_FEATURE_VISION_SYSTEM_DATA
688+
mMyBlobDataSource = std::make_unique<MyBlobDataSource>( signalBufferPtr, rawDataBufferManager );
689+
mCollectionSchemeManagerPtr->subscribeToActiveDecoderDictionaryChange(
690+
std::bind( &MyBlobDataSource::onChangeOfActiveDictionary,
691+
mMyBlobDataSource.get(),
692+
std::placeholders::_1,
693+
std::placeholders::_2 ) );
694+
#endif
695+
687696
/********************************Data source bootstrap end*******************************/
688697

689698
// Only start the CollectionSchemeManager after all listeners have subscribed, otherwise

src/IoTFleetWiseEngine.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "IWaveGpsSource.h"
4747
#endif
4848
#ifdef FWE_FEATURE_VISION_SYSTEM_DATA
49+
#include "MyBlobDataSource.h"
4950
#include "S3Sender.h"
5051
#include <aws/core/auth/AWSCredentialsProvider.h>
5152
#include <aws/core/utils/threading/Executor.h>
@@ -194,6 +195,7 @@ class IoTFleetWiseEngine
194195
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> mAwsCredentialsProvider;
195196
std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> mTransferManagerExecutor;
196197
std::shared_ptr<S3Sender> mS3Sender;
198+
std::unique_ptr<MyBlobDataSource> mMyBlobDataSource;
197199
#endif
198200
#ifdef FWE_FEATURE_IWAVE_GPS
199201
std::shared_ptr<IWaveGpsSource> mIWaveGpsSource;

src/MyBlobDataSource.cpp

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#include "MyBlobDataSource.h"
5+
#include "LoggingModule.h"
6+
#include <chrono>
7+
#include <utility>
8+
#include <vector>
9+
10+
namespace Aws
11+
{
12+
namespace IoTFleetWise
13+
{
14+
15+
MyBlobDataSource::MyBlobDataSource( SignalBufferPtr signalBufferPtr,
16+
std::shared_ptr<RawData::BufferManager> rawDataBufferManager )
17+
: mSignalBufferPtr( std::move( signalBufferPtr ) )
18+
, mRawBufferManager( std::move( rawDataBufferManager ) )
19+
{
20+
mThread = std::thread( [this]() {
21+
// Example code that pushes a message every 500ms:
22+
while ( !mStop )
23+
{
24+
pushData();
25+
std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
26+
}
27+
} );
28+
}
29+
30+
MyBlobDataSource::~MyBlobDataSource()
31+
{
32+
mStop = true;
33+
mThread.join();
34+
}
35+
36+
void
37+
MyBlobDataSource::pushData()
38+
{
39+
std::lock_guard<std::mutex> lock( mDecoderDictionaryMutex );
40+
if ( mBlobSourceSignalId == INVALID_SIGNAL_ID )
41+
{
42+
FWE_LOG_TRACE( "No decoding info yet" );
43+
return;
44+
}
45+
46+
FWE_LOG_TRACE( "Pushing blob" );
47+
// Example blob data:
48+
std::vector<uint8_t> blob{ 'H', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!' };
49+
50+
// Serialize in the CDR format defined in blob-nodes.json and blob-decoders.json which is just a
51+
// message containing a single byte array of unstructured (blob) data:
52+
// Add CDR header:
53+
std::vector<uint8_t> cdr{
54+
0, // UINT8 Dummy byte
55+
1, // UINT8 Encapsulation
56+
0, // UINT16-LSB Options
57+
0, // UINT16-MSB Options
58+
};
59+
// Add the UINT32 blob size:
60+
cdr.push_back( blob.size() & 0xFF );
61+
cdr.push_back( ( blob.size() >> 8 ) & 0xFF );
62+
cdr.push_back( ( blob.size() >> 16 ) & 0xFF );
63+
cdr.push_back( ( blob.size() >> 24 ) & 0xFF );
64+
// Add blob data:
65+
cdr.insert( cdr.end(), blob.begin(), blob.end() );
66+
67+
// Ingest the message:
68+
auto timestamp = mClock->systemTimeSinceEpochMs();
69+
auto bufferHandle = mRawBufferManager->push( cdr.data(), cdr.size(), timestamp, mBlobSourceSignalId );
70+
if ( bufferHandle == RawData::INVALID_BUFFER_HANDLE )
71+
{
72+
FWE_LOG_WARN( "Raw message was rejected by RawBufferManager" );
73+
return;
74+
}
75+
// immediately set usage hint so buffer handle does not get directly deleted again
76+
mRawBufferManager->increaseHandleUsageHint(
77+
mBlobSourceSignalId, bufferHandle, RawData::BufferHandleUsageStage::COLLECTED_NOT_IN_HISTORY_BUFFER );
78+
auto collectedSignal =
79+
CollectedSignal( mBlobSourceSignalId, timestamp, bufferHandle, SignalType::RAW_DATA_BUFFER_HANDLE );
80+
CollectedSignalsGroup collectedSignalsGroup;
81+
collectedSignalsGroup.push_back( collectedSignal );
82+
if ( !mSignalBufferPtr->push( CollectedDataFrame( collectedSignalsGroup ) ) )
83+
{
84+
FWE_LOG_WARN( "Signal buffer full" );
85+
}
86+
}
87+
88+
void
89+
MyBlobDataSource::onChangeOfActiveDictionary( ConstDecoderDictionaryConstPtr &dictionary,
90+
VehicleDataSourceProtocol networkProtocol )
91+
{
92+
if ( networkProtocol != VehicleDataSourceProtocol::COMPLEX_DATA )
93+
{
94+
return;
95+
}
96+
std::lock_guard<std::mutex> lock( mDecoderDictionaryMutex );
97+
mBlobSourceSignalId = INVALID_SIGNAL_ID;
98+
auto decoderDictionaryPtr = std::dynamic_pointer_cast<const ComplexDataDecoderDictionary>( dictionary );
99+
auto decoders = decoderDictionaryPtr->complexMessageDecoderMethod.find( BLOB_NETWORK_INTERFACE_ID );
100+
if ( decoders == decoderDictionaryPtr->complexMessageDecoderMethod.end() )
101+
{
102+
FWE_LOG_INFO( std::string( "No decoders found for interface ID " ) + BLOB_NETWORK_INTERFACE_ID );
103+
return;
104+
}
105+
auto decoder = decoders->second.find( BLOB_MESSAGE_ID );
106+
if ( decoder == decoders->second.end() )
107+
{
108+
FWE_LOG_INFO( std::string( "No decoder found for message ID " ) + BLOB_MESSAGE_ID );
109+
return;
110+
}
111+
mBlobSourceSignalId = decoder->second.mSignalId;
112+
FWE_LOG_INFO( "Signal ID for blob is " + std::to_string( mBlobSourceSignalId ) );
113+
114+
// Note that there's no sanity check of the message format here, so if it doesn't match the
115+
// format pushed in pushData(), then cloud won't understand it.
116+
}
117+
118+
} // namespace IoTFleetWise
119+
} // namespace Aws

src/MyBlobDataSource.h

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#pragma once
5+
6+
#include "Clock.h"
7+
#include "ClockHandler.h"
8+
#include "CollectionInspectionAPITypes.h"
9+
#include "IDecoderDictionary.h"
10+
#include "RawDataManager.h"
11+
#include "VehicleDataSourceTypes.h"
12+
#include <atomic>
13+
#include <memory>
14+
#include <mutex>
15+
#include <string>
16+
#include <thread>
17+
18+
namespace Aws
19+
{
20+
namespace IoTFleetWise
21+
{
22+
23+
class MyBlobDataSource
24+
{
25+
public:
26+
MyBlobDataSource( SignalBufferPtr signalBufferPtr, std::shared_ptr<RawData::BufferManager> rawDataBufferManager );
27+
~MyBlobDataSource();
28+
29+
MyBlobDataSource( const MyBlobDataSource & ) = delete;
30+
MyBlobDataSource &operator=( const MyBlobDataSource & ) = delete;
31+
MyBlobDataSource( MyBlobDataSource && ) = delete;
32+
MyBlobDataSource &operator=( MyBlobDataSource && ) = delete;
33+
34+
void onChangeOfActiveDictionary( ConstDecoderDictionaryConstPtr &dictionary,
35+
VehicleDataSourceProtocol networkProtocol );
36+
37+
private:
38+
// Note these must match the interface and message IDs sent in the decoder manifest from the cloud:
39+
static constexpr const char *BLOB_NETWORK_INTERFACE_ID = "MyBlobNetworkInterfaceId";
40+
static constexpr const char *BLOB_MESSAGE_ID = "MyBlobMessageId";
41+
42+
std::mutex mDecoderDictionaryMutex;
43+
SignalID mBlobSourceSignalId{ INVALID_SIGNAL_ID };
44+
SignalBufferPtr mSignalBufferPtr;
45+
std::shared_ptr<RawData::BufferManager> mRawBufferManager;
46+
std::thread mThread;
47+
std::atomic_bool mStop{ false };
48+
std::shared_ptr<const Clock> mClock = ClockHandler::getClock();
49+
50+
void pushData();
51+
};
52+
53+
} // namespace IoTFleetWise
54+
} // namespace Aws

tools/cloud/blob-decoders.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[
2+
{
3+
"fullyQualifiedName": "Vehicle.MyBlobSource",
4+
"type": "MESSAGE_SIGNAL",
5+
"interfaceId": "MyBlobNetworkInterfaceId",
6+
"messageSignal": {
7+
"topicName": "MyBlobMessageId",
8+
"structuredMessage": {
9+
"structuredMessageDefinition": [
10+
{
11+
"fieldName": "data",
12+
"dataType": {
13+
"structuredMessageListDefinition": {
14+
"name": "listType",
15+
"memberType": {
16+
"primitiveMessageDefinition": {
17+
"ros2PrimitiveMessageDefinition": {
18+
"primitiveType": "UINT8"
19+
}
20+
}
21+
},
22+
"capacity": 0,
23+
"listType": "DYNAMIC_UNBOUNDED_CAPACITY"
24+
}
25+
}
26+
}
27+
]
28+
}
29+
}
30+
}
31+
]

tools/cloud/blob-nodes.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[
2+
{
3+
"branch": {
4+
"fullyQualifiedName": "Types"
5+
}
6+
},
7+
{
8+
"struct": {
9+
"fullyQualifiedName": "Types.MyBlobType"
10+
}
11+
},
12+
{
13+
"property": {
14+
"fullyQualifiedName": "Types.MyBlobType.data",
15+
"dataType": "UINT8_ARRAY",
16+
"dataEncoding": "BINARY"
17+
}
18+
},
19+
{
20+
"branch": {
21+
"fullyQualifiedName": "Vehicle",
22+
"description": "Vehicle"
23+
}
24+
},
25+
{
26+
"sensor": {
27+
"fullyQualifiedName": "Vehicle.MyBlobSource",
28+
"dataType": "STRUCT",
29+
"structFullyQualifiedName": "Types.MyBlobType"
30+
}
31+
}
32+
]
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[
2+
{
3+
"interfaceId": "MyBlobNetworkInterfaceId",
4+
"type": "VEHICLE_MIDDLEWARE",
5+
"vehicleMiddleware": {
6+
"name": "blob",
7+
"protocolName": "ROS_2"
8+
}
9+
}
10+
]

0 commit comments

Comments
 (0)