Gateway Exchange Protocol (GEP) Subscriber API

Stephen C. Wills edited this page Dec 1, 2015 · 2 revisions

This page provides a description of how to use the C++ implementation of the GEP Subscriber API. The Subscriber API defines the interface by which an application can set up a Data Subscriber to receive measurements from a Data Publisher over a computer network. This page assumes that the Data Publisher has already been set up within your network and is listening for connections from Data Subscribers.

Main steps

There are three major steps to follow in order to receive data from a Data Publisher.

  1. Register callbacks.
  2. Connect to the Data Publisher.
  3. Subscribe to the data points.

The following sections describe what you will need in order to execute each of these steps.

1. Registering callbacks

The Subscriber API is event-driven: the Data Subscriber hands data to the application through callbacks, which it invokes when events occur during communication with the Data Publisher. The application can register the following callbacks.

  • Status Message Callback
  • Error Message Callback
  • Data Start Time Callback
  • Metadata Callback
  • New Measurements Callback
  • Processing Complete Callback
  • Connection Terminated Callback

The following code snippet shows the function signatures for these callbacks and how to register them.

#include <string>
#include <vector>

#include <GSF/Common/Types.h>
#include <GSF/Transport/DataSubscriber.h>

namespace gsf = GridSolutionsFramework;
namespace gsft = gsf::Transport;

void ProcessStatusMessage(std::string message);
void ProcessErrorMessage(std::string message);
void ProcessDataStartTime(gsf::int64_t startTime);
void ProcessMetadata(std::vector<gsf::uint8_t> metadata);
void ProcessNewMeasurements(std::vector<gsf::Measurement> newMeasurements);
void ProcessProcessingComplete(std::string message);
void ProcessConnectionTerminated(gsft::DataSubscriber* subscriber);

int main()
{
    gsft::DataSubscriber subscriber;
	
    subscriber.RegisterStatusMessageCallback(&ProcessStatusMessage);
    subscriber.RegisterErrorMessageCallback(&ProcessErrorMessage);
    subscriber.RegisterDataStartTimeCallback(&ProcessDataStartTime);
    subscriber.RegisterMetadataCallback(&ProcessMetadata);
    subscriber.RegisterNewMeasurementsCallback(&ProcessNewMeasurements);
    subscriber.RegisterProcessingCompleteCallback(&ProcessProcessingComplete);
    subscriber.RegisterConnectionTerminatedCallback(&ProcessConnectionTerminated);
	
    return 0;
}
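The snippet above only declares the callbacks; an application must also define them before it will link. The following is a minimal sketch of two such definitions that depends only on the standard library. It assumes, without confirmation from the API, that callbacks may be invoked on a thread other than the main thread, so the hypothetical `MessageLog` helper guards its storage with a mutex.

```cpp
#include <iostream>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical helper (not part of the Subscriber API): a thread-safe
// message store shared by the status and error callbacks.
class MessageLog
{
public:
    // Stores the message with a severity prefix and echoes it to stdout.
    void Append(const std::string& severity, const std::string& message)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_lines.push_back("[" + severity + "] " + message);
        std::cout << m_lines.back() << std::endl;
    }

    // Returns a copy of everything logged so far.
    std::vector<std::string> Lines() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_lines;
    }

private:
    mutable std::mutex m_mutex;
    std::vector<std::string> m_lines;
};

MessageLog g_log;

// Definitions matching the signatures declared above.
void ProcessStatusMessage(std::string message)
{
    g_log.Append("STATUS", message);
}

void ProcessErrorMessage(std::string message)
{
    g_log.Append("ERROR", message);
}
```

The remaining callbacks can be defined the same way; a callback whose event the application does not care about can have an empty body.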

New Measurements Callback: This callback is called whenever the Data Subscriber receives a collection of measurements from the Data Publisher.

Status Message Callback: This callback is called whenever the Data Subscriber receives a generic status message to be displayed or ignored by the application.

Error Message Callback: This callback is called whenever the Data Subscriber encounters an error. Errors can be sent by the Data Publisher or they can be encountered by the Data Subscriber itself while attempting to process messages from the Data Publisher. These error messages can be displayed or logged by the application.

Metadata Callback: When the Data Subscriber has finished subscribing to the Data Publisher (step 3), the Data Publisher sends a message to the Data Subscriber containing a large collection of metadata that can be used to programmatically determine which data points are available for subscription. By default, the C++ API requests that this information be compressed with DEFLATE (RFC 1951) and formatted as XML. The application must decompress and parse the data before using it.

Data Start Time Callback: When the Data Subscriber has finished subscribing to the Data Publisher (step 3), the Data Publisher sends the current time of its local clock to the Data Subscriber. This message can be safely ignored by the application.

Processing Complete Callback: When the Data Subscriber requests historic data via a Temporal Session, it specifies the start time and end time of the data it is requesting. When the Data Publisher has finished sending the requested data to the Data Subscriber, this callback is called to signal completion.

Connection Terminated Callback: If the connection between the Data Subscriber and the Data Publisher is forcefully interrupted, this callback is called to notify the application. The application can use this callback to trigger a reconnect.
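One way to structure the reconnect mentioned for the Connection Terminated Callback is a retry loop with an increasing delay. The sketch below is an illustration only: `ReconnectWithBackoff` and its `tryConnect` parameter are hypothetical names, not part of the Subscriber API, and the caller is assumed to supply a function that returns true once the connection has been re-established.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper (not part of the Subscriber API). Calls tryConnect
// until it returns true or maxAttempts is reached, doubling the delay
// between attempts. Returns the number of attempts made on success, or 0
// if every attempt failed.
int ReconnectWithBackoff(
    const std::function<bool()>& tryConnect,
    int maxAttempts,
    std::chrono::milliseconds initialDelay)
{
    std::chrono::milliseconds delay = initialDelay;

    for (int attempt = 1; attempt <= maxAttempts; ++attempt)
    {
        if (tryConnect())
            return attempt;

        // Sleep only if another attempt will follow.
        if (attempt < maxAttempts)
        {
            std::this_thread::sleep_for(delay);
            delay *= 2;
        }
    }

    return 0;
}
```

In an application, `tryConnect` would wrap the Connect and Subscribe calls shown in steps 2 and 3. This page does not state how Connect reports failure or whether it is safe to reconnect from inside the callback itself, so the wrapper may need to translate failures into a boolean and hand the retry loop to another thread.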

2. Connecting to the Data Publisher

In order to connect to the Data Publisher, you must know the hostname (or IP address) and the port of the machine on which the Data Publisher is listening for connections. The following expands on our earlier example to show how to connect to the Data Publisher.

#include <string>
#include <vector>

#include <GSF/Common/Types.h>
#include <GSF/Transport/DataSubscriber.h>

namespace gsf = GridSolutionsFramework;
namespace gsft = gsf::Transport;

void ProcessStatusMessage(std::string message);
void ProcessErrorMessage(std::string message);
void ProcessDataStartTime(gsf::int64_t startTime);
void ProcessMetadata(std::vector<gsf::uint8_t> metadata);
void ProcessNewMeasurements(std::vector<gsf::Measurement> newMeasurements);
void ProcessProcessingComplete(std::string message);
void ProcessConnectionTerminated(gsft::DataSubscriber* subscriber);

int main()
{
    gsft::DataSubscriber subscriber;
	
    subscriber.RegisterStatusMessageCallback(&ProcessStatusMessage);
    subscriber.RegisterErrorMessageCallback(&ProcessErrorMessage);
    subscriber.RegisterDataStartTimeCallback(&ProcessDataStartTime);
    subscriber.RegisterMetadataCallback(&ProcessMetadata);
    subscriber.RegisterNewMeasurementsCallback(&ProcessNewMeasurements);
    subscriber.RegisterProcessingCompleteCallback(&ProcessProcessingComplete);
    subscriber.RegisterConnectionTerminatedCallback(&ProcessConnectionTerminated);

    subscriber.Connect("127.0.0.1", 6170);

    return 0;
}
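As written, main returns immediately after Connect, so the process would exit before any callback has a chance to run (assuming, as the event-driven design suggests, that callbacks are delivered asynchronously). A real application needs to keep main alive. The sketch below shows one option using only the standard library; `ShutdownSignal` is a hypothetical helper, not part of the Subscriber API.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not part of the Subscriber API): lets main block
// until another thread, such as one running a callback, asks the program
// to stop.
class ShutdownSignal
{
public:
    // Marks shutdown as requested and wakes every waiting thread.
    void Request()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_requested = true;
        }
        m_condition.notify_all();
    }

    // Blocks until Request has been called.
    void Wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return m_requested; });
    }

    // Blocks until Request has been called or the timeout elapses.
    // Returns true if shutdown was requested.
    bool WaitFor(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_condition.wait_for(lock, timeout, [this] { return m_requested; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    bool m_requested = false;
};
```

With a global `ShutdownSignal`, main would call `Wait()` after `Connect`, and a callback such as the Processing Complete Callback could call `Request()` to let the program exit. A simpler alternative for console tools is to block on `std::getline(std::cin, line)` until the user presses Enter.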

3. Subscribing to the data points

The SubscriptionInfo object is a convenient way to set up and store subscription settings for an application that is using the Subscriber API. The following lists the settings that can be modified using the SubscriptionInfo object.

  • FilterExpression
    • The filter expression defines which measurements are being requested by the Data Subscriber. See below for more information on filter expression syntax and usage.
    • No default. Filter expression is required.
  • NewMeasurementsCallback
    • Alternate way to register the New Measurements Callback.
    • Default: 0 (null; no callback registered)
  • RemotelySynchronized
    • Requests that the measurements be concentrated by the Data Publisher before sending them to the Data Subscriber.
    • Default: false
  • Throttled
    • Requests that the measurements be throttled by the Data Publisher so that they can be received at a slower time interval.
    • Default: false
  • UdpDataChannel
    • Allows for data to be received over a separate UDP data channel, rather than the initial TCP connection.
    • Default: false
  • DataChannelLocalPort
    • Indicates which port the application is listening on for data.
    • This value is only relevant if UdpDataChannel is true.
    • Default: 9500
  • IncludeTime
    • Determines whether timestamps are included in the data transmitted by the Data Publisher. Timestamps can be omitted to decrease bandwidth.
    • Default: true
  • LagTime
    • Defines the tolerance of the Data Publisher for measurements that arrive marked with past timestamps.
    • This value is only relevant for remotely synchronized and throttled connections.
    • Default: 10.0
  • LeadTime
    • Defines the tolerance of the Data Publisher for measurements that arrive marked with future timestamps.
    • This value is only relevant for remotely synchronized and throttled connections.
    • Default: 5.0
  • UseLocalClockAsRealTime
    • Indicates whether the Data Publisher should use its local clock as real time, or whether it should instead use the timestamps of the measurements.
    • This should generally be false unless the Data Publisher is GPS time aligned.
    • Default: false
  • UseMillisecondResolution
    • Defines the resolution of the timestamps of the measurements being transmitted between the Data Publisher and Data Subscriber.
    • If the timestamps of the measurements are only precise to the millisecond, bandwidth can be decreased by setting this value to true.
    • Default: false
  • StartTime
    • When requesting historic data via a Temporal Session, this defines the start time of the data to be sent by the Data Publisher.
    • No default. This value is optional.
  • StopTime
    • When requesting historic data via a Temporal Session, this defines the stop time of the data to be sent by the Data Publisher.
    • No default. This value is optional.
  • ConstraintParameters
    • When requesting historic data via a Temporal Session, this defines additional constraint parameters supplied to temporal adapters in that Temporal Session.
    • No default. This value is optional.
  • ProcessingInterval
    • When requesting historic data via a Temporal Session, this defines the interval on which the Data Publisher should send the data to the Data Subscriber to be processed.
    • A value of -1 signifies the Data Publisher’s default value.
    • A value of 0 requests that data be processed as fast as possible.
    • Default: -1
  • WaitHandleNames
    • Defines the names of the wait handles used to deterministically wait for data to become available to the Data Publisher.
    • No default. This value is optional.
  • WaitHandleTimeout
    • Defines the amount of time to wait for data to become available to the Data Publisher.
    • Default: 0 (no timeout)
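To make the defaults above easier to see at a glance, the following sketch mirrors a subset of the settings in a stand-in struct. `SubscriptionSettings`, `MakeUdpSubscription`, and `IsValid` are hypothetical names, and the member types are assumptions; an application should use the real SubscriptionInfo object from the API rather than this stand-in.

```cpp
#include <string>

// Stand-in for illustration only (not the API's SubscriptionInfo). It
// mirrors a subset of the settings and defaults listed above; the member
// types are assumptions.
struct SubscriptionSettings
{
    std::string FilterExpression;            // required, no default
    bool RemotelySynchronized = false;
    bool Throttled = false;
    bool UdpDataChannel = false;
    int DataChannelLocalPort = 9500;         // used only with UdpDataChannel
    bool IncludeTime = true;
    double LagTime = 10.0;
    double LeadTime = 5.0;
    bool UseLocalClockAsRealTime = false;
    bool UseMillisecondResolution = false;
    int ProcessingInterval = -1;             // -1: publisher's default
};

// Builds settings that request data over a separate UDP data channel,
// leaving every other setting at its default.
SubscriptionSettings MakeUdpSubscription(const std::string& filterExpression, int localPort)
{
    SubscriptionSettings settings;
    settings.FilterExpression = filterExpression;
    settings.UdpDataChannel = true;
    settings.DataChannelLocalPort = localPort;
    return settings;
}

// Checks the two constraints that can be verified locally: the filter
// expression is required, and a UDP port must be in the range 1-65535.
bool IsValid(const SubscriptionSettings& settings)
{
    if (settings.FilterExpression.empty())
        return false;

    if (settings.UdpDataChannel &&
        (settings.DataChannelLocalPort < 1 || settings.DataChannelLocalPort > 65535))
        return false;

    return true;
}
```

Only the settings that differ from their defaults need to be assigned, which is why most applications set little more than the filter expression.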

The following example expands upon the previous examples to show how to subscribe to a Data Publisher to start receiving measurements.

#include <string>
#include <vector>

#include <GSF/Common/Types.h>
#include <GSF/Transport/DataSubscriber.h>

namespace gsf = GridSolutionsFramework;
namespace gsft = gsf::Transport;

void ProcessStatusMessage(std::string message);
void ProcessErrorMessage(std::string message);
void ProcessDataStartTime(gsf::int64_t startTime);
void ProcessMetadata(std::vector<gsf::uint8_t> metadata);
void ProcessNewMeasurements(std::vector<gsf::Measurement> newMeasurements);
void ProcessProcessingComplete(std::string message);
void ProcessConnectionTerminated(gsft::DataSubscriber* subscriber);

int main()
{
    gsft::DataSubscriber subscriber;
    gsft::SubscriptionInfo info;

    info.FilterExpression = "FILTER ActiveMeasurements WHERE SignalType = 'FREQ'";
	
    subscriber.RegisterStatusMessageCallback(&ProcessStatusMessage);
    subscriber.RegisterErrorMessageCallback(&ProcessErrorMessage);
    subscriber.RegisterDataStartTimeCallback(&ProcessDataStartTime);
    subscriber.RegisterMetadataCallback(&ProcessMetadata);
    subscriber.RegisterNewMeasurementsCallback(&ProcessNewMeasurements);
    subscriber.RegisterProcessingCompleteCallback(&ProcessProcessingComplete);
    subscriber.RegisterConnectionTerminatedCallback(&ProcessConnectionTerminated);

    subscriber.Connect("127.0.0.1", 6170);
    subscriber.Subscribe(info);

    return 0;
}

Filter Expression Syntax and Usage

Filter expression syntax and usage are defined by the Data Publisher. The current implementation of the Data Publisher in the Time Series Framework supports the following syntax.

  • Filter syntax: FILTER <TableName> WHERE <Expression> [ORDER BY <SortField>]
  • Field syntax: <Source>:<ID> [;<Source>:<ID> [...]]
  • SignalID syntax: <SignalID> [;<SignalID> [...]]

The filter syntax is a SQL-like syntax that can be used to query an in-memory data set stored by the Data Publisher. The ID column of the table denoted by <TableName> is used to determine which measurements are being requested. The WHERE clause allows you to filter the data set in a similar fashion to SQL. The ORDER BY clause enforces a specific ordering of the measurements so that they are always sent by the Data Publisher in a specific order. For more information on the proper syntax for <Expression>, visit http://www.csharp-examples.net/dataview-rowfilter/.
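Because the filter is an ordinary string, it can be assembled programmatically. The sketch below uses two hypothetical helpers, `MakeFilterExpression` and `QuoteLiteral`, which are not part of the Subscriber API. Doubling embedded single quotes follows the string-literal rule of the DataView row filter syntax referenced above; whether a given Data Publisher accepts every such expression is an assumption.

```cpp
#include <string>

// Hypothetical helper: wraps a value in single quotes for use inside
// <Expression>, doubling any embedded single quote.
std::string QuoteLiteral(const std::string& value)
{
    std::string quoted = "'";

    for (char c : value)
    {
        if (c == '\'')
            quoted += "''";
        else
            quoted += c;
    }

    quoted += "'";
    return quoted;
}

// Hypothetical helper: assembles
// FILTER <TableName> WHERE <Expression> [ORDER BY <SortField>].
// The ORDER BY clause is omitted when sortField is empty.
std::string MakeFilterExpression(
    const std::string& tableName,
    const std::string& expression,
    const std::string& sortField = "")
{
    std::string filter = "FILTER " + tableName + " WHERE " + expression;

    if (!sortField.empty())
        filter += " ORDER BY " + sortField;

    return filter;
}
```

For example, `MakeFilterExpression("ActiveMeasurements", "SignalType = " + QuoteLiteral("FREQ"))` yields the same filter string used in the subscription example in step 3.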

The field syntax allows you to select a specific list of measurements based on a human-readable identifier for each measurement. The source is typically either the name of the historian archiving that data point or the name of the device creating it. The ID is a simple integer identifier that distinguishes between points coming from the same source. These identifiers are defined by the metadata stored by the Data Publisher.

The SignalID syntax makes use of globally unique identifiers to select a specific list of measurements. These identifiers are defined by the metadata stored by the Data Publisher.
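Both the field syntax and the SignalID syntax are semicolon-separated lists, so one small helper can build either. The following is a sketch with hypothetical function names; the source name `PPA` and the identifiers used in the usage note are made-up values for illustration, and real ones come from the Data Publisher's metadata.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: joins items with ';', the separator used by both
// the field syntax and the SignalID syntax.
std::string JoinWithSemicolons(const std::vector<std::string>& items)
{
    std::string joined;

    for (std::size_t i = 0; i < items.size(); ++i)
    {
        if (i > 0)
            joined += ';';

        joined += items[i];
    }

    return joined;
}

// Hypothetical helper: formats (source, id) pairs as
// <Source>:<ID>;<Source>:<ID>...
std::string MakeMeasurementKeyList(
    const std::vector<std::pair<std::string, unsigned int>>& keys)
{
    std::vector<std::string> parts;

    for (const auto& key : keys)
        parts.push_back(key.first + ":" + std::to_string(key.second));

    return JoinWithSemicolons(parts);
}
```

Passing the pairs `("PPA", 1)` and `("PPA", 2)` to `MakeMeasurementKeyList` produces `PPA:1;PPA:2`, which can be assigned directly to FilterExpression. A list of SignalID strings can be passed to `JoinWithSemicolons` in the same way.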

If you are connecting to the openPDC or openPG, see http://openpdc.codeplex.com/wikipage?title=Connection%20Strings for more information and examples of filter expression syntax (filter expression syntax is the same as input measurement keys syntax).