Skip to content

Commit

Permalink
Implement more flows ops methods part 3 (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
emil-bar authored Dec 31, 2024
1 parent 37c1c22 commit c7fb12d
Show file tree
Hide file tree
Showing 6 changed files with 1,731 additions and 8 deletions.
291 changes: 291 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand All @@ -31,10 +32,12 @@
import java.util.function.Supplier;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.ChannelClosedException;
import com.softwaremill.jox.ChannelDone;
import com.softwaremill.jox.ChannelError;
import com.softwaremill.jox.Sink;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.CancellableFork;
import com.softwaremill.jox.structured.Fork;
import com.softwaremill.jox.structured.Scopes;
import com.softwaremill.jox.structured.UnsupervisedScope;
Expand Down Expand Up @@ -268,6 +271,138 @@ public <U> Flow<U> map(Function<T, U> mappingFunction) {
});
}


/**
 * Functional interface used by {@link Flow#mapStateful} and {@link Flow#mapStatefulConcat}: a mapping step
 * that receives the current state together with an input element, and produces the next state together
 * with the mapped result.
 *
 * @param <T> type of the input flow elements
 * @param <S> type of the state threaded through consecutive invocations
 * @param <U> type of the result produced for each element
 */
@FunctionalInterface
public interface StatefulMapper<T, S, U> {
/**
 * Performs a single mapping step.
 *
 * @param state current state
 * @param element current input flow element
 * @return an entry whose key is the new state and whose value is {@code element} mapped to type {@code U}
 */
Map.Entry<S, U> apply(S state, T element);
}

/**
 * Functional interface used by {@link Flow#mapStateful(Supplier, StatefulMapper, OnComplete)} and
 * {@link Flow#mapStatefulConcat(Supplier, StatefulMapper, OnComplete)} to optionally produce one additional
 * element from the final state.
 *
 * @param <S> type of the state
 * @param <U> type of the elements of the output flow
 */
@FunctionalInterface
public interface OnComplete<S, U> {
/**
 * Computes the optional trailing element.
 *
 * @param state the state at the end of the flow
 * @return {@link Optional#empty()} if no additional element should be emitted, otherwise the element to emit
 *         wrapped in an {@link Optional}
 */
Optional<U> apply(S state);
}

/**
 * Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The results are emitted by the
 * returned flow. Optionally the returned flow emits an additional element, possibly based on the final state, once this flow is done.
 * <p>
 * This method adapts `f` so that every result becomes a single-element collection, and then delegates to
 * {@link Flow#mapStatefulConcat(Supplier, StatefulMapper, OnComplete)}; `initializeState` and `onComplete` are passed through unchanged.
 * <p>
 * The `onComplete` function is called once this flow is done. If it returns a non-empty {@link Optional}, the value will be emitted by
 * the returned flow, while an empty value will be ignored.
 *
 * @param initializeState
 *         A function that supplies the initial state.
 * @param f
 *         A function that transforms the element from this flow and the state into a pair of the next state and the result which is
 *         emitted by the returned flow.
 * @param onComplete
 *         A function that transforms the final state into an optional element emitted by the returned flow.
 */
public <S, U> Flow<U> mapStateful(Supplier<S> initializeState, StatefulMapper<T, S, U> f, OnComplete<S, U> onComplete) {
    // wrap each single result of `f` into a one-element list, so that the "concat" variant can be reused
    StatefulMapper<T, S, Iterable<U>> wrappingMapper = (currentState, input) -> {
        Map.Entry<S, U> mapped = f.apply(currentState, input);
        S nextState = mapped.getKey();
        Iterable<U> singleResult = List.of(mapped.getValue());
        return Map.entry(nextState, singleResult);
    };
    return mapStatefulConcat(initializeState, wrappingMapper, onComplete);
}

/**
 * Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The results are emitted by the
 * returned flow.
 * <p>
 * Equivalent to {@link Flow#mapStateful(Supplier, StatefulMapper, OnComplete)} with an `onComplete` that always returns
 * {@link Optional#empty()}, i.e. no additional element is emitted once this flow is done. Use the three-argument variant if such an
 * element is needed.
 *
 * @param initializeState
 *         A function that supplies the initial state.
 * @param f
 *         A function that transforms the element from this flow and the state into a pair of the next state and the result which is
 *         emitted by the returned flow.
 */
