Status server for the filewriter and adding Logging #35
Conversation
dashboard/generate.py
Outdated
```python
def check_fw_pipeline(self, ipaddr, port):
    if self.test:
        return 5
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ipaddr, port))
        s.send(b"getstatus")
        data = s.recv(256)
        data2 = int(data.decode("utf-8").split()[1][0])
        s.close()
        return data2
    except:
        self.dprint("connection reset (by peer?)")
        return 0
```
I suggest switching to JSON for the status messages from the services we poll in the dashboard, and being more verbose with the status codes.

Pros:
- Easier parsing on the Python side: we can access human-readable keys directly as a dictionary, rather than splitting strings and working with indexes (`split()[1][0]`).
- More specific error handling: for example, if the JSON is malformed we get a JSONDecodeError, and if a key is missing we can catch a KeyError and handle it accordingly. Right now, any issue in the try block is treated as a connection error, which can be misleading if the actual problem is with the message format.
```python
import json
import logging
import socket
from enum import IntEnum
from json import JSONDecodeError

logger = logging.getLogger(__name__)


class ServiceStatus(IntEnum):
    WORKING = 5
    IDLE = 1
    ERROR = 2
    STOPPED = 3
    INVALID = 4


def request_status(host, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        s.send(b"getstatus")
        # Read until newline delimiter
        data = b""
        while not data.endswith(b"\n"):
            chunk = s.recv(256)
            if not chunk:
                break
            data += chunk
    message = data.decode("utf-8").strip()
    try:
        return json.loads(message)
    except JSONDecodeError:
        logger.error(f"Error decoding JSON: {message}")
        return {
            "status": ServiceStatus.INVALID.value,
            "message": f"Didn't receive a valid message from the service: {host}:{port}",
        }
```
Then we can define check_status like this:

```python
def check_status(self):
    response = request_status(host, port)
    try:
        status = ServiceStatus(int(response["status"]))
    except KeyError:
        logger.error(f"Status message must contain a 'status' field. Ignoring message: {response}")
    except ValueError:
        logger.error(f"Status message contains an invalid 'status' value. Valid status values: {list(ServiceStatus)}. Ignoring message: {response}")
    else:
        logger.debug(f"Setting new status: {self.idx} {status.name}")
        self.lab.setstatus(self.idx, status.value)
```
The debug message could look like this:

```
Setting new status: 127.0.0.1 WORKING
```

Example JSON messages:

```
{"status": 1, "message": "optional debug message"}       # for filewriters
{"status": 5, "stage1": 5, "stage2": 5, "stage3": 5}     # for efus
{"status": 2, "message": "Unhealthy", "uptime": "2d4h"}  # anything else
```
If we agree on a common status message format for all services, we can reuse request_status and only implement a separate check_status function that extracts the necessary information from the JSON, depending on the service being polled. I think this will make the code easier to read and debug.
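As a concrete sketch of that split, a service-specific extractor could pull only the fields it cares about out of the common format. The function name and message shape below are hypothetical, based on the example efu message in this thread:

```python
import json

def check_efu_status(raw):
    """Hypothetical efu-specific extractor: pulls the overall status
    plus the per-stage values out of the shared JSON status format."""
    msg = json.loads(raw)
    stages = {k: v for k, v in msg.items() if k.startswith("stage")}
    return msg["status"], stages

status, stages = check_efu_status('{"status": 5, "stage1": 5, "stage2": 5}')
print(status, stages)  # 5 {'stage1': 5, 'stage2': 5}
```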
Those ServiceStatus enumerations are what I was thinking of on the call @milnowak

I would suggest:
- WRITING (green)
- IDLE (green with lines)
- STARTING (light green)
- FINISHING (green)
- FAILED (red)
- OFFLINE (grey: response timed out; would need to be checked separately with ping, like for the EFUs)

Hopefully this makes sense as a first pass from the meeting.
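For illustration, those states could be sketched as an IntEnum like the ServiceStatus example above. The numeric values (and which value OFFLINE maps to) are assumptions here, not something agreed in this thread:

```python
from enum import IntEnum

class FilewriterStatus(IntEnum):
    """Sketch of the proposed filewriter states; values are placeholders."""
    OFFLINE = 0    # grey: response timed out (checked separately with ping)
    IDLE = 1       # green with lines
    STARTING = 2   # light green
    FINISHING = 3  # green
    FAILED = 4     # red
    WRITING = 5    # green

# IntEnum allows round-tripping between wire values and named states.
print(FilewriterStatus(5).name)  # WRITING
```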
(force-pushed from c0ed4bc to bb9ff22)
dashboard/generate.py
Outdated
```python
for line in lines:
    try:
        obj = json.loads(line)
        # Look for any key ending with '.worker_state'
        for key, value in obj.items():
            if key.endswith(".worker_state"):
                return int(value)
    except Exception:
        continue
```
Suggested change:

```python
for line in lines:
    obj = json.loads(line)
    # Look for any key ending with '.worker_state'
    for key, value in obj.items():
        if key.endswith(".worker_state"):
            return int(value)
```
This try/except block isn't necessary. When the line is `{}`, json.loads works fine and the inner for loop is simply skipped (no items, empty list). The only potential issue is a ValueError when converting to int, but I'd rather let that fail: the error message will clearly show that the server sent something unexpected.
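A quick sketch confirms both halves of that argument: an empty object is harmless, and int() on bad input already produces a self-explanatory error.

```python
import json

# An empty JSON object parses fine: obj.items() is empty, so the loop
# body never runs and no exception is raised -- the try/except adds nothing.
obj = json.loads("{}")
found = [v for k, v in obj.items() if k.endswith(".worker_state")]
assert found == []

# The one real failure mode is int() on a non-numeric value; the
# ValueError message already shows exactly what the server sent.
try:
    int("running")
except ValueError as e:
    print(e)  # invalid literal for int() with base 10: 'running'
```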
Thank you for the input! 🙌 I'll keep it in mind going forward.

I actually didn't want this to be reviewed yet, since I don't have it working even with these changes. I'll keep the PR as DRAFT until it's ready to be reviewed. Sorry for not making that clear 😞
dashboard/generate.py
Outdated
```python
except Exception:
    self.dprint("connection reset (by peer?)")
    return 0
```
I think in this case the exception handling might be hiding the real error and just assuming the connection was reset. It could make it harder to understand what actually went wrong. We should at least handle OSError and ConnectionError:

```python
except ConnectionError:
    print("Connection reset by peer")
except OSError as e:
    print(f"Socket error: {e}")
```

About workerState=0: does that mean the worker is idling? If the data is missing or couldn't be parsed, maybe using something like -1 would make it clearer that the value is unknown rather than a valid state.
ConnectionError is a subclass of OSError, so catching only OSError is just fine.
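The class hierarchy is easy to verify in the interpreter:

```python
# ConnectionError (and its subclasses, such as ConnectionResetError)
# derives from OSError, so a single `except OSError` handler covers them all.
assert issubclass(ConnectionError, OSError)
assert issubclass(ConnectionResetError, ConnectionError)

# Order still matters if both handlers are wanted: the more specific
# ConnectionError clause must come before the general OSError one,
# otherwise OSError catches everything first.
```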
(force-pushed from 1ac5926 to 50c53f2)
WIP - Will be back from DRAFT as soon as it is ready 👍
(force-pushed from c18329d to 889b6c8)
@Andre-Lx-Costa if it helps, worker_state should always be last in the array. We can also modify the JSON so that we have a `[{"key": "foo.bar", "value": "123"}, {...}]` structure instead (or drop the square brackets entirely, etc.), if that makes it easier to handle. There is also the option to create something like the EFU handling, where a specific request is sent and only certain keys are returned.
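Parsing that proposed key/value array shape would be straightforward. The payload below is hypothetical, only following the structure suggested above (with worker_state last in the array):

```python
import json

# Hypothetical payload in the proposed [{"key": ..., "value": ...}] shape.
payload = '[{"key": "foo.bar", "value": "123"}, {"key": "fw.worker_state", "value": "5"}]'

entries = json.loads(payload)
state = next(
    (int(e["value"]) for e in entries if e["key"].endswith(".worker_state")),
    -1,  # sentinel when no worker_state entry is present
)
print(state)  # 5
```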
This reverts commit 4e85d77.
(force-pushed from 2a5d946 to 116fd49)
(force-pushed from 116fd49 to f378b03)
(force-pushed from f378b03 to eace2f6)
(force-pushed from 1ce698f to 9591c19)
LGTM. I think we should hold off merging until we understand the behaviour of the socket in the filewriter though, just in case we need to change something else in this to get good behaviour.
(force-pushed from cab48fa to 41ab917)
It is now ready to be reviewed 👍 We simplified the metrics being provided by the filewriter under the merge request here: https://gitlab.esss.lu.se/ecdc/ess-dmsc/kafka-to-nexus/-/merge_requests/915 The dashboard is deployed, and the filewriter corresponding to the Merge Request above is the
dashboard/generate.py
Outdated
```python
try:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(3.0)
    s.connect((ipaddr, port))
    s.settimeout(20.0)
    s.shutdown(socket.SHUT_WR)
    data = b""
    while True:
        chunk = s.recv(4096)
        if not chunk:
            break
        data += chunk
    text_data = data.decode("utf-8", errors="ignore").strip()
    try:
        parsed_json = json.loads(text_data)
        for entry in parsed_json:
            for key, value in entry.items():
                if key.endswith(".worker_state"):
                    return int(value)
        return -1
    except Exception as e:
        self.dprint(f"JSON processing error: {e}")
        return -1
except OSError as e:
    self.dprint(f"Socket error: {e}")
    return -1
```
- You can catch multiple exceptions in one try block.
- I'd really suggest using a logger instead of print. print hides the stack trace and makes debugging tough. Use logger.exception and include context (ip:port).
- Return status objects instead of raw ints; it's way easier to read and avoids guesswork.
Suggested change:

```python
try:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(3.0)
    s.connect((ipaddr, port))
    s.settimeout(20.0)
    s.shutdown(socket.SHUT_WR)
    data = b""
    while True:
        chunk = s.recv(4096)
        if not chunk:
            break
        data += chunk
    parsed_json = json.loads(data.decode("utf-8", errors="ignore").strip())
    for entry in parsed_json:
        for key, value in entry.items():
            if key.endswith(".worker_state"):
                return Status(int(value))
    return Status.SERVER_RUNNING
except JSONDecodeError:
    logger.exception(f"Failed to load JSON response from Filewriter {ipaddr}:{port}")
    return Status.SERVER_RUNNING
except OSError:
    logger.exception(f"Failed to read from Filewriter socket {ipaddr}:{port}")
    return Status.SERVER_RUNNING
```
Logger config:

```python
import logging
import sys

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("[%(levelname)s] %(message)s")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger.addHandler(handler)
```

```python
from enum import Enum

# adjust if needed
class Status(Enum):
    INVALID = -1
    SERVER_RUNNING = 0
    SERVICE_RUNNING = 5
```

Switching between values and status objects:

```python
>>> Status(-1)
<Status.INVALID: -1>
>>> Status(-1).value
-1
>>> Status(5)
<Status.SERVICE_RUNNING: 5>
>>> Status.SERVICE_RUNNING.value
5
```
Good change, but can we resolve that in a different PR? We would need to change the whole script, not just this section on the filewriter.
I kept the dprints to keep the repo's consistency. If I am introducing the logger, shouldn't I expand it to everything? This dprint only prints if we're running the dashboard with debug set to True, so it won't even show up anywhere at the moment.

I do appreciate the suggestion and will implement it! But I think I have to do it for everything, to keep some consistency across the other services.
Thanks for the detailed suggestions 🙂
I've updated the PR now with 3c9a262
> If I am introducing the logger shouldn't I expand it for everything?

It's totally fine to take this step by step. Refactoring the whole codebase isn't always feasible, and honestly, we don't know if anyone will even have time for that. IMO, it's better to start improving things now than to stick to bad practices just to stay consistent. Having this in place makes it easier to push for proper logging and status handling in future PRs. We can point to this as the starting point, and later extract the logger to a shared module for reuse across other components.
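That shared module could be as small as a single factory function. The module path and function name below are hypothetical, just to sketch the idea:

```python
# Hypothetical shared module (e.g. dashboard/logutil.py) that the other
# services could import, so each named logger is only configured once.
import logging
import sys

def get_logger(name):
    """Return a logger with a stdout handler, configured on first use."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid adding duplicate handlers on re-import
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger
```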
great job! 🥇
Co-authored-by: Milosz Nowak <[email protected]>
I added a status server to the filewriter. Since this is the first non-EFU service that reports a status, a few changes were necessary:
- type_fw = 5 was added
- check_fw_pipeline() was added to parse the response from the server
- check_service() was updated to handle the fw
- some formatting