@@ -72,10 +72,8 @@ public void onConnected(SessionConnectedEvent event) {
72
72
public void onSubscribe (SessionSubscribeEvent event ) {
73
73
// First let's extract all the necessary info about new watching from the subscribe request
74
74
StompHeaderAccessor headers = StompHeaderAccessor .wrap (event .getMessage ());
75
- List <String > rawHeaderValue = headers .getNativeHeader ("isPlain" );
76
- assert (rawHeaderValue != null ) && (rawHeaderValue .size () == 1 )
77
- : "'isPlain' header of SUBSCRIBE command is absent or malformed" ;
78
- Boolean isPlain = Boolean .valueOf (rawHeaderValue .get (0 ));
75
+ boolean isPlain = getBooleanNativeHeader (headers , "isPlain" );
76
+ boolean isTailNeeded = getBooleanNativeHeader (headers , "isTailNeeded" );
79
77
String sessionId = headers .getSessionId ();
80
78
String destination = headers .getDestination ();
81
79
assert (destination != null ) && destination .startsWith (WEBSOCKET_TOPIC_PREFIX )
@@ -98,7 +96,7 @@ public void onSubscribe(SessionSubscribeEvent event) {
98
96
// 1. Ensure that all RMI registration channels are created
99
97
ensureRegistrationChannelsCreated (logConfig );
100
98
// 2. Register the tracking on specified nodes
101
- switchTracking (logConfig , true );
99
+ startTrackingOnServer (logConfig , isTailNeeded );
102
100
// 3. Remember the tracking in the registry
103
101
registry .addEntry (logConfig , sessionId );
104
102
log .info ("New tracking for log '{}' has started within session id={}." , logConfig .getUid (), sessionId );
@@ -108,6 +106,9 @@ public void onSubscribe(SessionSubscribeEvent event) {
108
106
ServerFailure failure = new ServerFailure (e .getMessage (), now ());
109
107
messagingTemplate .convertAndSend (WEBSOCKET_TOPIC_PREFIX + logConfig .getUid (),
110
108
failure , singletonMap (MESSAGE_TYPE_HEADER , MessageType .FAILURE ));
109
+ // TODO Научиться по аналогии с этим отправлять сообщение об успешной настройке подписки, для чего превратить
110
+ // serverFailure в более общий тип сообщения. При получении этого типа убирать на клиенте прелоадер,
111
+ // выставленный перед отправкой запроса на подписку.
111
112
}
112
113
113
114
} else { // i.e. if there are watching sessions already in registry
@@ -141,11 +142,11 @@ private void stopTrackingIfNeeded(Message<byte[]> message, boolean isUnsubscribi
141
142
// check if there is no such session(s)
142
143
if (fellows == null ) {
143
144
if (isUnsubscribing ) { // in case of unsubscribing it is incorrect situation
144
- throw new IllegalStateException ("No registered session(s) found for sessionId=" + sessionId );
145
+ log . warn ("No registered session(s) found for sessionId={}" , sessionId );
145
146
} else { // but in case of disconnecting it is quite right
146
147
log .info ("No registered session found for sessionId={}." , sessionId );
147
- return ;
148
148
}
149
+ return ;
149
150
}
150
151
// check if there any other session(s) left (may require synchronization on registry object)
151
152
if (fellows .size () > 1 ) {
@@ -157,13 +158,30 @@ private void stopTrackingIfNeeded(Message<byte[]> message, boolean isUnsubscribi
157
158
// in case it was the latest session watching that log we should stop the tracking
158
159
LogConfigEntry watchingLog = registry .findLogConfigEntryBy (sessionId );
159
160
log .debug ("No sessions left watching log '{}'. Will deactivate the tracking..." , watchingLog .getUid ());
160
- switchTracking (watchingLog , false );
161
+ stopTrackingOnServer (watchingLog );
161
162
// now that the log is not tracked anymore we need to remove it from the registry
162
163
registry .removeEntry (watchingLog );
163
164
log .info ("Current node has unregistered itself from tracking log '{}' as there is no watching sessions anymore." ,
164
165
watchingLog .getUid ());
165
166
}
166
167
168
+ private boolean getBooleanNativeHeader (StompHeaderAccessor headers , String name ) {
169
+ List <String > rawHeaderValue = headers .getNativeHeader (name );
170
+ assert (rawHeaderValue != null ) && (rawHeaderValue .size () == 1 )
171
+ : format ("'%s' header of SUBSCRIBE command is absent or malformed" , name );
172
+ return Boolean .valueOf (rawHeaderValue .get (0 ));
173
+ }
174
+
175
+ /**
176
+ * When watching request comes for a plain log AnaLog does not tries to find corresponding log config entry.
177
+ * Instead it just creates new ('artificial') entry and then works with it only. This approach allows AnaLog to
178
+ * watch arbitrary plain logs independently of its configuration. Particularly, it means that a user can set any
179
+ * path into AnaLog's address line and start to watch it the same way as if it was pre-configured as a choice
180
+ * variant in AnaLog configuration.
181
+ *
182
+ * @param path full path of log file to watch for
183
+ * @return newly created log config entry for the specified path
184
+ */
167
185
@ NotNull
168
186
private LogConfigEntry createPlainLogConfigEntry (String path ) {
169
187
LogConfigEntry artificialEntry = new LogConfigEntry ();
@@ -206,13 +224,22 @@ private void ensureRegistrationChannelsCreated(LogConfigEntry matchingEntry) {
206
224
});
207
225
}
208
226
209
- private void switchTracking (LogConfigEntry logConfigEntry , boolean isOn ) {
227
+ private void stopTrackingOnServer (LogConfigEntry logConfigEntry ) {
228
+ switchTrackingOnServer (logConfigEntry , false , false );
229
+ }
230
+
231
+ private void startTrackingOnServer (LogConfigEntry logConfigEntry , boolean isTailNeeded ) {
232
+ switchTrackingOnServer (logConfigEntry , true , isTailNeeded );
233
+ }
234
+
235
+ private void switchTrackingOnServer (LogConfigEntry logConfigEntry , boolean isOn , boolean isTailNeeded ) {
236
+ assert !(isTailNeeded && !isOn ) : "isTailNeeded flag should not be raised when switching the tracking off" ;
210
237
ClusterNode myselfNode = clusterProperties .getMyselfNode ();
211
238
String fullPath = buildFullPath (logConfigEntry );
212
239
String nodeName = nvls (logConfigEntry .getNode (), myselfNode .getName ());
213
240
214
241
// send registration request for the main entry
215
- TrackingRequest primaryRequest = new TrackingRequest (fullPath , logConfigEntry .getTimestamp (), nodeName , logConfigEntry .getUid ());
242
+ TrackingRequest primaryRequest = new TrackingRequest (fullPath , logConfigEntry .getTimestamp (), nodeName , logConfigEntry .getUid (), isTailNeeded );
216
243
log .debug ("Switching {} the registration by PRIMARY request: {}" , isOn ? "ON" :"OFF" , primaryRequest );
217
244
remoteGateway .switchRegistration (primaryRequest , isOn );
218
245
@@ -224,7 +251,8 @@ private void switchTracking(LogConfigEntry logConfigEntry, boolean isOn) {
224
251
normalizePath (included .getPath ()),
225
252
included .getTimestamp (),
226
253
nvls (included .getNode (), myselfNode .getName ()),
227
- logConfigEntry .getUid ());
254
+ logConfigEntry .getUid (),
255
+ isTailNeeded );
228
256
log .debug ("Switching {} the registration by INCLUDED request: {}" , isOn ? "ON" :"OFF" , includedRequest );
229
257
remoteGateway .switchRegistration (includedRequest , isOn );
230
258
0 commit comments