Skip to content

Effective price based on time #1045

Open
@ybyzek

Description

@ybyzek

Porting confluentinc/ksqldb-recipes#140 here

Describe the use case

Database has a list of dates and prices...want to store the effective price in a KTable, where the effective price is defined as the price's date being greater than the current date, and closest to the current date. Can create dashboards, for example, with hourly revenue. (Recipe author needs to develop this use case a bit more...)

Provide the ksqlDB application

Src for Confluent-internal access, bonus it uses lambdas

CREATE STREAM CUSTOMER_BILLS 
WITH (TIMESTAMP='timestamp', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss''Z''') AS 
SELECT
  METRICS.LC_ID AS LC_ID,
  PRICES.KEY AS PRICE_KEY,
  (METRICS.VALUE * REDUCE(
      PRICES.PRICES, 
      STRUCT(TIME:=PARSE_TIMESTAMP('0001', 'yyyy'), PRICE:=CAST(0.0 AS DOUBLE)), 
      (S, K, V) => (
        CASE WHEN (PARSE_TIMESTAMP(K, 'yyyy-MM-dd''T''HH:mm:ss') < PARSE_TIMESTAMP(METRICS.TIMESTAMP, 'yyyy-MM-dd''T''HH:mm:ss''Z''')) 
          THEN STRUCT(TIME:=PARSE_TIMESTAMP(K, 'yyyy-MM-dd''T''HH:mm:ss'), PRICE:=V) 
          ELSE S 
        END))->PRICE) AS REVENUE,
  METRICS.TIMESTAMP AS TIMESTAMP,
  METRICS.ROWTIME AS METRICS_ROWTIME
FROM METRICS_WITH_CONFIG_KEY METRICS
INNER JOIN PRICES ON (((METRICS.METRICS_TYPE + ':') + LCASE(METRICS.CONFIG_KEY)) = PRICES.KEY)
EMIT CHANGES;

From @agavra

Basically PRICES.PRICES is the "price" table that contains date -> effective_price. What the REDUCE operation does is iterate throughout the PRICES.PRICES and if it finds a key in that map that is less than the target date it sets the price to the price in the map (this leverages the fact that we knew the underlying data would be sorted on time, but we could have just as easily added the condition to be < target date AND > previous_selected_date). When that reduce operation is done, we have the value associated with the latest date and multiply it by the quantity "purchased"

Especially useful if you can't use COLLECT_LIST (one column only)

Note from @blueedgenick

seems like a version that combines both collect_list and a reduce might be most optimal ? this is actually going to look very similar to the solution we had a couple of months ago to the "matching users with mutual interest on a dating site".
I'd go with a group-by and the collect_list to get all the price/date pairs for a product into one collection, then join that to the incoming stream and reduce to find the effective price at the timestamp of the stream-side record

"matching users with mutual interest on a dating site" --> https://confluentinc.github.io/ksqldb-recipes/customer-360/online-dating/

Metadata

Metadata

Assignees

Labels

recipeuse caseA tutorial with an extended business use case

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions