Skip to content

Commit 8c14d32

Browse files
authored
Feature/improve monitor (#40)
- add gitversion configuration - new async monitor design +semver: patch
1 parent f70f750 commit 8c14d32

11 files changed

+231
-128
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ set(COMMON_SOURCE_FILES
6666
src/k2eg/service/epics/EpicsChannel.cpp
6767
src/k2eg/service/epics/EpicsPutOperation.cpp
6868
src/k2eg/service/epics/EpicsGetOperation.cpp
69+
src/k2eg/service/epics/EpicsMonitorOperation.cpp
6970
src/k2eg/service/epics/EpicsServiceManager.cpp
7071
src/k2eg/service/epics/JsonSerialization.cpp
7172
src/k2eg/service/epics/MsgPackSerialization.cpp

GitVersion.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
mode: mainline
2+
tag-prefix: '[vV]'
3+
commit-message-incrementing: Enabled
4+
major-version-bump-message: '\+semver:\s?(breaking|major)'
5+
minor-version-bump-message: '\+semver:\s?(feature|minor)'
6+
patch-version-bump-message: '\+semver:\s?(fix|patch)'
7+
no-bump-message: '\+semver:\s?(none|skip)'
8+
assembly-informational-format: '{Major}.{Minor}.{Patch}'
Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
#include <k2eg/service/epics/EpicsChannel.h>
2+
23
#include <pv/caProvider.h>
34
#include <pv/clientFactory.h>
45

56
#include <memory>
67

7-
#include "k2eg/service/epics/EpicsGetOperation.h"
8-
#include "k2eg/service/epics/EpicsPutOperation.h"
9-
108
using namespace k2eg::service::epics_impl;
119

12-
namespace pvd = epics::pvData;
1310
namespace pva = epics::pvAccess;
1411

1512
EpicsChannel::EpicsChannel(pvac::ClientProvider& provider, const std::string& pv_name, const std::string& address) : pv_name(pv_name), address(address) {
@@ -18,10 +15,7 @@ EpicsChannel::EpicsChannel(pvac::ClientProvider& provider, const std::string& pv
1815
channel = std::make_shared<pvac::ClientChannel>(provider.connect(pv_name, opt));
1916
}
2017

21-
EpicsChannel::~EpicsChannel() {
22-
// if (channel) { channel->reset(); }
23-
// if (provider) { provider->disconnect(); }
24-
}
18+
EpicsChannel::~EpicsChannel() {}
2519

2620
void
2721
EpicsChannel::init() {
@@ -32,8 +26,6 @@ EpicsChannel::init() {
3226

3327
void
3428
EpicsChannel::deinit() {
35-
// "pva" provider automatically in registry
36-
// add "ca" provider to registry
3729
pva::ca::CAClientFactory::stop();
3830
}
3931

@@ -47,53 +39,11 @@ EpicsChannel::get(const std::string& field, const std::string& additional_filed)
4739
if (additional_filed.empty())
4840
return MakeSingleGetOperationUPtr(channel, pv_name, field);
4941
else
50-
return MakeCombinedGetOperationUPtr(
51-
MakeSingleGetOperationShrdPtr(channel, pv_name, field),
52-
MakeSingleGetOperationShrdPtr(channel, pv_name, additional_filed)
53-
);
54-
}
55-
56-
void
57-
EpicsChannel::startMonitor(const std::string& field) {
58-
mon = channel->monitor(pvd::createRequest(field));
59-
}
60-
61-
EventReceivedShrdPtr
62-
EpicsChannel::monitor() {
63-
EventReceivedShrdPtr result = std::make_shared<EventReceived>();
64-
if (!mon.wait(0.100)) {
65-
// updates mon.event
66-
result->event_timeout->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Timeout, pv_name, "Time out", nullptr}));
67-
return result;
68-
}
69-
70-
switch (mon.event.event) {
71-
// Subscription network/internal error
72-
case pvac::MonitorEvent::Fail:
73-
result->event_fail->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Fail, pv_name, mon.event.message, nullptr}));
74-
break;
75-
// explicit call of 'mon.cancel' or subscription dropped
76-
case pvac::MonitorEvent::Cancel:
77-
result->event_cancel->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Cancel, pv_name, mon.event.message, nullptr}));
78-
break;
79-
// Underlying channel becomes disconnected
80-
case pvac::MonitorEvent::Disconnect:
81-
result->event_disconnect->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Disconnec, pv_name, mon.event.message, nullptr}));
82-
break;
83-
// Data queue becomes not-empty
84-
case pvac::MonitorEvent::Data:
85-
// We drain event FIFO completely
86-
while (mon.poll()) {
87-
auto tmp_data = std::make_shared<epics::pvData::PVStructure>(mon.root->getStructure());
88-
tmp_data->copy(*mon.root);
89-
result->event_data->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Data, mon.event.message, {pv_name, tmp_data}}));
90-
}
91-
break;
92-
}
93-
return result;
42+
return MakeCombinedGetOperationUPtr(MakeSingleGetOperationShrdPtr(channel, pv_name, field),
43+
MakeSingleGetOperationShrdPtr(channel, pv_name, additional_filed));
9444
}
9545

