diff --git a/caso/messenger/logstash.py b/caso/messenger/logstash.py index 6c19632..4ff1cf9 100644 --- a/caso/messenger/logstash.py +++ b/caso/messenger/logstash.py @@ -24,7 +24,10 @@ from caso import exception import caso.messenger - +#add json lib +import json +#add datetime lib +import datetime opts = [ cfg.StrOpt("host", default="localhost", help="Logstash host to send records to."), @@ -49,11 +52,29 @@ def __init__(self, host=CONF.logstash.host, port=CONF.logstash.port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def push(self, records): + + # NOTE(acostantini): code for the serialization and push of the + # records in logstash. JSON format to be used and encoding UTF-8 + """Serialization of records to be sent to logstash""" + if not records: + return + + #Actual timestamp to be added on each record + cdt = datetime.datetime.now() + ct = int(datetime.datetime.now().timestamp()) + + #Open the connection with LS + self.sock.connect((self.host, self.port)) + """Push records to logstash using tcp.""" try: - self.sock.connect((self.host, self.port)) - for _, record in six.iteritems(records): - self.sock.sendall(record.as_json() + "\n") + for record in records: + #serialization of record + rec=record.serialization_message() + #cASO timestamp added to each record + rec['caso-timestamp']=ct + #Send the record to LS + self.sock.send((json.dumps(rec)+'\n').encode('utf-8')) except socket.error as e: raise exception.LogstashConnectionError( host=self.host, port=self.port, exception=e diff --git a/caso/record.py b/caso/record.py index 2ca04ff..06355d6 100644 --- a/caso/record.py +++ b/caso/record.py @@ -49,6 +49,22 @@ def ssm_message(self): raise NotImplementedError("Method not implemented") + def serialization_message(self): + """Render record as the expected logstash message.""" + opts = { + "by_alias": True, + "exclude_none": True, + } + # NOTE(acostatnini): part related to the definition of the logstash message to be + # serialized before to send data + # NOTE(aloga): do not iter over the dictionary returned by record.dict() as this + # is just a dictionary representation of the object, where no serialization is + # done. In order to get objects correctly serialized we need to convert to JSON, + # then reload the model + serialized_record = json.loads(self.json(**opts)) + return serialized_record + + class _ValidCloudStatus(str, enum.Enum): started = "started" completed = "completed" diff --git a/requirements.txt b/requirements.txt index 4667ee5..17182ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,4 +20,4 @@ python-neutronclient>=6.7.0 # Apache-2.0 keystoneauth1>=3.4.0 # Apache-2.0 stevedore -pydantic +pydantic==1.10.7