Skip to content

Commit

Permalink
Adding debug call, adding test for start workers, and fixing placehol…
Browse files Browse the repository at this point in the history
…der docstring
  • Loading branch information
gustavogaldinoo committed Aug 15, 2024
1 parent d3b9c07 commit 67f99a1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
11 changes: 11 additions & 0 deletions experiment/measurer/measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Module for measurer workers logic."""
import time
import json
import os
from typing import Dict, Union, Optional
import google.api_core.exceptions
from google.cloud import pubsub_v1
Expand Down Expand Up @@ -53,9 +54,19 @@ def put_result_in_response_queue(
retrieve"""
raise NotImplementedError

def _write_pid_to_fs(self):
"""Debugging method"""
pid = os.getpid()
with open('worker-pid.txt', 'w+', encoding='utf-8') as pid_file:
pid_file.write(str(pid))

def measure_worker_loop(self):
"""Periodically retrieves request from request queue, measure it, and
put result in response queue"""
# Write pid to file system to check if worker process is being started
# correctly. Only for debug purposes, will be removed later
self._write_pid_to_fs()

logs.initialize(default_extras={
'component': 'measurer',
'subcomponent': 'worker',
Expand Down
16 changes: 16 additions & 0 deletions experiment/measurer/test_measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
import shutil
from unittest import mock
import multiprocessing
import queue
import pytest

Expand Down Expand Up @@ -636,3 +637,18 @@ def test_gcloud_measure_manager_get_snapshot_from_response_queue(

result = gcloud_measure_manager.get_result_from_response_queue('')
assert isinstance(result, models.Snapshot)


@mock.patch('experiment.measurer.measure_worker.GoogleCloudMeasureWorker')
def test_gcloud_measure_manager_start_workers(mock_gcloud_measure_worker,
gcloud_measure_manager):
"""Tests that the start workers method is calling the measure worker loop
method, a number of times equal to the number of measurers CPUs."""
cpus_available = multiprocessing.cpu_count()
gcloud_measure_manager.measurers_cpus = cpus_available
with mock.patch('multiprocessing.pool.Pool.apply_async') as pool:
gcloud_measure_manager.start_workers('request-queue-topic',
'response-queue-topic', pool)
assert pool.apply_async.call_count == cpus_available
pool.apply_async.assert_called_with(
mock_gcloud_measure_worker().measure_worker_loop)
9 changes: 7 additions & 2 deletions experiment/measurer/test_measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disabl

def test_get_task_from_request_queue_gcloud_worker_calls_acknowledge(
gcloud_measure_worker): # pylint: disable=redefined-outer-name
"""a"""
"""Tests that the method get_task_from_request_queue from
GoogleCloudMeasureWorker worker is calling acknowledge after pulling a
message from the queue."""
pull_return = mock.MagicMock()
received_message = mock.MagicMock()
received_message.ack_id = 0
Expand All @@ -120,7 +122,10 @@ def test_get_task_from_request_queue_gcloud_worker_calls_acknowledge(


def test_get_task_from_request_queue_gcloud_worker(gcloud_measure_worker): # pylint: disable=redefined-outer-name
"""a"""
"""Tests that the method get_task_from_request_queue from
GoogleCloudMeasureWorker worker is properly returning a snapshot measure
request, meaning it is successfully unserializing the message from the
queue."""
pull_return = mock.MagicMock()
received_message = mock.MagicMock()
received_message.ack_id = 0
Expand Down

0 comments on commit 67f99a1

Please sign in to comment.