@@ -60,6 +60,7 @@ use sync_types::IdentityVersion;
60
60
mod metrics;
61
61
62
62
use metrics:: {
63
+ log_debug_sync_protocol_websockets_total,
63
64
log_sync_protocol_websockets_total,
64
65
log_websocket_client_timeout,
65
66
log_websocket_closed,
@@ -96,6 +97,25 @@ impl Drop for SyncSocketDropToken {
96
97
}
97
98
}
98
99
100
+ // TODO(presley): Remove. Used for debugging.
101
+ struct DebugSyncSocketDropToken {
102
+ tag : & ' static str ,
103
+ }
104
+
105
+ /// Tracker that exists for the lifetime of a run_sync_socket.
106
+ impl DebugSyncSocketDropToken {
107
+ fn new ( tag : & ' static str ) -> Self {
108
+ log_debug_sync_protocol_websockets_total ( tag, 1 ) ;
109
+ DebugSyncSocketDropToken { tag }
110
+ }
111
+ }
112
+
113
+ impl Drop for DebugSyncSocketDropToken {
114
+ fn drop ( & mut self ) {
115
+ log_debug_sync_protocol_websockets_total ( self . tag , -1 ) ;
116
+ }
117
+ }
118
+
99
119
// The WebSocket layer for the sync protocol has three asynchronous processes:
100
120
//
101
121
// 1) A `receive_messages` loop that consumes messages from the WebSocket,
@@ -124,6 +144,7 @@ async fn run_sync_socket(
124
144
125
145
let ( client_tx, client_rx) = mpsc:: unbounded ( ) ;
126
146
let receive_messages = async {
147
+ let _receive_message_drop_token = DebugSyncSocketDropToken :: new ( "receive_message" ) ;
127
148
while let Some ( message_r) = rx. next ( ) . await {
128
149
let message = match message_r {
129
150
Ok ( message) => message,
@@ -175,6 +196,7 @@ async fn run_sync_socket(
175
196
176
197
let ( server_tx, mut server_rx) = measurable_unbounded_channel ( ) ;
177
198
let send_messages = async {
199
+ let _send_message_drop_token = DebugSyncSocketDropToken :: new ( "send_message" ) ;
178
200
let mut ping_ticker = tokio:: time:: interval ( HEARTBEAT_INTERVAL ) ;
179
201
' top: loop {
180
202
select_biased ! {
@@ -209,6 +231,7 @@ async fn run_sync_socket(
209
231
} ;
210
232
let mut identity_version: Option < IdentityVersion > = None ;
211
233
let sync_worker_go = async {
234
+ let _sync_worker_drop_token = DebugSyncSocketDropToken :: new ( "sync_worker" ) ;
212
235
let mut sync_worker = SyncWorker :: new (
213
236
st. api . clone ( ) ,
214
237
st. runtime . clone ( ) ,
0 commit comments