Skip to content

Commit

Permalink
V3 snaps error handling (#1188)
Browse files Browse the repository at this point in the history
  • Loading branch information
Serafadam authored Dec 19, 2024
1 parent 353dda8 commit bee44fa
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 47 deletions.
14 changes: 6 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ option(DEPTHAI_NEW_FIND_PYTHON "Use new FindPython module" ON)
if(NOT DEPTHAI_OPENCV_SUPPORT)
set(DEPTHAI_MERGED_TARGET OFF CACHE BOOL "Enable merged target build" FORCE)
endif()
option(DEPTHAI_ENABLE_EVENTS_MANAGER "Enable Events Manager" ON)

set(DEPTHAI_HAS_APRIL_TAG ${DEPTHAI_ENABLE_APRIL_TAG})
if(WIN32)
Expand Down Expand Up @@ -108,18 +107,17 @@ endif()

if(DEPTHAI_ENABLE_PROTOBUF)
option(DEPTHAI_ENABLE_REMOTE_CONNECTION "Enable Remote Connection support" ON)
if(DEPTHAI_ENABLE_CURL)
option(DEPTHAI_ENABLE_EVENTS_MANAGER "Enable Events Manager" ON)
else()
message(STATUS "Events Manager disabled because Protobuf & curl support is disabled.")
option(DEPTHAI_ENABLE_EVENTS_MANAGER "Enable Events Manager" OFF)
endif()
else()
option(DEPTHAI_ENABLE_REMOTE_CONNECTION "Enable Remote Connection support" OFF)
message(STATUS "Remote Connection support disabled because Protobuf support is disabled.")
endif()

if(DEPTHAI_ENABLE_EVENTS_MANAGER)
if(NOT DEPTHAI_ENABLE_PROTOBUF OR NOT DEPTHAI_CURL_SUPPORT)
message(STATUS "Events Manager disabled because Protobuf & curl support is disabled.")
set(DEPTHAI_ENABLE_EVENTS_MANAGER OFF)
endif()
endif()

if(DEPTHAI_BUILD_PYTHON)
list(APPEND VCPKG_MANIFEST_FEATURES "python-bindings")
endif()
Expand Down
2 changes: 1 addition & 1 deletion examples/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ dai_add_example(model_zoo RVC2/ModelZoo/model_zoo.cpp OFF OFF)

# Events Manager
if(DEPTHAI_ENABLE_EVENTS_MANAGER)
dai_add_example(events HostNodes/events.cpp ON OFF)
dai_add_example(events Events/events.cpp ON OFF)
endif()
# Image Align
dai_add_example(image_align RVC2/ImageAlign/image_align.cpp OFF OFF)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include <chrono>
#include <iostream>
#include <string>
Expand Down Expand Up @@ -27,20 +26,6 @@ int main(int argc, char* argv[]) {
std::vector<std::shared_ptr<dai::utility::EventData>> data;
data.emplace_back(fileData);
eventsManager->sendEvent("testdata", nullptr, data, {"tag3", "tag4"}, {{"key8", "value8"}});
auto fileData2 = std::make_shared<dai::utility::EventData>("/test.txt");
std::vector<std::shared_ptr<dai::utility::EventData>> data2;
data2.push_back(fileData2);
// will fail, you sendEvent instead of sendSnap
eventsManager->sendSnap("testdata2", nullptr, data2, {"tag5", "tag6"}, {{"key8", "value8"}});
auto fileData3 = std::make_shared<dai::utility::EventData>("/test.jpg");
std::vector<std::shared_ptr<dai::utility::EventData>> data3;
data3.push_back(fileData3);
eventsManager->sendSnap("testdata3", nullptr, data3, {"tag7", "tag8"}, {{"key8", "value8"}});
std::vector<std::shared_ptr<dai::utility::EventData>> data4;
data4.push_back(fileData);
data4.push_back(fileData2);
eventsManager->sendEvent("testdata4", nullptr, data4, {"tag9", "tag10"}, {{"key8", "value8"}});
data4.push_back(fileData3);
while(pipeline.isRunning()) {
auto rgb = previewQ->get<dai::ImgFrame>();

Expand All @@ -49,10 +34,6 @@ int main(int argc, char* argv[]) {

if(!sent) {
eventsManager->sendSnap("rgb", rgb, {}, {"tag11", "tag12"}, {{"key", "value"}});
// will fail due to two images being sent, use sendEvent instead
eventsManager->sendSnap("test2", rgb, data3, {"tag13", "tag14"}, {{"key8", "value8"}});
// will fail, sendSnap requires only one image data to be present
eventsManager->sendSnap("test3", rgb, data4, {"tag13", "tag14"}, {{"key8", "value8"}});
sent = true;
}
//
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#!/usr/bin/env python3

import cv2
Expand All @@ -10,7 +9,7 @@
# Create pipeline
with dai.Pipeline() as pipeline:
# Define sources and outputs
camRgb = pipeline.create(dai.node.Camera)
camRgb = pipeline.create(dai.node.Camera).build()
# Properties

qRgb = camRgb.requestOutput((256,256)).createOutputQueue()
Expand All @@ -22,10 +21,6 @@
time.sleep(2)
fileData = dai.EventData(b'Hello, world!', "hello.txt", "text/plain")
eventMan.sendEvent("test2", None, [fileData], ["tag1", "tag2"], {"key1": "value1"})
fileData2 = dai.EventData("/test.txt")
# will fail, sendSnap needs an image
eventMan.sendSnap("test3", None, [fileData2], ["tag1", "tag2"], {"key1": "value1"})
eventMan.sendEvent("test4", None, [fileData, fileData2], ["tag1", "tag2"], {"key1": "value1"})
pipeline.start()

frame = None
Expand All @@ -39,8 +34,6 @@
frame = inRgb.getCvFrame()
if not eventSent:
eventMan.sendSnap("rgb", inRgb, [], ["tag1", "tag2"], {"key1": "value1"})
# will fail, sendSnap requires only image and no extra data
eventMan.sendSnap("rgb2", inRgb, [fileData2], ["tag1", "tag2"], {"key1": "value1"})
eventSent = True

if frame is not None:
Expand Down
6 changes: 4 additions & 2 deletions include/depthai/utility/EventsManager.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
Expand All @@ -19,7 +20,6 @@ class Event;
} // namespace event
} // namespace proto
namespace utility {
#ifdef DEPTHAI_ENABLE_PROTOBUF
enum class EventDataType { DATA, FILE_URL, IMG_FRAME, ENCODED_FRAME, NN_DATA };
class EventData {
public:
Expand Down Expand Up @@ -172,7 +172,9 @@ class EventsManager {
std::string cacheDir;
bool uploadCachedOnStart;
bool cacheIfCannotSend;
std::atomic<bool> stopEventBuffer;
std::condition_variable eventBufferCondition;
std::mutex eventBufferConditionMutex;
};
#endif
} // namespace utility
} // namespace dai
54 changes: 45 additions & 9 deletions src/utility/EventsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#include "Environment.hpp"
#include "Logging.hpp"
#include "cpr/cpr.h"
#include "depthai/schemas/Event.pb.h"
namespace dai {

namespace utility {
Expand Down Expand Up @@ -101,18 +103,23 @@ EventsManager::EventsManager(std::string url, bool uploadCachedOnStart, float pu
queueSize(10),
publishInterval(publishInterval),
logResponse(false),
verifySsl(false),
verifySsl(true),
connected(false),
cacheDir("/internal/private"),
uploadCachedOnStart(uploadCachedOnStart),
cacheIfCannotSend(false) {
cacheIfCannotSend(false),
stopEventBuffer(false) {
sourceAppId = utility::getEnv("AGENT_APP_ID");
sourceAppIdentifier = utility::getEnv("AGENT_APP_IDENTIFIER");
token = utility::getEnv("DEPTHAI_HUB_API_KEY");
if(token.empty()) {
throw std::runtime_error("Missing token, please set DEPTHAI_HUB_API_KEY environment variable or use setToken method");
}
eventBufferThread = std::make_unique<std::thread>([this]() {
while(true) {
while(!stopEventBuffer) {
sendEventBuffer();
std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(this->publishInterval * 1000)));
std::unique_lock<std::mutex> lock(eventBufferMutex);
eventBufferCondition.wait_for(lock, std::chrono::seconds(static_cast<int>(this->publishInterval)));
}
});
checkConnection();
Expand All @@ -122,7 +129,14 @@ EventsManager::EventsManager(std::string url, bool uploadCachedOnStart, float pu
}

EventsManager::~EventsManager() {
eventBufferThread->join();
stopEventBuffer = true;
{
std::unique_lock<std::mutex> lock(eventBufferMutex);
eventBufferCondition.notify_one();
}
if(eventBufferThread->joinable()) {
eventBufferThread->join();
}
}

void EventsManager::sendEventBuffer() {
Expand All @@ -138,16 +152,26 @@ void EventsManager::sendEventBuffer() {
}
return;
}
// Create request
cpr::Url url = static_cast<cpr::Url>(this->url + "/v1/events");
for(auto& eventM : eventBuffer) {
auto& event = eventM->event;
batchEvent->add_events()->Swap(event.get());
}
}
std::string serializedEvent;
batchEvent->SerializeToString(&serializedEvent);
cpr::Response r = cpr::Post(cpr::Url{url}, cpr::Body{serializedEvent}, cpr::Header{{"Authorization", "Bearer " + token}}, cpr::VerifySsl(verifySsl));
cpr::Url reqUrl = static_cast<cpr::Url>(this->url + "/v1/events");
cpr::Response r = cpr::Post(
cpr::Url{reqUrl},
cpr::Body{serializedEvent},
cpr::Header{{"Authorization", "Bearer " + token}},
cpr::VerifySsl(verifySsl),
cpr::ProgressCallback(
[&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool {
if(stopEventBuffer) {
return false;
}
return true;
}));
if(r.status_code != cpr::status::HTTP_OK) {
logger::error("Failed to send event: {} {}", r.text, r.status_code);
} else {
Expand Down Expand Up @@ -272,7 +296,19 @@ void EventsManager::sendFile(const std::shared_ptr<EventData>& file, const std::
}};
header["File-Size"] = std::to_string(std::filesystem::file_size(file->data));
}
cpr::Response r = cpr::Post(cpr::Url{url}, cpr::Multipart{fileM}, cpr::Header{header}, cpr::VerifySsl(verifySsl));
cpr::Response r = cpr::Post(
cpr::Url{url},
cpr::Multipart{fileM},
cpr::Header{header},
cpr::VerifySsl(verifySsl),

cpr::ProgressCallback(
[&](cpr::cpr_off_t downloadTotal, cpr::cpr_off_t downloadNow, cpr::cpr_off_t uploadTotal, cpr::cpr_off_t uploadNow, intptr_t userdata) -> bool {
if(stopEventBuffer) {
return false;
}
return true;
}));
if(r.status_code != cpr::status::HTTP_OK) {
logger::error("Failed to upload file: {} error code {}", r.text, r.status_code);
}
Expand Down

0 comments on commit bee44fa

Please sign in to comment.