Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MDT Dialout Support #10

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "pypi"
grpcio-tools = "*"
googleapis-common-protos = "*"
pylint = "*"
black = "*"
black = "==19.3b0"
twine = "*"

[packages]
Expand Down
502 changes: 267 additions & 235 deletions Pipfile.lock

Large diffs are not rendered by default.

157 changes: 157 additions & 0 deletions nxos_grpc/mdt_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Copyright 2019 Cisco Systems

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

"""NX-OS gRPC Python MDT dialout server."""

from concurrent import futures
from threading import Lock
import grpc
from . import proto


def create_mdt_server(
insecure_addresses=["[::]:50051"], secure_addresses=[], max_workers=10
):
"""Utility method to start up a gRPC server receiving NX-OS MDT.
Derived from gRPC Basics - Python documentation.
https://grpc.io/docs/tutorials/basic/python/

Parameters
----------
insecure_addresses : list, optional
A list of strings which are IP:port(s) to serve without security.
This or secure_addresses must be specified.
secure_addresses : list, optional
A list of tuples which are (address, server_credential) to serve with security.
This or insecure_addresses must be specified.
max_workers : int, optional
The number of concurrent workers to handle RPCs.

Returns
-------
grpc_server : gRPC Server
The gRPC server instance spun up for usage.
Used to start/stop server.
mdt_server : MDTServer
The MDTServer instance.
Used to add callbacks to handle messages.
"""
grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
mdt_server = MDTServer()
proto.add_gRPCMdtDialoutServicer_to_server(mdt_server, grpc_server)
for address in insecure_addresses:
grpc_server.add_insecure_port(address)
for address, credential in secure_addresses:
grpc_server.add_secure_port(address, credential)
grpc_server.start()
return grpc_server, mdt_server


class MDTServer(proto.gRPCMdtDialoutServicer):
"""NX-OS gRPC MDT dialout server implementation.

Data is propagated to interested parties via callbacks.

Methods
-------
add_callback(...)
Add a function signature to be called per message for handling.

Examples
--------
>>> from nxos_grpc import create_mdt_server
>>> grpc_server, mdt_server = create_mdt_server()
>>> def nothing_much(message):
... print(message)
>>> mdt_server.add_callback(nothing_much)
...
"""

def __init__(self):
"""Initializes the MDTServer object and internal state items.
"""
# Uncertain if we need a lock, but concurrency
self.__chunk_map_lock = Lock()
# Map to store all our chunks
self.__chunk_map = {}
# Callbacks we've been provided
self.__callbacks = []

def add_callback(self, func):
"""Add a callback to handle each MDT message.

Parameters
----------
func : function
The function signature to be called for callback.
"""
self.__callbacks.append(func)

def MdtDialout(self, request_iterator, context):
"""Services the MDT messages from NX-OS."""
for message_chunk in request_iterator:
telemetry_pb = self.__assemble_telemetry_pb(message_chunk)
if telemetry_pb is not None:
for callback in self.__callbacks:
callback(telemetry_pb)
yield proto.MdtDialoutArgs(ReqId=message_chunk.ReqId)

def __assemble_telemetry_pb(self, message_chunk):
"""Handles NX-OS telemetry chunking.
Stores chunked messages in self.__chunk_map and returns
fully assembled messages once they match the specified totalSize.
"""
message = None
if len(message_chunk.data) < message_chunk.totalSize:
# Need to begin storing to reassemble
with self.__chunk_map_lock:
if message_chunk.ReqId in self.__chunk_map:
self.__chunk_map[message.ReqId] += message_chunk.data
else:
self.__chunk_map[message.ReqId] = message_chunk.data
if (
len(self.__chunk_map[message_chunk.ReqId])
== message_chunk.totalSize
):
message = proto.Telemetry()
message.ParseFromString(self.__chunk_map.pop(message_chunk.ReqId))
elif (
len(self.__chunk_map[message_chunk.ReqId]) > message_chunk.totalSize
):
raise Exception(
"Message %s assembly (%i) is larger than totalSize (%i)!"
% (
str(message_chunk.ReqId),
len(self.__chunk_map[message_chunk.ReqId]),
message_chunk.totalSize,
)
)
elif (
message_chunk.totalSize
and len(message_chunk.data) > message_chunk.totalSize
):
raise Exception(
"Message %s chunk (%i) is larger than totalSize (%i)!"
% (
str(message_chunk.ReqId),
len(message_chunk.data),
message_chunk.totalSize,
)
)
else:
# message_chunk is a complete telemetry message
message = proto.Telemetry()
message.ParseFromString(message_chunk.data)
return message
8 changes: 7 additions & 1 deletion nxos_grpc/proto/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .nxos_grpc_pb2_grpc import gRPCConfigOperStub
from .telemetry_bis_pb2 import Telemetry
from .nxos_grpc_pb2 import (
GetOperArgs,
GetArgs,
Expand All @@ -8,3 +8,9 @@
CloseSessionArgs,
KillArgs,
)
from .nxos_grpc_pb2_grpc import gRPCConfigOperStub
from .mdt_dialout_pb2 import MdtDialoutArgs
from .mdt_dialout_pb2_grpc import (
gRPCMdtDialoutServicer,
add_gRPCMdtDialoutServicer_to_server,
)
14 changes: 14 additions & 0 deletions nxos_grpc/proto/mdt_dialout.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package mdt_dialout;

service gRPCMdtDialout {
rpc MdtDialout(stream MdtDialoutArgs) returns(stream MdtDialoutArgs) {};
}

message MdtDialoutArgs {
int64 ReqId = 1;
bytes data = 2;
string errors = 3;
int32 totalSize = 4; // Set for messages that are chunked, it contains the original message size.
}
115 changes: 115 additions & 0 deletions nxos_grpc/proto/mdt_dialout_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions nxos_grpc/proto/mdt_dialout_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc

from nxos_grpc.proto import mdt_dialout_pb2 as nxos__grpc_dot_proto_dot_mdt__dialout__pb2


class gRPCMdtDialoutStub(object):
# missing associated documentation comment in .proto file
pass

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.MdtDialout = channel.stream_stream(
'/mdt_dialout.gRPCMdtDialout/MdtDialout',
request_serializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.SerializeToString,
response_deserializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.FromString,
)


class gRPCMdtDialoutServicer(object):
# missing associated documentation comment in .proto file
pass

def MdtDialout(self, request_iterator, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_gRPCMdtDialoutServicer_to_server(servicer, server):
rpc_method_handlers = {
'MdtDialout': grpc.stream_stream_rpc_method_handler(
servicer.MdtDialout,
request_deserializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.FromString,
response_serializer=nxos__grpc_dot_proto_dot_mdt__dialout__pb2.MdtDialoutArgs.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'mdt_dialout.gRPCMdtDialout', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
Loading