Skip to content

Commit

Permalink
Add channel stats and list functionality
Browse files Browse the repository at this point in the history
This update adds new interfaces for Queues, PubSub, and CQ channel statistics in a new file, channel_stats.ts. It also extends the functionality of the queues, events_store, commands, queries, and events modules to include list functions for retrieving a listing of these different types of channels.
  • Loading branch information
liornabat committed Apr 25, 2024
1 parent b082c7e commit 83dab83
Show file tree
Hide file tree
Showing 8 changed files with 405 additions and 5 deletions.
95 changes: 95 additions & 0 deletions src/channel_stats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
export interface QueuesStats {
/** The number of messages in the queue */
messages: number;

/** The total volume of the messages in the queue */
volume: number;

/** The number of messages waiting in the queue */
waiting: number;

/** The number of messages that have expired */
expired: number;

/** The number of delayed messages */
delayed: number;
}

export interface QueuesChannel {
/** The name of the channel */
name: string;

/** The type of the channel */
type: string;

/** The timestamp of the last activity on the channel */
lastActivity: number;

/** Indicates whether the channel is currently active or not */
isActive: boolean;

/** The statistics of incoming messages on the channel */
incoming: QueuesStats;

/** The statistics of outgoing messages on the channel */
outgoing: QueuesStats;
}

export interface PubSubStats {
/** The number of messages */
messages: number;

/** The volume of the messages */
volume: number;
}

export interface PubSubChannel {
/** The name of the channel */
name: string;

/** The type of the channel */
type: string;

/** The timestamp of the last activity on the channel */
lastActivity: number;

/** Indicates whether the channel is currently active */
isActive: boolean;

/** The statistics related to incoming messages on the channel */
incoming: PubSubStats;

/** The statistics related to outgoing messages on the channel */
outgoing: PubSubStats;
}

export interface CQStats {
/** The number of messages in the queue */
messages: number;

/** The volume of the queue */
volume: number;

/** The number of responses in the queue */
responses: number;
}

export interface CQChannel {
/** A string representing the name of the channel */
name: string;

/** A string representing the type of the channel */
type: string;

/** An integer representing the timestamp of the last activity on the channel */
lastActivity: number;

/** A boolean indicating whether the channel is active or not */
isActive: boolean;

/** An instance of the CQStats interface representing the incoming statistics of the channel */
incoming: CQStats;

/** An instance of the CQStats interface representing the outgoing statistics of the channel */
outgoing: CQStats;
}
18 changes: 17 additions & 1 deletion src/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { Config } from './config';
import * as pb from './protos';
import { Utils } from './utils';
import * as grpc from '@grpc/grpc-js';
import { createChannel, deleteChannel } from './common';
import { createChannel, deleteChannel, listCQChannels } from './common';
import { CQChannel } from './channel_stats';

/**
* command request base message
Expand Down Expand Up @@ -314,4 +315,19 @@ export class CommandsClient extends Client {
'commands',
);
}

/**
* List commands channels
* @param search
* @return Promise<CQChannel[]>
*/
list(search: string): Promise<CQChannel[]> {
return listCQChannels(
this.grpcClient,
this.getMetadata(),
this.clientOptions.clientId,
search,
'commands',
);
}
}
224 changes: 224 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ import * as pb from './protos';
import { Utils } from './utils';
import * as kubemq from './protos';
import * as grpc from '@grpc/grpc-js';
import {
PubSubChannel,
PubSubStats,
QueuesChannel,
QueuesStats,
CQChannel,
CQStats,
} from './channel_stats';

export function createChannel(
client: kubemq.kubemqClient,
Expand Down Expand Up @@ -60,3 +68,219 @@ export function deleteChannel(
});
});
}

export function listPubSubChannels(
client: kubemq.kubemqClient,
md: grpc.Metadata,
clientId: string,
search: string,
channelType: string,
): Promise<PubSubChannel[]> {
const pbMessage = new pb.Request();
pbMessage.setRequestid(Utils.uuid());
pbMessage.setClientid(clientId);
pbMessage.setRequesttypedata(2);
pbMessage.setChannel('kubemq.cluster.internal.requests');
pbMessage.setMetadata('list-channels');
const pbtags = pbMessage.getTagsMap();
pbtags.set('client_id', clientId);
pbtags.set('channel_type', channelType);
pbtags.set('channel_search', search);
pbMessage.setTimeout(10000);
return new Promise<PubSubChannel[]>((resolve, reject) => {
client.sendRequest(pbMessage, md, (e, data) => {
if (e) {
reject(e);
return;
}
if (!data) {
reject(new Error('no data'));
return;
}
if (data.getBody() === null) {
resolve([]);
return;
}
const channels = decodePubSubChannelList(data.getBody_asU8());
resolve(channels);
});
});
}

export function listQueuesChannels(
client: kubemq.kubemqClient,
md: grpc.Metadata,
clientId: string,
search: string,
channelType: string,
): Promise<QueuesChannel[]> {
const pbMessage = new pb.Request();
pbMessage.setRequestid(Utils.uuid());
pbMessage.setClientid(clientId);
pbMessage.setRequesttypedata(2);
pbMessage.setChannel('kubemq.cluster.internal.requests');
pbMessage.setMetadata('list-channels');
const pbtags = pbMessage.getTagsMap();
pbtags.set('client_id', clientId);
pbtags.set('channel_type', channelType);
pbtags.set('channel_search', search);
pbMessage.setTimeout(10000);
return new Promise<QueuesChannel[]>((resolve, reject) => {
client.sendRequest(pbMessage, md, (e, data) => {
if (e) {
reject(e);
return;
}
if (!data) {
reject(new Error('no data'));
return;
}
if (data.getBody() === null) {
resolve([]);
return;
}
const channels = decodeQueuesChannelList(data.getBody_asU8());
resolve(channels);
});
});
}

