From f51ec01c1f10728a8b1b0922216700c18e1aea7c Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Mon, 29 Aug 2022 11:07:31 -0700 Subject: [PATCH] Refactor update requests map logic (#1160) --- .../opencensus/ext/azure/common/transport.py | 115 +++++++++--------- .../ext/azure/statsbeat/statsbeat_metrics.py | 2 +- 2 files changed, 60 insertions(+), 57 deletions(-) diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py index 8ad5e3ecd..2392f5df7 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py @@ -31,6 +31,7 @@ logger = logging.getLogger(__name__) + _MAX_CONSECUTIVE_REDIRECTS = 10 _MONITOR_OAUTH_SCOPE = "https://monitor.azure.com//.default" _requests_lock = threading.Lock() @@ -38,16 +39,20 @@ _REACHED_INGESTION_STATUS_CODES = (200, 206, 402, 408, 429, 439, 500) REDIRECT_STATUS_CODES = (307, 308) RETRYABLE_STATUS_CODES = ( + 206, # Partial success 401, # Unauthorized 403, # Forbidden 408, # Request Timeout - 429, # Too many requests + 429, # Too Many Requests - retry after 500, # Internal server error 502, # Bad Gateway 503, # Service unavailable 504, # Gateway timeout ) -THROTTLE_STATUS_CODES = (402, 439) +THROTTLE_STATUS_CODES = ( + 402, # Quota, too Many Requests over extended time + 439, # Quota, too Many Requests over extended time (legacy) +) class TransportStatusCode: @@ -190,6 +195,46 @@ def _transmit(self, envelopes): if self._check_stats_collection(): _update_requests_map('success') return TransportStatusCode.SUCCESS + elif status_code == 206: # Partial Content + data = None + try: + data = json.loads(text) + except Exception as ex: + if not self._is_stats_exporter(): + logger.warning('Error while reading response body %s for partial content.', ex) # noqa: E501 + if self._check_stats_collection(): + _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 + return TransportStatusCode.DROP + if data: + try: + resend_envelopes = [] + for error in data['errors']: + if _status_code_is_retryable(error['statusCode']): + resend_envelopes.append(envelopes[error['index']]) + if self._check_stats_collection(): + _update_requests_map('retry', value=error['statusCode']) # noqa: E501 + else: + if not self._is_stats_exporter(): + logger.error( + 'Data drop %s: %s %s.', + error['statusCode'], + error['message'], + envelopes[error['index']], + ) + if self.storage and resend_envelopes: + self.storage.put(resend_envelopes) + except Exception as ex: + if not self._is_stats_exporter(): + logger.error( + 'Error while processing %s: %s %s.', + status_code, + text, + ex, + ) + if self._check_stats_collection(): + _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 + return TransportStatusCode.DROP + # cannot parse response body, fallback to retry elif _status_code_is_redirect(status_code): # Redirect # for statsbeat, these are not tracked as success nor failures self._consecutive_redirects += 1 @@ -255,45 +300,6 @@ def _transmit(self, envelopes): if self._check_stats_collection(): _update_requests_map('retry', value=status_code) return TransportStatusCode.RETRY - elif status_code == 206: # Partial Content - data = None - try: - data = json.loads(text) - except Exception as ex: - if not self._is_stats_exporter(): - logger.warning('Error while reading response body %s for partial content.', ex) # noqa: E501 - if self._check_stats_collection(): - _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 - return TransportStatusCode.DROP - if data: - try: - resend_envelopes = [] - for error in data['errors']: - if _status_code_is_retryable(error['statusCode']): - resend_envelopes.append(envelopes[error['index']]) - if self._check_stats_collection(): - _update_requests_map('retry', value=error['statusCode']) # noqa: E501 - else: - logger.error( - 'Data drop %s: %s %s.', - error['statusCode'], - error['message'], - envelopes[error['index']], - ) - if self.storage and resend_envelopes: - self.storage.put(resend_envelopes) - except Exception as ex: - if not self._is_stats_exporter(): - logger.error( - 'Error while processing %s: %s %s.', - status_code, - text, - ex, - ) - if self._check_stats_collection(): - _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 - return TransportStatusCode.DROP - # cannot parse response body, fallback to retry else: # 400 and 404 will be tracked as failure count # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey) # noqa: E501 @@ -332,21 +338,18 @@ def _statsbeat_failure_reached_threshold(): return state.get_statsbeat_initial_failure_count() >= 3 -def _update_requests_map(type, value=None): - if value is None: - value = 0 +def _update_requests_map(type_name, value=None): + # value is either None, duration, status_code or exc_name with _requests_lock: - if type == "count": - _requests_map['count'] = _requests_map.get('count', 0) + 1 # noqa: E501 - elif type == "duration": # value will be duration - _requests_map['duration'] = _requests_map.get('duration', 0) + value # noqa: E501 - elif type == "success": - _requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501 - else: - # value will be a key (status_code/error message) + if type_name == "success" or type_name == "count": # success, count + _requests_map[type_name] = _requests_map.get(type_name, 0) + 1 + elif type_name == "duration": # value will be duration + _requests_map[type_name] = _requests_map.get(type_name, 0) + value # noqa: E501 + else: # exception, failure, retry, throttle + # value will be a key (status_code/exc_name) prev = 0 - if _requests_map.get(type): - prev = _requests_map.get(type).get(value, 0) + if _requests_map.get(type_name): + prev = _requests_map.get(type_name).get(value, 0) else: - _requests_map[type] = {} - _requests_map[type][value] = prev + 1 + _requests_map[type_name] = {} + _requests_map[type_name][value] = prev + 1 diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/statsbeat/statsbeat_metrics.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/statsbeat/statsbeat_metrics.py index d025bbbf8..c71c56e1c 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/statsbeat/statsbeat_metrics.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/statsbeat/statsbeat_metrics.py @@ -270,7 +270,7 @@ def __init__(self, options): self._network_metrics[_get_average_duration_value] = DerivedDoubleGauge( # noqa: E501 _REQ_DURATION_NAME, 'Statsbeat metric tracking average request duration', - 'count', + 'avg', _get_network_properties(value="Duration"), ) self._network_metrics[_get_retry_count_value] = DerivedLongGauge(