public <S, U> Flow<U> mapStateful(Supplier<S> initializeState, StatefulMapper<T, S, U> f) {
    OnComplete<S, U> emitNothingOnComplete = _ -> Optional.empty();
    return mapStateful(initializeState, f, emitNothingOnComplete);
}

/**
 * Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The returned flow emits the
 * results one by one. Optionally the returned flow emits an additional element, possibly based on the final state, once this flow is
 * done.
 * <p>
 * The `initializeState` function is called inside the callback handed to {@code Flows.usingEmit}, that is when the returned flow is
 * run and not when `mapStatefulConcat` is called. The state is therefore never shared between separate runs of the returned flow.
 * <p>
 * The `onComplete` function is called once when this flow is done. If it returns a non-empty value, the value will be emitted by the
 * returned flow, while an empty value will be ignored.
 *
 * @param initializeState
 *         A function that supplies the initial state.
 * @param f
 *         A function that transforms the element from this flow and the state into a pair of the next state and a
 *         {@code Iterable} of results which are emitted one by one by the returned flow. If the result of `f` is empty,
 *         nothing is emitted by the returned flow.
 * @param onComplete
 *         A function that transforms the final state into an optional element emitted by the returned flow.
 */
public <S, U> Flow<U> mapStatefulConcat(Supplier<S> initializeState, StatefulMapper<T, S, Iterable<U>> f, OnComplete<S, U> onComplete) {
    return Flows.usingEmit(emit -> {
        // The holder is created here, and not outside of this callback, so that a re-run of the returned flow does not
        // continue from the final state of a previous run, and concurrent runs do not overwrite each other's state.
        // NOTE(review): assumes Flows.usingEmit invokes this callback once per run of the flow - confirm.
        // A holder is needed only because locals captured by the lambda below must be effectively final.
        AtomicReference<S> state = new AtomicReference<>(initializeState.get());

        last.run(t -> {
            Map.Entry<S, Iterable<U>> result = f.apply(state.get(), t);
            // all results for the current element are emitted before the state is advanced
            for (U u : result.getValue()) {
                emit.apply(u);
            }
            state.set(result.getKey());
        });

        // the upstream is done: give `onComplete` a chance to emit one more element, based on the final state
        Optional<U> onCompleteResult = onComplete.apply(state.get());
        if (onCompleteResult.isPresent()) {
            emit.apply(onCompleteResult.get());
        }
    });
}

/**
 * Applies the given mapping function `f`, using additional state, to each element emitted by this flow. The returned flow emits the
 * results one by one.
 * <p>
 * Equivalent to {@link Flow#mapStatefulConcat(Supplier, StatefulMapper, OnComplete)} with an `onComplete` that always returns
 * {@link Optional#empty()}, i.e. no additional element is emitted once this flow is done. Use the three-argument variant if such an
 * element is needed.
 *
 * @param initializeState
 *         A function that supplies the initial state.
 * @param f
 *         A function that transforms the element from this flow and the state into a pair of the next state and a
 *         {@code Iterable} of results which are emitted one by one by the returned flow. If the result of `f` is empty,
 *         nothing is emitted by the returned flow.
 */
public <S, U> Flow<U> mapStatefulConcat(Supplier<S> initializeState, StatefulMapper<T, S, Iterable<U>> f) {
    OnComplete<S, U> emitNothingOnComplete = _ -> Optional.empty();
    return mapStatefulConcat(initializeState, f, emitNothingOnComplete);
}

/**
* Emits only those elements emitted by this flow, for which `filteringPredicate` returns `true`.
*/
Expand Down Expand Up @@ -339,6 +474,162 @@ public Flow<T> take(int n) {
});
}