export function listCQChannels(
client: kubemq.kubemqClient,
md: grpc.Metadata,
clientId: string,
search: string,
channelType: string,
): Promise<CQChannel[]> {
const pbMessage = new pb.Request();
pbMessage.setRequestid(Utils.uuid());
pbMessage.setClientid(clientId);
pbMessage.setRequesttypedata(2);
pbMessage.setChannel('kubemq.cluster.internal.requests');
pbMessage.setMetadata('list-channels');
const pbtags = pbMessage.getTagsMap();
pbtags.set('client_id', clientId);
pbtags.set('channel_type', channelType);
pbtags.set('channel_search', search);
pbMessage.setTimeout(10000);
return new Promise<CQChannel[]>((resolve, reject) => {
client.sendRequest(pbMessage, md, (e, data) => {
if (e) {
reject(e);
return;
}
if (!data) {
reject(new Error('no data'));
return;
}
if (data.getBody() === null) {
resolve([]);
return;
}
const channels = decodeCQChannelList(data.getBody_asU8());
resolve(channels);
});
});
}

function decodePubSubChannelList(dataBytes: Uint8Array): PubSubChannel[] {
/**
* Decodes the given data bytes into a list of PubSubChannel objects.
*
* @param dataBytes The data bytes to decode.
* @returns A list of PubSubChannel objects.
*/
// Decode bytes to string and parse JSON
const dataStr = new TextDecoder().decode(dataBytes);
const channelsData = JSON.parse(dataStr);

const channels: PubSubChannel[] = [];
for (const item of channelsData) {
// Extracting incoming and outgoing as Stats objects
const incoming: PubSubStats = item['incoming'];
const outgoing: PubSubStats = item['outgoing'];

// Creating a Channel instance with the Stats objects
const channel: PubSubChannel = {
name: item['name'],
type: item['type'],
lastActivity: item['lastActivity'],
isActive: item['isActive'],
incoming,
outgoing,
};
channels.push(channel);
}

return channels;
}

function decodeQueuesChannelList(dataBytes: Uint8Array): QueuesChannel[] {
/**
* Decodes a byte string into a list of QueuesChannel objects.
*
* @param dataBytes The byte string to be decoded.
* @returns A list of QueuesChannel objects.
*
* Note:
* - This method assumes that the byte string is encoded in 'utf-8' format.
* - The byte string should represent a valid JSON object.
* - The JSON object should contain the necessary fields ('name', 'type', 'lastActivity', 'isActive', 'incoming', 'outgoing') for creating QueuesChannel objects.
* - The 'incoming' and 'outgoing' fields should contain valid JSON objects that can be parsed into QueuesStats objects.
*/
// Decode bytes to string and parse JSON
const dataStr = new TextDecoder().decode(dataBytes);
const channelsData = JSON.parse(dataStr);

const channels: QueuesChannel[] = [];
for (const item of channelsData) {
// Extracting incoming and outgoing as Stats objects
const incoming: QueuesStats = item['incoming'];
const outgoing: QueuesStats = item['outgoing'];

// Creating a Channel instance with the Stats objects
const channel: QueuesChannel = {
name: item['name'],
type: item['type'],
lastActivity: item['lastActivity'],
isActive: item['isActive'],
incoming,
outgoing,
};
channels.push(channel);
}

return channels;
}

function decodeCQChannelList(dataBytes: Uint8Array): CQChannel[] {
/**
* Decodes the given byte array into a list of CQChannel objects.
*
* @param dataBytes The byte array to decode.
* @returns The list of CQChannel objects decoded from the byte array.
*/
// Decode bytes to string and parse JSON
const dataStr = new TextDecoder().decode(dataBytes);
const channelsData = JSON.parse(dataStr);

const channels: CQChannel[] = [];
for (const item of channelsData) {
// Extracting incoming and outgoing as Stats objects
const incoming: CQStats = item['incoming'];
const outgoing: CQStats = item['outgoing'];

// Creating a Channel instance with the Stats objects
const channel: CQChannel = {
name: item['name'],
type: item['type'],
lastActivity: item['lastActivity'],
isActive: item['isActive'],
incoming,
outgoing,
};
channels.push(channel);
}

return channels;
}
18 changes: 17 additions & 1 deletion src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { Config } from './config';
import * as pb from './protos';
import { Utils } from './utils';
import * as grpc from '@grpc/grpc-js';
import { createChannel, deleteChannel } from './common';
import { createChannel, deleteChannel, listPubSubChannels } from './common';
import { PubSubChannel } from './channel_stats';

/**
* events base message
Expand Down Expand Up @@ -311,4 +312,19 @@ export class EventsClient extends Client {
'events',
);
}

/**
* List events channels
* @param search
* @return Promise<PubSubChannel[]>
*/
list(search: string): Promise<PubSubChannel[]> {
return listPubSubChannels(
this.grpcClient,
this.getMetadata(),
this.clientOptions.clientId,
search,
'events',
);
}
}
Loading

0 comments on commit 83dab83

Please sign in to comment.