Skip to content

Commit

Permalink
Merge "[exporter] Run API checks in dedicated threads"
Browse files Browse the repository at this point in the history
  • Loading branch information
MOS CI authored and Gerrit Code Review committed Jan 9, 2025
2 parents c7eaaa2 + a6195ed commit a9d4b59
Showing 1 changed file with 72 additions and 37 deletions.
109 changes: 72 additions & 37 deletions rockoon/exporter/collectors/openstack/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.

from concurrent.futures import ThreadPoolExecutor, ALL_COMPLETED, wait
import requests
from urllib3.exceptions import InsecureRequestWarning

Expand All @@ -25,6 +26,33 @@
LOG = utils.get_logger(__name__)


def check_endpoint(url, service_type, service_name, headers):
result = {"success": True}
# TODO(vsaienko): mount ssl ca_cert from osdpl and use here.
try:
requests.packages.urllib3.disable_warnings(
category=InsecureRequestWarning
)
resp = requests.get(url, timeout=10, verify=False, headers=headers)
if resp.status_code >= 500:
LOG.warning(
f"Got bad responce code {resp.status_code} from {url}."
)
result["success"] = False
result["status"] = (
[url, service_type, service_name],
resp.status_code,
)
result["latency"] = (
[url, service_type, service_name],
resp.elapsed.microseconds,
)
except Exception as e:
LOG.warning(f"Failed to get responce from {url}. Error: {e}")
result["success"] = False
return result


class OsdplApiMetricCollector(base.OpenStackBaseMetricCollector):
_name = "osdpl_api"
_description = "OpenStack API endpoints"
Expand Down Expand Up @@ -54,47 +82,54 @@ def update_samples(self):
statuses = []
latencies = []
successes = []
for endpoint in self.oc.oc.identity.endpoints(interface="public"):
service = self.oc.oc.identity.get_service(endpoint.service_id)
service_type = self.oc.service_type_manager.get_service_type(
service.type
)
service_name = service["name"]
if not service_type:
LOG.warning(
f"Failed to get service_type for service {service}"
)
continue
url = endpoint["url"].split("%")[0]
success = True
token = self.oc.oc.auth_token
headers = {"X-Auth-Token": token}
try:
# TODO(vsaienko): mount ssl ca_cert from osdpl and use here.
requests.packages.urllib3.disable_warnings(
category=InsecureRequestWarning
endpoints = list(self.oc.oc.identity.endpoints(interface="public"))
max_workers = min(20, len(endpoints))
future_results = {}

with ThreadPoolExecutor(max_workers=max_workers) as executor:
for endpoint in endpoints:
service = self.oc.oc.identity.get_service(endpoint.service_id)
service_type = self.oc.service_type_manager.get_service_type(
service.type
)
resp = requests.get(
url, timeout=30, verify=False, headers=headers
service_name = service["name"]
if not service_type:
LOG.warning(
f"Failed to get service_type for service {service}"
)
continue
url = endpoint["url"].split("%")[0]
token = self.oc.oc.auth_token
headers = {"X-Auth-Token": token}
future = executor.submit(
check_endpoint,
url=url,
service_type=service_type,
service_name=service_name,
headers=headers,
)
statuses.append(
([url, service_type, service_name], resp.status_code)
future_results[(url, service_type, service_name)] = future
done, not_done = wait(
future_results.values(),
return_when=ALL_COMPLETED,
timeout=30,
)
for endpoint_data, future in future_results.items():
url, service_type, service_name = endpoint_data
if endpoint_data in not_done:
successes.append(
([url, service_type, service_name], int(False))
)
latencies.append(
(
[url, service_type, service_name],
resp.elapsed.microseconds,
)
else:
result = future.result()
successes.append(
([url, service_type, service_name], int(result["success"]))
)
if resp.status_code >= 500:
LOG.warning(
f"Got bad responce code {resp.status_code} from {url}."
)
success = False
except Exception as e:
LOG.warning(f"Failed to get responce from {url}. Error: {e}")
success = False
successes.append(([url, service_type, service_name], int(success)))
if "status" in result:
statuses.append(result["status"])
if "latency" in result:
latencies.append(result["latency"])

self.set_samples("status", statuses)
self.set_samples("latency", latencies)
self.set_samples("success", successes)

0 comments on commit a9d4b59

Please sign in to comment.