/**
 * Groups elements emitted by this flow into child flows. Elements for which `groupingFunction` returns the same value (of type `V`)
 * end up in the same child flow. `childFlowTransform` is applied to each created child flow, and the resulting flow is run in the
 * background. Finally, the child flows are merged back, that is any elements that they emit are emitted by the returned flow.
 * <p>
 * Up to `parallelism` child flows are run concurrently in the background. When the limit is reached, the child flow which didn't
 * receive a new element the longest is completed as done.
 * <p>
 * Child flows for `V` values might be created multiple times (if, after completing a child flow because of the parallelism limit, new
 * elements arrive, mapped to a given `V` value). However, it is guaranteed that for a given `V` value, there will be at most one child
 * flow running at any time.
 * <p>
 * Child flows should only complete as done when the flow of received `T` elements completes. Otherwise, the entire stream will fail
 * with an error.
 * <p>
 * Errors that occur in this flow, or in any child flows, become errors of the returned flow (exceptions are wrapped in
 * {@link ChannelClosedException}).
 * <p>
 * The size of the buffers for the elements emitted by this flow (which is also run in the background) and the child flows are
 * determined by the {@link Channel#BUFFER_SIZE} that is in scope, or default {@link Channel#DEFAULT_BUFFER_SIZE} is used.
 *
 * @param parallelism
 *         An upper bound on the number of child flows that run in parallel at any time.
 * @param groupingFunction
 *         Function used to determine the group for an element of type `T`. Each group is represented by a value of type `V`.
 * @param childFlowTransform
 *         The function that is used to create a child flow, which is later run in the background. The arguments are the group value,
 *         for which the flow is created, and a flow of `T` elements in that group (each such element has the same group value `V`
 *         returned by `groupingFunction`).
 */
public <V, U> Flow<U> groupBy(int parallelism, Function<T, V> groupingFunction, ChildFlowTransformer<T, V, U> childFlowTransform) {
    // the whole grouping logic lives in GroupByImpl; this method only wires up the arguments and starts it
    var grouping = new GroupByImpl<>(this, parallelism, groupingFunction, childFlowTransform);
    return grouping.run();
}

/**
 * Functional interface used in {@link Flow#groupBy} for transforming the child flows.
 *
 * @param <T> type of the elements of the grouped (input) flow, and of each child flow before transformation
 * @param <V> type of the value which identifies a group
 * @param <U> type of the elements emitted by the transformed child flows
 */
@FunctionalInterface
public interface ChildFlowTransformer<T, V, U> {
/**
 * @param group the group value for which a child flow is being created
 * @return a function which transforms the child flow of `T` elements belonging to `group` into a flow of `U` elements
 */
Function<Flow<T>, Flow<U>> apply(V group);
}

/**
 * Chunks up the emitted elements into groups, within a time window, or limited by the specified number of elements, whatever happens
 * first. The timeout is reset after a group is emitted. If the timeout expires and the buffer is empty, nothing is emitted; the next
 * element that arrives is then emitted as a single-element group, and the timer is reset.
 * <p>
 * Implemented as {@link Flow#groupedWeightedWithin(long, Duration, Function)} where every element has a weight of 1 and `n` is used as
 * the weight limit. Argument validation is performed by that method.
 * <p>
 * The size of buffers used by this method is determined by {@link Channel#BUFFER_SIZE} that is in scope, or default
 * {@link Channel#DEFAULT_BUFFER_SIZE} is used.
 *
 * @param n
 *         The maximum number of elements in a group.
 * @param duration
 *         The time window in which the elements are grouped.
 */
public Flow<List<T>> groupedWithin(int n, Duration duration) {
    // each element counts as one unit of weight, hence the weight limit equals the element count limit
    Function<T, Long> unitWeight = _ -> 1L;
    return groupedWeightedWithin(n, duration, unitWeight);
}

/**
 * Chunks up the emitted elements into groups, within a time window, or limited by the cumulative weight being greater or equal to the
 * `minWeight`, whatever happens first. The timeout is reset after a group is emitted. If the timeout expires and the buffer is empty,
 * nothing is emitted; the next element that is received is then emitted as a single-element group, and the timer is reset.
 * <p>
 * The size of buffer used by this method is determined by {@link Channel#BUFFER_SIZE} that is in scope, or default
 * {@link Channel#DEFAULT_BUFFER_SIZE} is used.
 *
 * @param minWeight
 *         The minimum cumulative weight of elements in a group if no timeout happens.
 * @param duration
 *         The time window in which the elements are grouped. Any strictly positive duration is accepted, including
 *         sub-millisecond ones.
 * @param costFn
 *         The function that calculates the weight of an element.
 * @throws IllegalArgumentException
 *         if `minWeight` is not greater than 0, or if `duration` is zero or negative
 */
