-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathCollectionInspectionWorkerThread.h
127 lines (106 loc) · 4.43 KB
/
CollectionInspectionWorkerThread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#pragma once
#include "Clock.h"
#include "ClockHandler.h"
#include "CollectionInspectionAPITypes.h"
#include "CollectionInspectionEngine.h"
#include "DataSenderTypes.h"
#include "RawDataManager.h"
#include "Signal.h"
#include "Thread.h"
#include "TimeTypes.h"
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#ifdef FWE_FEATURE_STORE_AND_FORWARD
#include "StreamForwarder.h"
#include "StreamManager.h"
#endif
namespace Aws
{
namespace IoTFleetWise
{
class CollectionInspectionWorkerThread
{
public:
CollectionInspectionWorkerThread( CollectionInspectionEngine &collectionInspectionEngine )
: mCollectionInspectionEngine( collectionInspectionEngine ){};
~CollectionInspectionWorkerThread();
CollectionInspectionWorkerThread( const CollectionInspectionWorkerThread & ) = delete;
CollectionInspectionWorkerThread &operator=( const CollectionInspectionWorkerThread & ) = delete;
CollectionInspectionWorkerThread( CollectionInspectionWorkerThread && ) = delete;
CollectionInspectionWorkerThread &operator=( CollectionInspectionWorkerThread && ) = delete;
void onChangeInspectionMatrix( const std::shared_ptr<const InspectionMatrix> &inspectionMatrix );
/**
* @brief As soon as new data is available in any input queue call this to wakeup the thread
* */
void onNewDataAvailable();
/**
* @brief Initialize the component by handing over all queues
*
* @return true if initialization was successful
* */
bool init( const std::shared_ptr<SignalBuffer> &inputSignalBuffer, /**< IVehicleDataSourceConsumer instances will
put relevant signals in this queue */
const std::shared_ptr<DataSenderQueue>
&outputCollectedData, /**< this thread will put data that should be sent to cloud into this queue */
uint32_t idleTimeMs, /**< if no new data is available sleep for this amount of milliseconds */
std::shared_ptr<RawData::BufferManager> rawBufferManager =
nullptr /**< the raw buffer manager which is informed what data is used */
#ifdef FWE_FEATURE_STORE_AND_FORWARD
,
std::shared_ptr<Aws::IoTFleetWise::Store::StreamForwarder> streamForwarder = nullptr,
std::shared_ptr<Aws::IoTFleetWise::Store::StreamManager> streamManager = nullptr
#endif
);
/**
* @brief stops the internal thread if started and wait until it finishes
*
* @return true if the stop was successful
*/
bool stop();
/**
* @brief starts the internal thread
*
* @return true if the start was successful
*/
bool start();
/**
* @brief Checks that the worker thread is healthy and consuming data.
*/
bool isAlive();
private:
static constexpr Timestamp EVALUATE_INTERVAL_MS = 1; // Evaluate every millisecond
static constexpr uint32_t DEFAULT_THREAD_IDLE_TIME_MS = 1000;
// Stop the thread
// Intercepts stop signals.
bool shouldStop() const;
static void doWork( void *data );
static TimePoint calculateMonotonicTime( const TimePoint &currTime, Timestamp systemTimeMs );
/**
* @brief Collects data ready for the upload from collection inspection engine and passes it to the data sender
* @return number of collected data packages successfully pushed to the upload queue
*/
uint32_t collectDataAndUpload();
CollectionInspectionEngine &mCollectionInspectionEngine;
std::shared_ptr<SignalBuffer> mInputSignalBuffer;
std::shared_ptr<DataSenderQueue> mOutputCollectedData;
Thread mThread;
std::atomic<bool> mShouldStop{ false };
std::atomic<bool> mUpdatedInspectionMatrixAvailable{ false };
std::shared_ptr<const InspectionMatrix> mUpdatedInspectionMatrix;
std::mutex mInspectionMatrixMutex;
std::mutex mThreadMutex;
Signal mWait;
uint32_t mIdleTimeMs{ DEFAULT_THREAD_IDLE_TIME_MS };
std::shared_ptr<const Clock> mClock = ClockHandler::getClock();
std::shared_ptr<RawData::BufferManager> mRawBufferManager{ nullptr };
#ifdef FWE_FEATURE_STORE_AND_FORWARD
std::shared_ptr<Aws::IoTFleetWise::Store::StreamForwarder> mStreamForwarder;
std::shared_ptr<Aws::IoTFleetWise::Store::StreamManager> mStreamManager;
#endif
};
} // namespace IoTFleetWise
} // namespace Aws