Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;

/** Facade for a {@link List<T>} that keeps track of weight, for cache limit reasons. */
public class WeightedList<T> implements Weighted {
Expand Down Expand Up @@ -71,14 +72,6 @@ public void addAll(List<T> values, long weight) {
}

public void accumulateWeight(long weight) {
this.weight.accumulateAndGet(
weight,
(first, second) -> {
try {
return Math.addExact(first, second);
} catch (ArithmeticException e) {
return Long.MAX_VALUE;
}
});
this.weight.accumulateAndGet(weight, LongMath::saturatedAdd);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,16 @@ public void log(LogEntry entry) {
@TearDown
public void tearDown() {
try {
// Shutting down the control server should terminate the sdk client.
// We do this before shutting down logging server in particular as that can
// trigger exceptions if the client was not yet shutdown.
controlServer.close();
sdkHarnessExecutorFuture.get();

stateServer.close();
dataServer.close();
loggingServer.close();
controlClient.close();
sdkHarnessExecutorFuture.get();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.fn.harness;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
Expand All @@ -25,6 +27,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.Cache.Shrinkable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
Expand All @@ -41,7 +44,6 @@
import org.slf4j.LoggerFactory;

/** Utility methods used to instantiate and operate over cache instances. */
@SuppressWarnings("nullness")
public final class Caches {
private static final Logger LOG = LoggerFactory.getLogger(Caches.class);

Expand Down Expand Up @@ -70,7 +72,7 @@ public final class Caches {
public static final long REFERENCE_SIZE = 8;

/** Returns the amount of memory in bytes the provided object consumes. */
public static long weigh(Object o) {
public static long weigh(@Nullable Object o) {
if (o == null) {
return REFERENCE_SIZE;
}
Expand Down Expand Up @@ -115,6 +117,7 @@ static class ShrinkOnEviction implements RemovalListener<CompositeKey, WeightedV
cache;
private final LongAdder weightInBytes;

@SuppressWarnings("argument")
ShrinkOnEviction(
CacheBuilder<CompositeKey, WeightedValue<Object>> cacheBuilder, LongAdder weightInBytes) {
this.cache = cacheBuilder.removalListener(this).build();
Expand All @@ -130,18 +133,19 @@ static class ShrinkOnEviction implements RemovalListener<CompositeKey, WeightedV
@Override
public void onRemoval(
RemovalNotification<CompositeKey, WeightedValue<Object>> removalNotification) {
weightInBytes.add(
-(removalNotification.getKey().getWeight() + removalNotification.getValue().getWeight()));
if (removalNotification.wasEvicted()) {
if (!(removalNotification.getValue().getValue() instanceof Cache.Shrinkable)) {
return;
}
Object updatedEntry = ((Shrinkable<?>) removalNotification.getValue().getValue()).shrink();
if (updatedEntry != null) {
cache.put(
removalNotification.getKey(),
addWeightedValue(removalNotification.getKey(), updatedEntry, weightInBytes));
}
CompositeKey key = checkNotNull(removalNotification.getKey());
WeightedValue<Object> value = checkNotNull(removalNotification.getValue());
weightInBytes.add(-(key.getWeight() + value.getWeight()));
if (!removalNotification.wasEvicted()) {
return;
}
@Nullable Object v = value.getValue();
if (!(v instanceof Cache.Shrinkable)) {
return;
}
@Nullable Object updatedEntry = ((Shrinkable<?>) v).shrink();
if (updatedEntry != null) {
cache.put(key, addWeightedValue(key, updatedEntry, weightInBytes));
}
}
}
Expand Down Expand Up @@ -282,8 +286,8 @@ private static class SubCache<K, V> implements Cache<K, V> {
}

@Override
public V peek(K key) {
WeightedValue<Object> value = cache.getIfPresent(keyPrefix.valueKey(key));
public @Nullable V peek(K key) {
@Nullable WeightedValue<Object> value = cache.getIfPresent(keyPrefix.valueKey(key));
if (value == null) {
return null;
}
Expand All @@ -298,7 +302,9 @@ public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
cache
.get(
compositeKey,
() -> addWeightedValue(compositeKey, loadingFunction.apply(key), weightInBytes))
() ->
addWeightedValue(
compositeKey, checkNotNull(loadingFunction.apply(key)), weightInBytes))
.getValue();
} catch (ExecutionException e) {
throw new RuntimeException(e);
Expand All @@ -308,7 +314,7 @@ public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
@Override
public void put(K key, V value) {
CompositeKey compositeKey = keyPrefix.valueKey(key);
cache.put(compositeKey, addWeightedValue(compositeKey, value, weightInBytes));
cache.put(compositeKey, addWeightedValue(compositeKey, checkNotNull(value), weightInBytes));
}

@Override
Expand Down Expand Up @@ -356,7 +362,7 @@ CompositeKeyPrefix subKey(Object suffix, Object... additionalSuffixes) {
return new CompositeKeyPrefix(subKey, subKeyWeight);
}

<K> CompositeKey valueKey(K k) {
<K> CompositeKey valueKey(@Nullable K k) {
return new CompositeKey(namespace, weight, k);
}

Expand Down Expand Up @@ -391,22 +397,26 @@ boolean isEquivalentNamespace(CompositeKey otherKey) {
@VisibleForTesting
static class CompositeKey implements Weighted {
private final Object[] namespace;
private final Object key;
private final @Nullable Object key;
private final long weight;

private CompositeKey(Object[] namespace, long namespaceWeight, Object key) {
private CompositeKey(Object[] namespace, long namespaceWeight, @Nullable Object key) {
this.namespace = namespace;
this.key = key;
this.weight = namespaceWeight + weigh(key);
}

@Override
public String toString() {
return "CompositeKey{namespace=" + Arrays.toString(namespace) + ", key=" + key + "}";
return "CompositeKey{namespace="
+ Arrays.toString(namespace)
+ ", key="
+ String.valueOf(key)
+ "}";
}

@Override
public boolean equals(Object o) {
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
Expand Down Expand Up @@ -434,6 +444,7 @@ public long getWeight() {
* <p>The set of keys that are tracked are only those provided to {@link #peek} and {@link
* #computeIfAbsent}.
*/
@SuppressWarnings("nullness")
public static class ClearableCache<K, V> extends SubCache<K, V> {
private final Set<K> weakHashSet;

Expand Down
Loading
Loading