@SuppressWarnings("unchecked")
public Flow<List<T>> groupedWeightedWithin(long minWeight, Duration duration, Function<T, Long> costFn) {
    if (minWeight <= 0) {
        throw new IllegalArgumentException("requirement failed: minWeight must be > 0");
    }
    // The duration itself is checked, not its value truncated to milliseconds: otherwise positive durations shorter than
    // 1 ms would be rejected, contradicting the error message.
    if (duration.isZero() || duration.isNegative()) {
        throw new IllegalArgumentException("requirement failed: duration must be > 0");
    }

    return Flows.usingEmit(emit -> {
        Scopes.unsupervised(scope -> {
            Source<T> flowSource = runToChannel(scope);
            Channel<List<T>> outputChannel = Channel.withScopedBufferSize();
            Channel<GroupingTimeout> timerChannel = Channel.withScopedBufferSize();

            forkPropagate(scope, outputChannel, () -> {
                List<T> buffer = new ArrayList<>();
                // a holder is used so that the cost can be reset from within the lambda below
                final AtomicLong accumulatedCost = new AtomicLong(0);

                // `null` means the 'timed out state': the time window elapsed and no new timer has been started yet
                CancellableFork<GroupingTimeout> timeoutFork = forkTimeout(scope, timerChannel, duration);

                // sends a snapshot of the buffer downstream (a copy, as the buffer is reused), and starts a new, empty group
                Callable<Void> sendBufferAndCleanupCost = () -> {
                    outputChannel.send(new ArrayList<>(buffer));
                    buffer.clear();
                    accumulatedCost.set(0);
                    return null;
                };

                boolean shouldRun = true;
                while (shouldRun) {
                    shouldRun = switch (selectOrClosed(flowSource.receiveClause(), timerChannel.receiveClause())) {
                        case ChannelDone _:
                            // source is done, emit the buffer and finish
                            if (timeoutFork != null) {
                                timeoutFork.cancelNow();
                            }
                            if (!buffer.isEmpty()) {
                                outputChannel.send(buffer);
                            }
                            outputChannel.done();
                            yield false;
                        case ChannelError(Throwable cause):
                            // source returned error, propagate it and finish
                            if (timeoutFork != null) {
                                timeoutFork.cancelNow();
                            }
                            outputChannel.error(cause);
                            yield false;
                        case GroupingTimeout _:
                            // enter 'timed out state', may stay in this state if buffer is empty
                            timeoutFork = null;
                            if (!buffer.isEmpty()) {
                                sendBufferAndCleanupCost.call();
                                // the fork reference has just been cleared, so there is nothing to cancel - only start a new timeout
                                timeoutFork = forkTimeout(scope, timerChannel, duration);
                            }
                            yield true;
                        case Object t:
                            // anything else received from the select is an element of the source flow
                            buffer.add((T) t);
                            try {
                                long cost = accumulatedCost.updateAndGet(v -> v + costFn.apply((T) t));
                                if (timeoutFork == null || cost >= minWeight) {
                                    // timeout passed when buffer was empty or buffer full
                                    sendBufferAndCleanupCost.call();
                                    // cancel existing timeout and start a new one
                                    if (timeoutFork != null) {
                                        timeoutFork.cancelNow();
                                    }
                                    timeoutFork = forkTimeout(scope, timerChannel, duration);
                                }
                                yield true;
                            } catch (Exception e) {
                                // don't leave the timer running when e.g. `costFn` fails
                                if (timeoutFork != null) {
                                    timeoutFork.cancelNow();
                                }
                                throw e;
                            }
                    };
                }
                return null;
            });
            // forward the groups produced by the background fork to the downstream
            FlowEmit.channelToEmit(outputChannel, emit);
            return null;
        });
    });
}

/**
 * Starts a cancellable fork in `scope` which sleeps for `duration`, and then signals the end of the time window by sending
 * {@link GroupingTimeout#INSTANCE} to `timerChannel` (using `sendOrClosed`).
 *
 * @return the handle of the started fork, which can be used to cancel the timer
 */
private CancellableFork<GroupingTimeout> forkTimeout(UnsupervisedScope scope, Channel<GroupingTimeout> timerChannel, Duration duration) {
    Callable<GroupingTimeout> timer = () -> {
        Thread.sleep(duration);
        timerChannel.sendOrClosed(GroupingTimeout.INSTANCE);
        // the signal is delivered through the channel, the fork itself has no meaningful result
        return null;
    };
    return scope.forkCancellable(timer);
}

/**
 * Marker sent through the timer channel by {@code forkTimeout}, to signal that the time window used for grouping elements has
 * elapsed.
 */
private enum GroupingTimeout {
// the only value; the signal carries no data
INSTANCE
}

/**
* Chunks up the elements into groups of the specified size. The last group may be smaller due to the flow being complete.
*
Expand Down
Loading

0 comments on commit c7fb12d

Please sign in to comment.