Skip to content

Commit

Permalink
Update partitionByKey + add toKeySet docs (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
voliva authored Sep 27, 2023
1 parent b71cef5 commit b89455a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 10 deletions.
7 changes: 3 additions & 4 deletions docs/api/utils/combineKeys.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ of each key.

```ts
export const combineKeys = <K, T>(
keys$: Observable<Array<K> | Set<K>>,
keys$: Observable<Iterable<K> | KeyChanges<K>>,
getInner$: (key: K) => Observable<T>,
): Observable<Map<K, T>>
```
Expand Down Expand Up @@ -53,8 +53,8 @@ const petRace$ = petUpdate$.pipe(startWith(
...petNames.map((pet, id): Pet => ({pet, id, pos: 1})),
));

const [petByID, petIds$] = partitionByKey(petRace$, x => x.id)
const keyMap$ = combineKeys(petIds$, petByID);
const [petByID, petIdChange$] = partitionByKey(petRace$, x => x.id)
const keyMap$ = combineKeys(petIdChange$, petByID);

const leadingPet$ = keyMap$.pipe(map(x => // map to pet with highest pos
Array.from(x.entries())
Expand All @@ -76,7 +76,6 @@ const advancingPet$: Observable<Pet> = interval(1000).pipe(
tap(updatePet),
);

const [usePetIDs] = bind(petIds$);
const [usePetByID] = bind((petId:number) => petByID(petId));
const [useLeader] = bind(leadingPet$, null);
const [useAdvancingPet] = bind(advancingPet$, null);
Expand Down
26 changes: 20 additions & 6 deletions docs/api/utils/partitionByKey.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ export function partitionByKey<T, K, R>(
stream: Observable<T>,
keySelector: (value: T) => K,
streamSelector: (grouped: Observable<T>, key: K) => Observable<R>,
): [(key: K) => GroupedObservable<K, R>, Observable<K[]>]
): [(key: K) => GroupedObservable<K, R>, Observable<KeyChanges<K>>]

interface KeyChanges<K> {
type: "add" | "remove"
keys: Iterable<K>
}
```

#### Arguments
Expand All @@ -26,21 +31,26 @@ export function partitionByKey<T, K, R>(

1. A function that accepts a key and returns a stream for the group of that key.

2. A stream with the list of active keys
2. A stream of KeyChanges, an object the describes what keys have been added or deleted.

### Examples

```ts
const source = interval(1000);
const [getGroupByKey, keys$] = partitionByKey(
const [getGroupByKey, keyChanges$] = partitionByKey(
source,
x => x % 2 == 0 ? "even" : "odd",
(groupedObservable$, key) => groupedObservable$.pipe(map(x => `${x} is ${key}`))
);

const [useEven, even$] = bind(getGroupByKey("even"));
const [useOdd, odd$] = bind(getGroupByKey("odd"));
const [useKeys] = bind(keys$);
const [useKeys] = bind(
keys$.pipe(
toKeySet(),
map(keySet => Array.from(keySet))
)
);

function MyComponent() {
const odd = useOdd();
Expand Down Expand Up @@ -76,13 +86,16 @@ const [petUpdate$, updatePet] = createSignal<Pet>();
// Let's line up our pets
const petRace$ = merge(of(...petNames), petUpdate$);

const [petByID, petIds$] = partitionByKey(
const [petByID, petIdChange$] = partitionByKey(
petRace$,
x => x.id,
)

const [usePetByID] = bind((id: number) => petByID(id));
const [usePetIDs] = bind(petIds$);
const [usePetIDs] = bind(petIdChange$.pipe(
toKeySet(),
map(keySet => Array.from(keySet))
));

const PetItem = ({petID}: {petID: number}) => {
const pet = usePetByID(petID);
Expand Down Expand Up @@ -111,3 +124,4 @@ const PetList = () => {
## See also

- [`combineKeys()`](combineKeys)
- [`toKeySet()`](toKeySet)
64 changes: 64 additions & 0 deletions docs/api/utils/toKeySet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
---
title: toKeySet()
sidebar_label: toKeySet()
---

Operator function that maps a stream of KeyChanges into a Set that contains the
active keys.

```ts
function toKeySet<K>(): OperatorFunction<KeyChanges<K>, Set<K>>
```

#### Returns

An [OperatorFunction] that will aggregate the key changes from the stream into
a Set with those keys.

### Example

```ts
// Call to service mocked for the example
const priceStream$ = from([
{
symbol: "AUDCHF",
price: 1.32,
},
{
symbol: "GOLD",
price: 1.213,
},
{
symbol: "CRYPTO",
price: 2.45,
},
{
symbol: "GOLD",
price: 1.27,
},
]).pipe(
// Add delay to every new price.
concatMap((value) => of(value).pipe(delay(100))),
)
const [lastPriceOfSymbol, symbolChange$] = partitionByKey(
priceStream$,
(x) => x.symbol,
)
const activeSymbols$ = toKeySet(symbolChange$).pipe(
map((keySet) => Array.from(keySet)),
)
activeSymbols$.subscribe((keys) => console.log(keys))
// Will log:
// ['AUDCHF']
// ['AUDCHF', 'GOLD']
// ['AUDCHF', 'GOLD', 'CRYPTO']
```

## See also

- [`partitionByKey()`](partitionByKey)

[operatorfunction]: https://rxjs.dev/api/index/interface/OperatorFunction
1 change: 1 addition & 0 deletions sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = {
"api/utils/suspend",
"api/utils/suspended",
"api/utils/switchMapSuspended",
"api/utils/toKeySet",
{
type: "category",
label: "Deprecated",
Expand Down

0 comments on commit b89455a

Please sign in to comment.