From b53088561d042cff1d1ab883c3c57d2700f2074c Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Wed, 19 Jun 2024 18:46:23 +0200 Subject: [PATCH 1/3] Support to logstash added --- caso/messenger/logstash.py | 29 +++++++++++++++++++++++++---- caso/record.py | 16 ++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/caso/messenger/logstash.py b/caso/messenger/logstash.py index 6c19632b..5795de2a 100644 --- a/caso/messenger/logstash.py +++ b/caso/messenger/logstash.py @@ -24,7 +24,10 @@ from caso import exception import caso.messenger - +#add json lib +import json +#add datetime lib +import datetime opts = [ cfg.StrOpt("host", default="localhost", help="Logstash host to send records to."), @@ -49,11 +52,29 @@ def __init__(self, host=CONF.logstash.host, port=CONF.logstash.port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def push(self, records): + + # NOTE(acostantini): code for the serialization and push of the + # records in logstash. JSON format to be used and encoding UTF-8 + """Serialization of records to be sent to logstash""" + if not records: + return + + #Actual timestamp to be added on each record + cdt = datetime.datetime.now() + ct = int(datetime.datetime.now().timestamp()) + + #Open the connection with LS + self.sock.connect((self.host, self.port)) + """Push records to logstash using tcp.""" try: - self.sock.connect((self.host, self.port)) - for _, record in six.iteritems(records): - self.sock.sendall(record.as_json() + "\n") + for record in records: + #serialization of record + rec=record.logstash_message() + #cASO timestamp added to each record + rec['caso-timestamp']=ct + #Send the record to LS + self.sock.send((json.dumps(rec)+'\n').encode('utf-8')) except socket.error as e: raise exception.LogstashConnectionError( host=self.host, port=self.port, exception=e diff --git a/caso/record.py b/caso/record.py index 2ca04ff7..4bf699cd 100644 --- a/caso/record.py +++ b/caso/record.py @@ -49,6 +49,22 @@ def ssm_message(self): raise NotImplementedError("Method not implemented") + def logstash_message(self): + """Render record as the expected logstash message.""" + opts = { + "by_alias": True, + "exclude_none": True, + } + # NOTE(acostatnini): part related to the definition of the logstash message to be + # serialized before to send data + # NOTE(aloga): do not iter over the dictionary returned by record.dict() as this + # is just a dictionary representation of the object, where no serialization is + # done. In order to get objects correctly serialized we need to convert to JSON, + # then reload the model + serialized_record = json.loads(self.json(**opts)) + return serialized_record + + class _ValidCloudStatus(str, enum.Enum): started = "started" completed = "completed" From 333787a13284449d63e172a0faa6918545f4ae10 Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Wed, 19 Jun 2024 18:47:26 +0200 Subject: [PATCH 2/3] Pydantic version set --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 4667ee56..17182add 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,4 +20,4 @@ python-neutronclient>=6.7.0 # Apache-2.0 keystoneauth1>=3.4.0 # Apache-2.0 stevedore -pydantic +pydantic==1.10.7 From 2db6777efc6153ff41d8b8159bb8a4d5e5c534fc Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 18:53:30 +0200 Subject: [PATCH 3/3] Made generic the logstash serialization function --- caso/messenger/logstash.py | 2 +- caso/record.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/caso/messenger/logstash.py b/caso/messenger/logstash.py index 5795de2a..4ff1cf91 100644 --- a/caso/messenger/logstash.py +++ b/caso/messenger/logstash.py @@ -70,7 +70,7 @@ def push(self, records): try: for record in records: #serialization of record - rec=record.logstash_message() + rec=record.serialization_message() #cASO timestamp added to each record rec['caso-timestamp']=ct #Send the record to LS diff --git a/caso/record.py b/caso/record.py index 4bf699cd..06355d69 100644 --- a/caso/record.py +++ b/caso/record.py @@ -49,7 +49,7 @@ def ssm_message(self): raise NotImplementedError("Method not implemented") - def logstash_message(self): + def serialization_message(self): """Render record as the expected logstash message.""" opts = { "by_alias": True,