Skip to content

Commit

Permalink
Implement exceptionType and statusCode for network statsbeat (#1138)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Jul 14, 2022
1 parent 00a817c commit de44767
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 123 deletions.
3 changes: 3 additions & 0 deletions contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Add statusCode and exceptionType to network statsbeat
([#1138](https://github.com/census-instrumentation/opencensus-python/pull/1138))

## 1.1.5
Released 2022-07-05

Expand Down
105 changes: 63 additions & 42 deletions contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@
_requests_lock = threading.Lock()
_requests_map = {}
_REACHED_INGESTION_STATUS_CODES = (200, 206, 402, 408, 429, 439, 500)
REDIRECT_STATUS_CODES = (307, 308)
RETRYABLE_STATUS_CODES = (
401, # Unauthorized
403, # Forbidden
408, # Request Timeout
429, # Too many requests
500, # Internal server error
503, # Service unavailable
)
THROTTLE_STATUS_CODES = (402, 439)


class TransportStatusCode:
Expand Down Expand Up @@ -82,6 +92,7 @@ def _transmit(self, envelopes):
if not envelopes:
return 0
status = None
exception = None
try:
start_time = time.time()
headers = {
Expand All @@ -93,9 +104,6 @@ def _transmit(self, envelopes):
token = self.options.credential.get_token(_MONITOR_OAUTH_SCOPE)
headers["Authorization"] = "Bearer {}".format(token.token)
endpoint += '/v2.1/track'
if self._check_stats_collection():
with _requests_lock:
_requests_map['count'] = _requests_map.get('count', 0) + 1 # noqa: E501
response = requests.post(
url=endpoint,
data=json.dumps(envelopes),
Expand All @@ -104,52 +112,52 @@ def _transmit(self, envelopes):
proxies=json.loads(self.options.proxies),
allow_redirects=False,
)
except requests.Timeout:
except requests.Timeout as ex:
if not self._is_stats_exporter():
logger.warning(
'Request time out. Ingestion may be backed up. Retrying.')
status = TransportStatusCode.RETRY
exception = ex
except requests.RequestException as ex:
if not self._is_stats_exporter():
logger.warning(
'Retrying due to transient client side error %s.', ex)
# client side error (retryable)
status = TransportStatusCode.RETRY
exception = ex
except CredentialUnavailableError as ex:
if not self._is_stats_exporter():
logger.warning('Credential error. %s. Dropping telemetry.', ex)
status = TransportStatusCode.DROP
exception = ex
except ClientAuthenticationError as ex:
if not self._is_stats_exporter():
logger.warning('Authentication error %s', ex)
status = TransportStatusCode.RETRY
exception = ex
except Exception as ex:
if not self._is_stats_exporter():
logger.warning(
'Error when sending request %s. Dropping telemetry.', ex)
# Extraneous error (non-retryable)
status = TransportStatusCode.DROP
exception = ex
finally:
if self._check_stats_collection():
_update_requests_map('count')
end_time = time.time()
if self._check_stats_collection():
with _requests_lock:
duration = _requests_map.get('duration', 0)
_requests_map['duration'] = duration + (end_time - start_time) # noqa: E501
if status is not None:
_update_requests_map('duration', value=end_time-start_time)

if status is not None and exception is not None:
if self._check_stats_collection():
with _requests_lock:
if status is TransportStatusCode.RETRY:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
else:
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
_update_requests_map('exception', value=exception.__class__.__name__) # noqa: E501
return status
if self._is_stats_exporter() and \
not state.get_statsbeat_shutdown() and \
not state.get_statsbeat_initial_success():
# If ingestion threshold during statsbeat initialization is
# reached, return back code to shut it down
if _statsbeat_failure_reached_threshold():
return TransportStatusCode.STATSBEAT_SHUTDOWN
return status

text = 'N/A'
status_code = 0
Expand All @@ -160,7 +168,7 @@ def _transmit(self, envelopes):
if not self._is_stats_exporter():
logger.warning('Error while reading response body %s.', ex)
if self._check_stats_collection():
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
_update_requests_map('exception', value=ex.__class__.__name__)
return TransportStatusCode.DROP

if self._is_stats_exporter() and \
Expand All @@ -178,8 +186,7 @@ def _transmit(self, envelopes):
if status_code == 200: # Success
self._consecutive_redirects = 0
if self._check_stats_collection():
with _requests_lock:
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
_update_requests_map('success')
return TransportStatusCode.SUCCESS
elif _status_code_is_redirect(status_code): # Redirect
# for statsbeat, these are not tracked as success nor failures
Expand All @@ -206,21 +213,25 @@ def _transmit(self, envelopes):
)
# If redirect but did not return, exception occured
if self._check_stats_collection():
with _requests_lock:
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
_update_requests_map('exception', value="Circular Redirect")
return TransportStatusCode.DROP
elif _status_code_is_throttle(status_code): # Throttle
if self._check_stats_collection():
# 402: Monthly Quota Exceeded (new SDK)
# 439: Monthly Quota Exceeded (old SDK) <- Currently OC SDK
with _requests_lock:
_requests_map['throttle'] = _requests_map.get('throttle', 0) + 1 # noqa: E501
_update_requests_map('throttle', value=status_code)
if not self._is_stats_exporter():
logger.warning(
'Telemetry was throttled %s: %s.',
status_code,
text,
)
return TransportStatusCode.DROP
elif _status_code_is_retryable(status_code): # Retry
if not self._is_stats_exporter():
if status_code == 401: # Authentication error
logger.warning(
'Authentication error %s: %s.',
'Authentication error %s: %s. Retrying.',
status_code,
text,
)
Expand All @@ -229,7 +240,7 @@ def _transmit(self, envelopes):
# Can occur when v2 endpoint is used while AI resource is configured # noqa: E501
# with disableLocalAuth
logger.warning(
'Forbidden error %s: %s.',
'Forbidden error %s: %s. Retrying.',
status_code,
text,
)
Expand All @@ -240,8 +251,7 @@ def _transmit(self, envelopes):
text,
)
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
_update_requests_map('retry', value=status_code)
return TransportStatusCode.RETRY
elif status_code == 206: # Partial Content
data = None
Expand All @@ -251,7 +261,7 @@ def _transmit(self, envelopes):
if not self._is_stats_exporter():
logger.warning('Error while reading response body %s for partial content.', ex) # noqa: E501
if self._check_stats_collection():
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
_update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501
return TransportStatusCode.DROP
if data:
try:
Expand All @@ -260,8 +270,7 @@ def _transmit(self, envelopes):
if _status_code_is_retryable(error['statusCode']):
resend_envelopes.append(envelopes[error['index']])
if self._check_stats_collection():
with _requests_lock:
_requests_map['retry'] = _requests_map.get('retry', 0) + 1 # noqa: E501
_update_requests_map('retry', value=error['statusCode']) # noqa: E501
else:
logger.error(
'Data drop %s: %s %s.',
Expand All @@ -280,16 +289,15 @@ def _transmit(self, envelopes):
ex,
)
if self._check_stats_collection():
_requests_map['exception'] = _requests_map.get('exception', 0) + 1 # noqa: E501
_update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501
return TransportStatusCode.DROP
# cannot parse response body, fallback to retry
else:
# 400 and 404 will be tracked as failure count
# 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey) # noqa: E501
# 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string # noqa: E501
if self._check_stats_collection():
with _requests_lock:
_requests_map['failure'] = _requests_map.get('failure', 0) + 1 # noqa: E501
_update_requests_map('failure', value=status_code)
# Other, server side error (non-retryable)
if not self._is_stats_exporter():
logger.error(
Expand All @@ -301,21 +309,15 @@ def _transmit(self, envelopes):


def _status_code_is_redirect(status_code):
return status_code in (307, 308)
return status_code in REDIRECT_STATUS_CODES


def _status_code_is_throttle(status_code):
return status_code in (402, 439)
return status_code in THROTTLE_STATUS_CODES


def _status_code_is_retryable(status_code):
return status_code in (
401, # Unauthorized
403, # Forbidden
429, # Too many requests
500, # Internal server error
503, # Service unavailable
)
return status_code in RETRYABLE_STATUS_CODES


def _reached_ingestion_status_code(status_code):
Expand All @@ -326,3 +328,22 @@ def _statsbeat_failure_reached_threshold():
# increment failure counter for sending statsbeat if in initialization
state.increment_statsbeat_initial_failure_count()
return state.get_statsbeat_initial_failure_count() >= 3


def _update_requests_map(type, value=None):
if value is None:
value = 0 # error state
with _requests_lock:
if type == "count":
_requests_map['count'] = _requests_map.get('count', 0) + 1 # noqa: E501
elif type == "duration":
_requests_map['duration'] = _requests_map.get('duration', 0) + value # noqa: E501
elif type == "success":
_requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501
else:
prev = 0
if _requests_map.get(type):
prev = _requests_map.get(type).get(value, 0)
else:
_requests_map[type] = {}
_requests_map[type][value] = prev + 1
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def export_metrics(self, metrics):
batch = self.apply_telemetry_processors(batch)
result = self._transmit(batch)
# If statsbeat exporter and received signal to shutdown
if self._is_stats_exporter() and result is \
if self._is_stats and result is \
TransportStatusCode.STATSBEAT_SHUTDOWN:
from opencensus.ext.azure.statsbeat import statsbeat
statsbeat.shutdown_statsbeat_metrics()
Expand All @@ -102,6 +102,9 @@ def metric_to_envelopes(self, metric):
# point which contains the aggregated value
data_point = self._create_data_points(
time_series, md)[0]
# if statsbeat exporter, ignore points with 0 value
if self._is_stats and data_point.value == 0:
continue
# The timestamp is when the metric was recorded
timestamp = time_series.points[0].timestamp
# Get the properties using label keys from metric
Expand Down
Loading

0 comments on commit de44767

Please sign in to comment.