96-
void
97-
EpicsChannel::stopMonitor() {
98-
if (mon) { mon.cancel(); }
46+
ConstMonitorOperationShrdPtr
47+
EpicsChannel::monitor(const std::string& fastUpdateField, const std::string& slowField ) const {
48+
return MakeMonitorOperationShrdPtr(channel, pv_name, fastUpdateField);
9949
}

src/k2eg/service/epics/EpicsChannel.h

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <k2eg/common/types.h>
66
#include <k2eg/service/epics/EpicsData.h>
77
#include <k2eg/service/epics/EpicsGetOperation.h>
8+
#include <k2eg/service/epics/EpicsMonitorOperation.h>
89
#include <k2eg/service/epics/EpicsPutOperation.h>
910
#include <pv/configuration.h>
1011
#include <pv/createRequest.h>
@@ -15,45 +16,22 @@
1516

1617
namespace k2eg::service::epics_impl {
1718

18-
enum EventType { Timeout, Fail, Cancel, Disconnec, Data };
19-
20-
typedef struct {
21-
EventType type;
22-
const std::string message;
23-
ChannelData channel_data;
24-
} MonitorEvent;
25-
DEFINE_PTR_TYPES(MonitorEvent)
26-
27-
typedef std::vector<MonitorEventShrdPtr> MonitorEventVec;
28-
typedef std::shared_ptr<MonitorEventVec> MonitorEventVecShrdPtr;
29-
30-
struct EventReceived {
31-
MonitorEventVecShrdPtr event_timeout = std::make_shared<MonitorEventVec>();
32-
MonitorEventVecShrdPtr event_data = std::make_shared<MonitorEventVec>();
33-
MonitorEventVecShrdPtr event_fail = std::make_shared<MonitorEventVec>();
34-
MonitorEventVecShrdPtr event_disconnect = std::make_shared<MonitorEventVec>();
35-
MonitorEventVecShrdPtr event_cancel = std::make_shared<MonitorEventVec>();
36-
};
37-
DEFINE_PTR_TYPES(EventReceived)
38-
3919
class EpicsChannel {
4020
friend class EpicsPutOperation;
4121
const std::string pv_name;
4222
const std::string address;
4323
epics::pvData::PVStructure::shared_pointer pvReq = epics::pvData::createRequest("field()");
44-
std::shared_ptr<pvac::ClientChannel> channel;
45-
pvac::MonitorSync mon;
24+
std::shared_ptr<pvac::ClientChannel> channel;
25+
pvac::MonitorSync mon;
4626

4727
public:
4828
explicit EpicsChannel(pvac::ClientProvider& provider, const std::string& pv_name, const std::string& address = std::string());
4929
~EpicsChannel();
50-
static void init();
51-
static void deinit();
52-
ConstPutOperationUPtr put(const std::string& field, const std::string& value);
53-
ConstGetOperationUPtr get(const std::string& field = "field()", const std::string& additional_filed = "") const;
54-
void startMonitor(const std::string& field = "field()");
55-
EventReceivedShrdPtr monitor();
56-
void stopMonitor();
30+
static void init();
31+
static void deinit();
32+
ConstPutOperationUPtr put(const std::string& field, const std::string& value);
33+
ConstGetOperationUPtr get(const std::string& field = "field()", const std::string& additional_filed = "") const;
34+
ConstMonitorOperationShrdPtr monitor(const std::string& fastUpdateField = "field()", const std::string& slowField = "") const;
5735
};
5836

5937
DEFINE_PTR_TYPES(EpicsChannel)

src/k2eg/service/epics/EpicsData.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,28 @@ struct ChannelData {
1616
};
1717
DEFINE_PTR_TYPES(ChannelData)
1818

19+
// event type
20+
enum EventType { Timeout, Fail, Cancel, Disconnec, Data };
21+
22+
typedef struct {
23+
EventType type;
24+
const std::string message;
25+
ChannelData channel_data;
26+
} MonitorEvent;
27+
DEFINE_PTR_TYPES(MonitorEvent)
28+
29+
typedef std::vector<MonitorEventShrdPtr> MonitorEventVec;
30+
typedef std::shared_ptr<MonitorEventVec> MonitorEventVecShrdPtr;
31+
32+
struct EventReceived {
33+
MonitorEventVecShrdPtr event_timeout = std::make_shared<MonitorEventVec>();
34+
MonitorEventVecShrdPtr event_data = std::make_shared<MonitorEventVec>();
35+
MonitorEventVecShrdPtr event_fail = std::make_shared<MonitorEventVec>();
36+
MonitorEventVecShrdPtr event_disconnect = std::make_shared<MonitorEventVec>();
37+
MonitorEventVecShrdPtr event_cancel = std::make_shared<MonitorEventVec>();
38+
};
39+
DEFINE_PTR_TYPES(EventReceived)
40+
1941
} // namespace k2eg::service::epics_impl
2042

2143
#endif // K2EG_SERVICE_EPICS_EPICSDATA_H_
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#include <k2eg/service/epics/EpicsMonitorOperation.h>
2+
#include <pv/createRequest.h>
3+
4+
#include <memory>
5+
#include <mutex>
6+
7+
#include "k2eg/service/epics/EpicsData.h"
8+
9+
using namespace k2eg::service::epics_impl;
10+
namespace pvd = epics::pvData;
11+
12+
MonitorOperation::MonitorOperation(std::shared_ptr<pvac::ClientChannel> channel, const std::string& pv_name, const std::string& field)
13+
: channel(channel), pv_name(pv_name), field(field), received_event(std::make_shared<EventReceived>()) {
14+
mon = channel->monitor(this, pvd::createRequest(field));
15+
}
16+
17+
MonitorOperation::~MonitorOperation() {
18+
if (mon) { mon.cancel(); }
19+
}
20+
21+
void
22+
MonitorOperation::monitorEvent(const pvac::MonitorEvent& evt) {
23+
// running on internal provider worker thread
24+
// minimize work here.
25+
unsigned fetched = 0;
26+
std::lock_guard<std::mutex> l(ce_mtx);
27+
switch (evt.event) {
28+
// Subscription network/internal error
29+
case pvac::MonitorEvent::Fail:
30+
received_event->event_fail->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Fail, pv_name, evt.message, nullptr}));
31+
break;
32+
// explicit call of 'mon.cancel' or subscription dropped
33+
case pvac::MonitorEvent::Cancel:
34+
// if (mon.valid()) {
35+
// //mon is valid so we can continnue because this class is valid
36+
// received_event->event_cancel->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Cancel, pv_name, evt.message, nullptr}));
37+
// }
38+
break;
39+
// Underlying channel becomes disconnected
40+
case pvac::MonitorEvent::Disconnect:
41+
received_event->event_disconnect->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Disconnec, pv_name, evt.message, nullptr}));
42+
break;
43+
// Data queue becomes not-empty
44+
case pvac::MonitorEvent::Data:
45+
// We drain event FIFO completely
46+
while (mon.poll()) {
47+
auto tmp_data = std::make_shared<epics::pvData::PVStructure>(mon.root->getStructure());
48+
tmp_data->copy(*mon.root);
49+
received_event->event_data->push_back(std::make_shared<MonitorEvent>(MonitorEvent{EventType::Data, evt.message, {pv_name, tmp_data}}));
50+
}
51+
break;
52+
}
53+
}
54+
55+
EventReceivedShrdPtr
56+
MonitorOperation::getEventData() const {
57+
std::lock_guard<std::mutex> l(ce_mtx);
58+
auto resutl = received_event;
59+
received_event = std::make_shared<EventReceived>();
60+
return resutl;
61+
}
62+
63+
bool
64+
MonitorOperation::hasData() const {
65+
std::lock_guard<std::mutex> l(ce_mtx);
66+
return received_event->event_data->size() > 0 || received_event->event_cancel->size() > 0 || received_event->event_disconnect->size() > 0 ||
67+
received_event->event_fail->size() > 0 || received_event->event_timeout->size() > 0;
68+
}
69+
70+
const std::string&
71+
MonitorOperation::getPVName() const {
72+
return pv_name;
73+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#ifndef K2EG_SERVICE_EPICS_EPICSMONITOROPERATION_H_
2+
#define K2EG_SERVICE_EPICS_EPICSMONITOROPERATION_H_
3+
4+
#include <k2eg/common/types.h>
5+
#include <k2eg/service/epics/EpicsData.h>
6+
#include <pvData.h>
7+
#include <pva/client.h>
8+
9+
#include <mutex>
10+
11+
namespace k2eg::service::epics_impl {
12+
// async monitor operation
13+
class MonitorOperation : public pvac::ClientChannel::MonitorCallback {
14+
const std::string field;
15+
const std::string pv_name;
16+
pvac::Monitor mon;
17+
std::shared_ptr<pvac::ClientChannel> channel;
18+
mutable EventReceivedShrdPtr received_event;
19+
mutable std::mutex ce_mtx;
20+
21+
public:
22+
MonitorOperation(std::shared_ptr<pvac::ClientChannel> channel, const std::string& pv_name, const std::string& field = "field()");
23+
virtual ~MonitorOperation();
24+
25+
virtual void monitorEvent(const pvac::MonitorEvent& evt) OVERRIDE FINAL;
26+
EventReceivedShrdPtr getEventData() const;
27+
bool hasData() const;
28+
const std::string& getPVName() const;
29+
};
30+
31+
DEFINE_PTR_TYPES(MonitorOperation)
32+
} // namespace k2eg::service::epics_impl
33+
34+
#endif // K2EG_SERVICE_EPICS_EPICSMONITOROPERATION_H_

0 commit comments

Comments
 (0)