diff --git a/python/Readme.md b/python/Readme.md new file mode 100644 index 0000000..f6e44da --- /dev/null +++ b/python/Readme.md @@ -0,0 +1,12 @@ +# Python example of the NetStreamGraph usage + +For running NetStreamGraph on python you need to do the following: +1. Run a server receiver for graph visualization. + + [Java server example](https://github.com/max-kalganov/graph_stream_server) +2. Use NetStreamProxyGraph to fill a graph. + + class import - `from gs_netstream import NetStreamProxyGraph` + See NetStreamProxyGraph implementation to look up graph methods. + +Run `example_sender.py` for an experiment. \ No newline at end of file diff --git a/python/example_sender.py b/python/example_sender.py index e3d261f..6a19568 100644 --- a/python/example_sender.py +++ b/python/example_sender.py @@ -1,19 +1,35 @@ import logging -from gs_netstream.sender import NetStreamProxyGraph, NetStreamSender +from time import sleep +from gs_netstream import NetStreamProxyGraph +from random import randint logging.basicConfig(level=logging.DEBUG) -sender = NetStreamSender(2012) -proxy = NetStreamProxyGraph(sender) -style = "node{fill-mode:plain;fill-color:gray;size:1px;}" -proxy.add_attribute("stylesheet", style) +def ex1(graph: NetStreamProxyGraph) -> None: + style = "node{fill-mode:plain;fill-color:gray;size:1px;}" + graph.add_attribute("stylesheet", style) -proxy.add_attribute("ui.antialias", True) -proxy.add_attribute("layout.stabilization-limit", 0) + graph.add_attribute("ui.antialias", True) + graph.add_attribute("layout.stabilization-limit", 0) -for i in range(0,500): - proxy.add_node(str(i)) - if i > 0: - proxy.add_edge(str(i) + "_" + str(i-1), str(i), str(i-1), False) - proxy.add_edge(str(i) + "__" + str(i/2), str(i), str(i/2), False) + for i in range(500): + sleep(0.2) + graph.add_node(str(i)) + if i > 0: + graph.add_edge(str(i) + "_" + str(i-1), str(i), str(i-1), False) + graph.add_edge(str(i) + "__" + str(i/2), str(i), str(i/2), False) + + +def ex2(graph: NetStreamProxyGraph) -> None: + graph.add_node("0") + for i in range(1, 200): + sleep(0.2) + graph.add_node(str(i)) + i2 = str(randint(0, i-1)) + graph.add_edge(f"{str(i)}_{i2}", str(i), i2) + + +if __name__ == '__main__': + graph = NetStreamProxyGraph(port=8008) + ex2(graph) diff --git a/python/gs_netstream/__init__.py b/python/gs_netstream/__init__.py index e69de29..7acce47 100644 --- a/python/gs_netstream/__init__.py +++ b/python/gs_netstream/__init__.py @@ -0,0 +1 @@ +from .graph_sender import NetStreamProxyGraph diff --git a/python/gs_netstream/common.py b/python/gs_netstream/common.py index 5beaa1f..2bb8e9c 100644 --- a/python/gs_netstream/common.py +++ b/python/gs_netstream/common.py @@ -6,53 +6,68 @@ Created by Yoann Pigné on 2011-08-21. Copyright (c) 2011 University of Luxembourg. All rights reserved. """ +from abc import ABC, abstractmethod -import sys -import os -class AttributeSink(object): +class AttributeSink(ABC): + @abstractmethod def graph_attribute_added(self, source_id, time_id, attribute, value): - raise NotImplementedError + pass + @abstractmethod def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_value): - raise NotImplementedError + pass + @abstractmethod def graph_attribute_removed(self, source_id, time_id, attribute): - raise NotImplementedError + pass + @abstractmethod def node_attribute_added(self, source_id, time_id, node_id, attribute, value): - raise NotImplementedError + pass + @abstractmethod def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_value, new_value): - raise NotImplementedError + pass + @abstractmethod def node_attribute_removed(self, source_id, time_id, node_id, attribute): - raise NotImplementedError + pass + @abstractmethod def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): - raise NotImplementedError + pass + @abstractmethod def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_value, new_value): - raise NotImplementedError + pass + @abstractmethod def edge_attribute_removed(self, source_id, time_id, edge_id, attribute): - raise NotImplementedError + pass -class ElementSink(object): + +class ElementSink(ABC): + @abstractmethod def node_added(self, source_id, time_id, node_id): - raise NotImplementedError + pass + @abstractmethod def node_removed(self, source_id, time_id, node_id): - raise NotImplementedError + pass + @abstractmethod def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): - raise NotImplementedError + pass + @abstractmethod def edge_removed(self, source_id, time_id, edge_id): - raise NotImplementedError + pass + @abstractmethod def step_begun(self, source_id, time_id, timestamp): - raise NotImplementedError + pass + @abstractmethod def graph_cleared(self, source_id, time_id): - raise NotImplementedError + pass diff --git a/python/gs_netstream/graph_sender.py b/python/gs_netstream/graph_sender.py new file mode 100644 index 0000000..593eeb9 --- /dev/null +++ b/python/gs_netstream/graph_sender.py @@ -0,0 +1,86 @@ +"""Proxy Netstream graph class""" +from random import random +from typing import Optional + +from gs_netstream.sender import NetStreamSender + + +class NetStreamProxyGraph: + """ + This is a utility class that handles 'source id' and 'time id' synchronization tokens. + It proposes utile classes that allow to directly send events through the network pipe. + """ + + def __init__(self, sender: Optional[NetStreamSender] = None, source_id: Optional[str] = None, port: int = 8008): + """Constructor can be with one NetStreamSender object and a source id OR with with 4 args. + + Notes: + 4 args: Source ID, Stream ID, Host, and port number + """ + self.sender = sender if sender is not None else NetStreamSender(port) + self.source_id = source_id if source_id else "nss%d" % (1000 * random()) + self.time_id = 0 + + def run_sender_method(self, sender_method, *args, **kwargs): + sender_method(self.source_id, self.time_id, *args, **kwargs) + self.time_id += 1 + + def add_node(self, node: str): + """Add a node to the graph.""" + self.run_sender_method(self.sender.node_added, node) + + def remove_node(self, node: str): + """Remove a node from the graph.""" + self.run_sender_method(self.sender.node_removed, node) + + def add_edge(self, edge: str, from_node: str, to_node: str, directed: bool = False): + """Add an edge to the graph.""" + self.run_sender_method(self.sender.edge_added, edge, from_node, to_node, directed) + + def remove_edge(self, edge: str): + """Remove an edge from the graph.""" + self.run_sender_method(self.sender.edge_removed, edge) + + def add_attribute(self, attribute: str, value): + """Add an attribute to the graph.""" + self.run_sender_method(self.sender.graph_attribute_added, attribute, value) + + def remove_attribute(self, attribute: str): + """Remove an attribute from the graph.""" + self.run_sender_method(self.sender.graph_attribute_removed, attribute) + + def change_attribute(self, attribute: str, old_value, new_value): + """Change an attribute of the graph.""" + self.run_sender_method(self.sender.graph_attribute_changed, attribute, old_value, new_value) + + def add_node_attribute(self, node: str, attribute: str, value): + """Add an attribute to a node.""" + self.run_sender_method(self.sender.node_attribute_added, node, attribute, value) + + def remove_node_attibute(self, node: str, attribute: str): + """Remove an attribute from a node.""" + self.run_sender_method(self.sender.node_attribute_removed, node, attribute) + + def change_node_attribute(self, node: str, attribute: str, old_value, new_value): + """Change an attribute of a node.""" + self.run_sender_method(self.sender.node_attribute_changed, node, attribute, old_value, new_value) + + def add_edge_attribute(self, edge: str, attribute: str, value): + """Add an attribute to an edge.""" + self.run_sender_method(self.sender.edge_attribute_added, edge, attribute, value) + + def remove_edge_attribute(self, edge: str, attribute: str): + """Remove an attribute from an edge.""" + self.run_sender_method(self.sender.edge_attribute_removed, edge, attribute) + + def change_edge_attribute(self, edge: str, attribute: str, old_value, new_value): + """Change an attribute of an edge.""" + self.run_sender_method(self.sender.edge_attribute_changed, edge, attribute, old_value, new_value) + + def clear_graph(self): + """Clear the graph.""" + self.run_sender_method(self.sender.graph_cleared) + + def step_begins(self, time: int): + """Begin a step.""" + self.run_sender_method(self.sender.step_begun, time) diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index 656c821..3332596 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -14,15 +14,18 @@ import socket import struct -import varint +from typing import List, Any, Optional + +from .constants import * import logging -from random import random -from common import AttributeSink, ElementSink -from constants import * +from .common import AttributeSink, ElementSink +from .sender_utils import get_msg, get_type, encode_value + -class DefaultNetStreamTransport(object): +class DefaultNetStreamTransport: """Default transport class using TCP/IP networking.""" - def __init__(self, host, port): + + def __init__(self, host, port: int): """Initialize using host and port.""" self.host = host self.port = port @@ -30,10 +33,10 @@ def __init__(self, host, port): def connect(self): """Connect to remote server if necessary.""" - if not self.socket: + if self.socket is None: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.host, self.port)) - logging.info("connected to remote server") + logging.info(f"Connected to remote server - host = {self.host}, port = {self.port}") def send(self, data): """Send data to remote server.""" @@ -52,31 +55,35 @@ def close(self): self.socket = None logging.info("disconnected from remote server") + class NetStreamSender(AttributeSink, ElementSink): """One client must send to only one identified stream (streamID, host, port)""" def __init__(self, port, host="localhost", stream="default"): """Initialize using port, host (optional) and stream ID (optional).""" - self.host = host - self.port = port - self.stream = None - self.stream_buff = None + self.host: str = host + self.port: int = port + self.stream: Optional[str] = None + self.stream_buff: Optional[bytearray] = None self.set_stream(stream) - self.source_id = None - self.source_id_buff = None + self.source_id: Optional[str] = None + self.source_id_buff: Optional[bytearray] = None self.set_source_id("") - self.transport = None + self.transport: Optional[DefaultNetStreamTransport] = None self.connect() + def __del__(self): + self.close() + def set_stream(self, stream): """Set and cache a stream ID.""" self.stream = stream - self.stream_buff = self.encode_string(stream) + self.stream_buff = encode_value(stream, TYPE_STRING) - def set_source_id(self, source_id): + def set_source_id(self, source_id: str): """Set and cache a source ID.""" self.source_id = source_id - self.source_id_buff = self.encode_string(source_id) + self.source_id_buff = encode_value(source_id, TYPE_STRING) def connect(self): """Connect to the underlying transport.""" @@ -88,180 +95,53 @@ def send(self, event): packet.extend(self.stream_buff) packet.extend(event) buff = bytearray() - buff.extend(struct.pack("!i", len(packet))) # fixed 4-bytes size! + buff.extend(struct.pack("!i", len(packet))) # fixed 4-bytes size! buff.extend(packet) self.transport.send(buff) def close(self): """Close the underlying transport.""" - if self.transport: + if self.transport is not None: self.transport.close() - def get_type(self, value): - """Get the data type for a given value.""" - is_array = isinstance(value, list) - if is_array: - value = value[0] - if isinstance(value, bool): - if is_array: - return TYPE_BOOLEAN_ARRAY - return TYPE_BOOLEAN - elif isinstance(value, int): - if is_array: - return TYPE_INT_ARRAY - return TYPE_INT - elif isinstance(value, long): - if is_array: - return TYPE_LONG_ARRAY - return TYPE_LONG - elif isinstance(value, float): - if is_array: - return TYPE_DOUBLE_ARRAY - return TYPE_DOUBLE - elif isinstance(value, str) or isinstance(value, unicode): - return TYPE_STRING - elif isinstance(value, dict): - raise NotImplementedError("dicts are not supported") - - def encode_value(self, value, dtype): - """Encode a value according to a given data type.""" - if dtype is TYPE_BOOLEAN: - return self.encode_boolean(value) - elif dtype is TYPE_BOOLEAN_ARRAY: - return self.encode_boolean_array(value) - elif dtype is TYPE_INT: - return self.encode_int(value) - elif dtype is TYPE_INT_ARRAY: - return self.encode_int_array(value) - elif dtype is TYPE_LONG: - return self.encode_long(value) - elif dtype is TYPE_LONG_ARRAY: - return self.encode_long_array(value) - elif dtype is TYPE_DOUBLE: - return self.encode_double(value) - elif dtype is TYPE_DOUBLE_ARRAY: - return self.encode_double_array(value) - elif dtype is TYPE_STRING: - return self.encode_string(value) - return None - - def encode_boolean(self, value): - """Encode a boolean type.""" - return bytearray([value & 1]) - - def encode_boolean_array(self, value): - """Encode an array of boolean values.""" - if not isinstance(value, list): - raise TypeError("value is not an array") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(value))) - for elem in value: - if not isinstance(elem, bool): - raise TypeError("array element is not a boolean") - buff.extend(self.encode_boolean(elem)) - return buff - - def encode_int(self, value): - """Encode an integer type.""" - return varint.encode_unsigned(value) - - def encode_int_array(self, value): - """Encode an array of integer values.""" - if not isinstance(value, list): - raise TypeError("value is not an array") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(value))) - for elem in value: - if not isinstance(elem, int): - raise TypeError("array element is not an integer") - buff.extend(self.encode_int(elem)) - return buff - - def encode_long(self, value): - """Encode a long type.""" - return self.encode_int(value) # same as int for now - - def encode_long_array(self, value): - """Encode an array of long values.""" - return self.encode_int_array(value) # same as int_array for now - - def encode_double(self, value): - """Encode a double type.""" - return bytearray(struct.pack("!d", value)) - - def encode_double_array(self, value): - """Encode an array of double values.""" - if not isinstance(value, list): - raise TypeError("value is not an array") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(value))) - for elem in value: - if not isinstance(elem, float): - raise TypeError("array element is not a float/double") - buff.extend(self.encode_double(elem)) - return buff - - def encode_string(self, string): - """Encode a string type.""" - data = bytearray(string, "UTF-8") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(data))) - buff.extend(data) - return buff - - def encode_byte(self, value): - """Encode a byte type.""" - return bytearray([value]) + def send_msg(self, source_id: str, values: List[Any], value_types: List[int]): + if source_id != self.source_id: + self.set_source_id(source_id) + buff = get_msg(values, value_types) + self.send(buff) - # ========================= - # = AttributeSink methods = - # ========================= + ########################### + # ElementSink methods + ########################### - def node_added(self, source_id, time_id, node_id): + def node_added(self, source_id: str, time_id: int, node_id: str): """A node was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_NODE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_NODE, source_id, time_id, node_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("node added: %s", { "source_id": source_id, "time_id": time_id, "node_id": node_id }) - def node_removed(self, source_id, time_id, node_id): + def node_removed(self, source_id: str, time_id: int, node_id: str): """A node was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_NODE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_NODE, source_id, time_id, node_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("node removed: %s", { "source_id": source_id, "time_id": time_id, "node_id": node_id }) - def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): + def edge_added(self, source_id: str, time_id: int, edge_id: str, from_node: str, to_node: str, directed: bool): """An edge was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_EDGE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(from_node)) - buff.extend(self.encode_string(to_node)) - buff.extend(self.encode_boolean(directed)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_EDGE, source_id, time_id, edge_id, from_node, to_node, directed], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_STRING, + TYPE_BOOLEAN]) logging.debug("edge added: %s", { "source_id": source_id, "time_id": time_id, @@ -271,65 +151,48 @@ def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): "directed": directed }) - def edge_removed(self, source_id, time_id, edge_id): + def edge_removed(self, source_id: str, time_id: int, edge_id: str): """An edge was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_EDGE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_EDGE, source_id, time_id, edge_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("edge removed: %s", { "source_id": source_id, "time_id": time_id, "node_id": edge_id }) - def step_begun(self, source_id, time_id, timestamp): + def step_begun(self, source_id: str, time_id: int, timestamp: int): """A new step begun.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_STEP)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_double(timestamp)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_STEP, source_id, time_id, timestamp], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_DOUBLE]) logging.debug("step begun: %s", { "source_id": source_id, "time_id": time_id, "timestamp": timestamp }) - def graph_cleared(self, source_id, time_id): + def graph_cleared(self, source_id: str, time_id: int): """The graph was cleared.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CLEARED)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_CLEARED, source_id, time_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT]) logging.debug("graph cleared: %s", { "source_id": source_id, "time_id": time_id }) - def graph_attribute_added(self, source_id, time_id, attribute, value): + ########################### + # AttributeSink methods + ########################### + + def graph_attribute_added(self, source_id: str, time_id: int, attribute: str, value): """A graph attribute was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_GRAPH_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(value, dtype)) - self.send(buff) + dtype = get_type(value) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_GRAPH_ATTR, source_id, time_id, attribute, dtype, value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_BYTE, dtype]) logging.debug("graph attribute added: %s", { "source_id": source_id, "time_id": time_id, @@ -337,22 +200,16 @@ def graph_attribute_added(self, source_id, time_id, attribute, value): "value": value }) - def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_value): + def graph_attribute_changed(self, source_id: str, time_id: int, attribute: str, old_value, new_value): """A graph attribute was changed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CHG_GRAPH_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(old_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(old_value, dtype)) - dtype = self.get_type(new_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(new_value, dtype)) - self.send(buff) + old_value_dtype = get_type(old_value) + new_value_dtype = get_type(new_value) + + self.send_msg(source_id=source_id, + values=[EVENT_CHG_GRAPH_ATTR, source_id, time_id, attribute, old_value_dtype, old_value, + new_value_dtype, new_value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_BYTE, old_value_dtype, + TYPE_BYTE, new_value_dtype]) logging.debug("graph attribute changed: %s", { "source_id": source_id, "time_id": time_id, @@ -361,36 +218,23 @@ def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_ "new_value": new_value }) - def graph_attribute_removed(self, source_id, time_id, attribute): + def graph_attribute_removed(self, source_id: str, time_id: int, attribute: str): """A graph attribute was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_GRAPH_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(attribute)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_GRAPH_ATTR, source_id, time_id, attribute], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("graph attribute removed: %s", { "source_id": source_id, "time_id": time_id, "attribute": attribute }) - def node_attribute_added(self, source_id, time_id, node_id, attribute, value): + def node_attribute_added(self, source_id: str, time_id: int, node_id: str, attribute: str, value): """A node attribute was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_NODE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(value, dtype)) - self.send(buff) + dtype = get_type(value) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_NODE_ATTR, source_id, time_id, node_id, attribute, dtype, value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, dtype]) logging.debug("node attribute added: %s", { "source_id": source_id, "time_id": time_id, @@ -399,23 +243,16 @@ def node_attribute_added(self, source_id, time_id, node_id, attribute, value): "value": value }) - def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_value, new_value): + def node_attribute_changed(self, source_id: str, time_id: int, node_id: str, attribute: str, old_value, new_value): """A node attribute was changed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CHG_NODE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(old_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(old_value, dtype)) - dtype = self.get_type(new_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(new_value, dtype)) - self.send(buff) + old_value_dtype = get_type(old_value) + new_value_dtype = get_type(new_value) + + self.send_msg(source_id=source_id, + values=[EVENT_CHG_NODE_ATTR, source_id, time_id, node_id, attribute, old_value_dtype, old_value, + new_value_dtype, new_value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, + old_value_dtype, TYPE_BYTE, new_value_dtype]) logging.debug("node attribute changed: %s", { "source_id": source_id, "time_id": time_id, @@ -425,17 +262,11 @@ def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_val "new_value": new_value }) - def node_attribute_removed(self, source_id, time_id, node_id, attribute): + def node_attribute_removed(self, source_id: str, time_id: int, node_id: str, attribute: str): """A node attribute was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_NODE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - buff.extend(self.encode_string(attribute)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_NODE_ATTR, source_id, time_id, node_id, attribute], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING]) logging.debug("node attribute removed: %s", { "source_id": source_id, "time_id": time_id, @@ -443,20 +274,12 @@ def node_attribute_removed(self, source_id, time_id, node_id, attribute): "attribute": attribute }) - def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): + def edge_attribute_added(self, source_id: str, time_id: int, edge_id: str, attribute: str, value): """An edge attribute was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_EDGE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(value, dtype)) - self.send(buff) + dtype = get_type(value) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_EDGE_ATTR, source_id, time_id, edge_id, attribute, dtype, value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, dtype]) logging.debug("edge attribute added: %s", { "source_id": source_id, "time_id": time_id, @@ -465,23 +288,17 @@ def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): "value": value }) - def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_value, new_value): + def edge_attribute_changed(self, source_id: str, time_id: int, edge_id: str, attribute: str, old_value, new_value): """An edge attribute was changed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CHG_EDGE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(old_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(old_value, dtype)) - dtype = self.get_type(new_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(new_value, dtype)) - self.send(buff) + old_value_dtype = get_type(old_value) + new_value_dtype = get_type(new_value) + + self.send_msg(source_id=source_id, + values=[EVENT_CHG_EDGE_ATTR, source_id, time_id, edge_id, attribute, old_value_dtype, old_value, + new_value_dtype, new_value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, + old_value_dtype, TYPE_BYTE, new_value_dtype]) + logging.debug("edge attribute changed: %s", { "source_id": source_id, "time_id": time_id, @@ -491,107 +308,15 @@ def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_val "new_value": new_value }) - def edge_attribute_removed(self, source_id, time_id, edge_id, attribute): + def edge_attribute_removed(self, source_id: str, time_id: int, edge_id: str, attribute: str): """An edge attribute was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_EDGE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(attribute)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_EDGE_ATTR, source_id, time_id, edge_id, attribute], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING]) + logging.debug("edge attribute removed: %s", { "source_id": source_id, "time_id": time_id, "edge_id": edge_id, "attribute": attribute }) - -class NetStreamProxyGraph(): - """ - This is a utility class that handles 'source id' and 'time id' synchronization tokens. - It proposes utile classes that allow to directly send events through the network pipe. - """ - - def __init__(self, sender, source_id=None): - """Constructor can be with one NetStreamSender object and a source id OR with with 4 args: Source ID, Stream ID, Host, and port number""" - self.sender = sender - self.source_id = source_id if source_id else "nss%d" % (1000 * random()) - self.time_id = 0 - - def add_node(self, node): - """Add a node to the graph.""" - self.sender.node_added(self.source_id, self.time_id, node) - self.time_id += 1 - - def remove_node(self, node): - """Remove a node from the graph.""" - self.sender.node_removed(self.source_id, self.time_id, node) - self.time_id += 1 - - def add_edge(self, edge, from_node, to_node, directed=False): - """Add an edge to the graph.""" - self.sender.edge_added(self.source_id, self.time_id, edge, from_node, to_node, directed) - self.time_id += 1 - - def remove_edge(self, edge): - """Remove an edge from the graph.""" - self.sender.edge_removed(self.source_id, self.time_id, edge) - self.time_id += 1 - - def add_attribute(self, attribute, value): - """Add an attribute to the graph.""" - self.sender.graph_attribute_added(self.source_id, self.time_id, attribute, value) - self.time_id += 1 - - def remove_attribute(self, attribute): - """Remove an attribute from the graph.""" - self.sender.graph_attribute_removed(self.source_id, self.time_id, attribute) - self.time_id += 1 - - def change_attribute(self, attribute, old_value, new_value): - """Change an attribute of the graph.""" - self.sender.graph_attribute_changed(self.source_id, self.time_id, attribute, old_value, new_value) - self.time_id += 1 - - def add_node_attribute(self, node, attribute, value): - """Add an attribute to a node.""" - self.sender.node_attribute_added(self.source_id, self.time_id, node, attribute, value) - self.time_id += 1 - - def remove_node_attibute(self, node, attribute): - """Remove an attribute from a node.""" - self.sender.node_attribute_removed(self.source_id, self.time_id, node, attribute) - self.time_id += 1 - - def change_node_attribute(self, node, attribute, old_value, new_value): - """Change an attribute of a node.""" - self.sender.node_attribute_changed(self.source_id, self.time_id, node, attribute, old_value, new_value) - self.time_id += 1 - - def add_edge_attribute(self, edge, attribute, value): - """Add an attribute to an edge.""" - self.sender.edge_attribute_added(self.source_id, self.time_id, edge, attribute, value) - self.time_id += 1 - - def remove_edge_attribute(self, edge, attribute): - """Remove an attribute from an edge.""" - self.sender.edge_attribute_removed(self.source_id, self.time_id, edge, attribute) - self.time_id += 1 - - def change_edge_attribute(self, edge, attribute, old_value, new_value): - """Change an attribute of an edge.""" - self.sender.edge_attribute_changed(self.source_id, self.time_id, edge, attribute, old_value, new_value) - self.time_id += 1 - - def clear_graph(self): - """Clear the graph.""" - self.sender.graph_cleared(self.source_id, self.time_id) - self.time_id += 1 - - def step_begins(self, time): - """Begin a step.""" - self.sender.step_begun(self.source_id, self.time_id, time) - self.time_id += 1 diff --git a/python/gs_netstream/sender_utils.py b/python/gs_netstream/sender_utils.py new file mode 100644 index 0000000..cf270ec --- /dev/null +++ b/python/gs_netstream/sender_utils.py @@ -0,0 +1,113 @@ +"""Sender encoding utils""" +import struct +from itertools import chain +from typing import List, Callable, Any, Optional + +from .constants import * +import numpy as np + + +ENCODING_SIZES = np.array([1 << 7, 1 << 14, 1 << 21, 1 << 28, 1 << 35, 1 << 42, 1 << 49, 1 << 56]) + + +def encoding_size(value: int) -> int: + """Computes the encoding size of a value.""" + dist = (ENCODING_SIZES - value) <= 0 + return 9 if not np.all(dist) else np.argmin(dist) + 1 + + +def encode_unsigned(value: int) -> bytearray: + """Encodes a Python integer into its varint representation.""" + assert isinstance(value, int) and value >= 0, f"Value argument is not an integer or is negative = {value}" + size = encoding_size(value) + buff = bytearray(size) + for i in range(size): + head = 128 + if i == size - 1: + head = 0 + buff[i] = (((value >> (7 * i)) & 127) ^ head) & 255 + return buff + + +TYPES_CONVERTER = { + 'bool': (TYPE_BOOLEAN, TYPE_BOOLEAN_ARRAY), + 'int': (TYPE_INT, TYPE_INT_ARRAY), + 'float': (TYPE_DOUBLE, TYPE_DOUBLE_ARRAY), + 'str': (TYPE_STRING, TYPE_STRING) +} + + +def get_type(value: Any) -> int: + """Get the data type for a given value.""" + is_array = isinstance(value, list) + value_type_str = type(value[0] if is_array else value).__name__ + netstream_type = TYPES_CONVERTER.get(value_type_str, None) + if netstream_type is None: + raise NotImplementedError("dicts are not supported") + + type_pos = int(is_array) + return netstream_type[type_pos] + + +def encode_array(values_array: List, single_value_type, encoding_method: Callable) -> bytearray: + assert isinstance(values_array, list) and all([isinstance(v, single_value_type) for v in values_array]), \ + f"Values_array should be an array with values of type {single_value_type}, but values array = {values_array}" + return bytearray(chain(encode_unsigned(len(values_array)), *[encoding_method(elem) for elem in values_array])) + + +def encode_boolean(value: bool) -> bytearray: + """Encode a boolean type.""" + return bytearray([value & 1]) + + +def encode_boolean_array(value: List[bool]) -> bytearray: + """Encode an array of boolean values.""" + return encode_array(value, bool, encode_boolean) + + +def encode_int_array(value: List[int]) -> bytearray: + """Encode an array of integer values.""" + return encode_array(value, int, encode_unsigned) + + +def encode_double(value: int) -> bytearray: + """Encode a double type.""" + return bytearray(struct.pack("!d", value)) + + +def encode_double_array(value: List[int]) -> bytearray: + """Encode an array of double values.""" + return encode_array(value, float, encode_double) + + +def encode_string(string: str) -> bytearray: + """Encode a string type.""" + data = bytearray(string, "UTF-8") + return bytearray(chain(encode_unsigned(len(data)), data)) + + +def encode_byte(value) -> bytearray: + """Encode a byte type.""" + return bytearray([value]) + + +TYPE_TO_ENCODER = { + TYPE_BYTE: encode_byte, + TYPE_BOOLEAN: encode_boolean, + TYPE_BOOLEAN_ARRAY: encode_boolean_array, + TYPE_INT: encode_unsigned, + TYPE_INT_ARRAY: encode_int_array, + TYPE_DOUBLE: encode_double, + TYPE_DOUBLE_ARRAY: encode_double_array, + TYPE_STRING: encode_string +} + + +def encode_value(value: Any, dtype: int) -> Optional[bytearray]: + """Encode a value according to a given data type.""" + encoder = TYPE_TO_ENCODER.get(dtype, None) + return encoder(value) if encoder is not None else None + + +def get_msg(values: List[Any], types: List[int]) -> bytearray: + return bytearray(chain(*[encode_value(value, value_type) for value, value_type in zip(values, types)])) diff --git a/python/gs_netstream/varint.py b/python/gs_netstream/varint.py deleted file mode 100644 index 7d3b03e..0000000 --- a/python/gs_netstream/varint.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python - -"""Implementation for encoding unsigned varints.""" - -def encoding_size(value): - """Computes the encoding size of a value.""" - if value < (1L << 7): - return 1 - if value < (1L << 14): - return 2 - if value < (1L << 21): - return 3 - if value < (1L << 28): - return 4 - if value < (1L << 35): - return 5 - if value < (1L << 42): - return 6 - if value < (1L << 49): - return 7 - if value < (1L << 56): - return 8 - return 9 - -def encode_unsigned(value): - """Encodes a Python integer into its varint representation.""" - if not isinstance(value, int) or value < 0: - raise TypeError("value argument is not an integer or is negative") - size = encoding_size(value) - buff = bytearray(size) - for i in xrange(size): - head = 128 - if i == size - 1: - head = 0 - buff[i] = (((value >> (7 * i)) & 127) ^ head) & 255 - return buff