Skip to content

Commit

Permalink
also notify all resources processed when failed to parse nonce
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Jan 29, 2025
1 parent 5cafe6c commit 53f71c2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
12 changes: 8 additions & 4 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,11 @@ private void handleResourceRemoval(Collection<String> removedResources, Resource
}
}

// If the nonce indicates that this is the last chunk of the response, notify the wildcard subscriber.
// Notify the wildcard subscriber for having processed all resources if either of these conditions met:
// 1) the nonce indicates that this is the last chunk of the response.
// 2) failed to parse a malformed or absent nonce.
// Details of the nonce format can be found here:
// https://github.com/linkedin/diderot/blob/b7418ea227eec45056a9de4deee2eb50387f63e8/ads/ads.go#L276
private void notifyOnLastChunk(DiscoveryResponseData response)
{
ResourceType type = response.getResourceType();
Expand All @@ -616,7 +620,7 @@ private void notifyOnLastChunk(DiscoveryResponseData response)
try
{
byte[] bytes = Hex.decodeHex(response.getNonce().toCharArray());
ByteBuffer bb = ByteBuffer.wrap(Arrays.copyOfRange(bytes, 8, 12));
ByteBuffer bb = ByteBuffer.wrap(bytes, 8, 4);
remainingChunks = bb.getInt();
}
catch (Exception e)
Expand All @@ -625,9 +629,9 @@ private void notifyOnLastChunk(DiscoveryResponseData response)
remainingChunks = -1;
}

if (remainingChunks == 0)
if (remainingChunks <= 0)
{
_log.info("Notifying wildcard subscriber of type {} for the end of response chunks.", type);
_log.debug("Notifying wildcard subscriber of type {} for the end of response chunks.", type);
wildcardResourceSubscriber.onAllResourcesProcessed();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public class XdsDirectory implements Directory
final AtomicBoolean _isUpdating = new AtomicBoolean(true);
/**
* This lock will be released when the service and cluster names data have been updated and is ready to serve.
* If the data is being updated, requests to read the data will wait indefinitely. Callers could set a shorter
* timeout on the callback passed in to getServiceNames or getClusterNames, as needed.
* If the data is being updated, requests to read the data will wait indefinitely. Callers should set a timeout when
* getting the result of the callback passed to getServiceNames or getClusterNames, as needed.
*/
private final Object _dataReadyLock = new Object();

Expand Down Expand Up @@ -144,6 +144,10 @@ private void waitAndRespond(boolean isForService, Callback<List<String>> callbac
{
synchronized (_dataReadyLock)
{
// Wait indefinitely if the data is being updated. Note that notifyAll randomly wake up threads that are waiting
// on this dataReadyLock. Sometimes the thread is woken up, but the _isUpdating is not false yet, so the thread
// need to go back to waiting.
// A timeout should be set by the caller when getting the result of the callback.
while (_isUpdating.get())
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import com.linkedin.common.callback.Callback;
import com.linkedin.d2.xds.XdsClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
Expand All @@ -21,9 +23,11 @@
public class TestXdsDirectory
{
/**
* Simulate getting cluster and service names with multiple threads.
* Simulate getting cluster and service names with multiple threads. Threads should be blocked until
* onAllResourcesProcessed is called. They should be re-blocked if new update comes in, and unblocked again when
* onAllResourcesProcessed is called.
*/
@Test(timeOut = 2000)
@Test(timeOut = 3000)
public void testGetClusterAndServiceNames() throws InterruptedException {
int numCallers = 20;
int halfCallers = numCallers / 2;
Expand Down Expand Up @@ -69,15 +73,24 @@ public void testGetClusterAndServiceNames() throws InterruptedException {
.filter(result -> Objects.equals(result, Collections.singletonList(SERVICE_NAME))).count();
Assert.assertEquals(serviceMatchCount, halfCallers);

// adding new resource will trigger updating again
// adding new resource will trigger updating again, caller threads should be re-blocked, and new data shouldn't be
// added to the results
watcher.onChanged(SERVICE_RESOURCE_NAME_2, SERVICE_NAME_DATA_UPDATE_2);
executor = Executors.newFixedThreadPool(1);
runCallers(fixture, executor, 1);
Assert.assertTrue(directory._isUpdating.get());
Assert.assertEquals(directory._serviceNames,
ImmutableMap.of(SERVICE_RESOURCE_NAME, SERVICE_NAME, SERVICE_RESOURCE_NAME_2, SERVICE_NAME_2));
Assert.assertTrue(fixture._results.stream().noneMatch(result ->
matchSortedLists(result, Arrays.asList(SERVICE_NAME, SERVICE_NAME_2))));

// finish updating again
// finish updating again, new data should be added to the results
watcher.onAllResourcesProcessed();
Assert.assertFalse(directory._isUpdating.get());
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(1000, java.util.concurrent.TimeUnit.MILLISECONDS));
Assert.assertEquals(1, fixture._results.stream()
.filter(result -> matchSortedLists(result, Arrays.asList(SERVICE_NAME, SERVICE_NAME_2))).count());
}

private void runCallers(XdsDirectoryFixture fixture, ExecutorService executor, int num)
Expand All @@ -88,6 +101,16 @@ private void runCallers(XdsDirectoryFixture fixture, ExecutorService executor, i
}
}

private boolean matchSortedLists(List<String> one, List<String> other)
{
if (one.size() != other.size())
{
return false;
}
return Objects.equals(one.stream().sorted().collect(Collectors.toList()),
other.stream().sorted().collect(Collectors.toList()));
}

private static final class XdsDirectoryFixture
{
XdsDirectory _xdsDirectory;
Expand Down

0 comments on commit 53f71c2

Please sign in to comment.