Conversation

kambstreat
Summary

Why / Goal

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Checklist

  • Documentation update

Reviewers

s"${aggregationPart.inputColumn}_$opSuffix${aggregationPart.window.suffix}${bucketSuffix}"

def incOutputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${bucketSuffix}"
Collaborator

Should we still keep aggregationPart.window.suffix? Otherwise, how do we reconstruct the final output column?

Author

@pengyu-hou I'm not sure I follow. I can't use window.suffix here, right? The intermediate incremental output is a daily aggregation.
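To make the naming question concrete, here is a minimal sketch of the two column-name schemes being discussed. `Window`, `Part`, the `_7d` style window suffix, and the `_by_` bucket suffix are all hypothetical stand-ins chosen for illustration; they are not the project's actual classes or formats. The sketch assumes the final windowed name is derived from the aggregation part itself rather than parsed back out of the incremental column name.

```scala
// Hypothetical, simplified stand-ins for the types in this PR (assumptions, not the real classes).
final case class Window(length: Int, unit: String) {
  // Assumed suffix format, e.g. "_7d".
  def suffix: String = s"_$length$unit"
}

final case class Part(
    inputColumn: String,
    opSuffix: String,
    window: Option[Window],
    bucket: Option[String]
) {
  // Assumed bucket suffix format, for illustration only.
  private def bucketSuffix: String = bucket.map(b => s"_by_$b").getOrElse("")
  private def windowSuffix: String = window.map(_.suffix).getOrElse("")

  // Final output column: includes the window suffix.
  def outputColumnName: String = s"${inputColumn}_$opSuffix$windowSuffix$bucketSuffix"

  // Daily incremental column: independent of the window, so no window suffix.
  def incOutputColumnName: String = s"${inputColumn}_$opSuffix$bucketSuffix"
}

object ColumnNames {
  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Part("price", "sum", Some(Window(7, "d")), None),
      Part("price", "sum", Some(Window(30, "d")), None)
    )
    // Both windows read the same daily incremental column; the final names
    // come from each part's own window, not from the incremental name.
    parts.groupBy(_.incOutputColumnName).foreach { case (inc, ps) =>
      println(s"$inc -> ${ps.map(_.outputColumnName).mkString(", ")}")
    }
  }
}
```

Under these assumptions, several windowed outputs share one daily incremental column, and the window suffix is only needed when the final columns are produced.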

.toArray
.zip(columnAggregators.map(_.irType))

val incSchema = aggregationParts
Collaborator

I think we should spell out the full word incremental in the code and keep inc only as the table suffix, so the table names do not get too long. What do you think?

Suggested change
- val incSchema = aggregationParts
+ val incrementalSchema = aggregationParts

Author

Yes, it is good to use the full name.

def cleanName: String = metaData.name.sanitize

def outputTable = s"${metaData.outputNamespace}.${metaData.cleanName}"
def incrementalOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_inc"
Collaborator

Suggested change
- def incrementalOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_inc"
+ def incrementalOutputTable = s"${metaData.outputNamespace}.${metaData.cleanName}_daily_inc"

Collaborator

@abbywh I think we already have a daily here.

}
}

class AverageIR extends SimpleAggregator[Array[Any], Array[Any], Double] {
Contributor

It looks to me like this mostly exists to serialize aggregators. Is there something more generic we can do? @nikhilsimha and I talked about https://fory.apache.org/, for example, but it could be even simpler to move this logic into a different class abstraction.

Collaborator

I agree that if we could somehow make it more general, we would not have to implement this for each operation.

Collaborator

also cc @nikhil-zlai 😁

Collaborator

I was thinking of sticking with Avro because it is what we used for the online serving path, so that we can keep the ser/de logic the same.

Contributor

I think the issue with Avro is that it needs to schematize the object, whereas hopIR is just an Array[Any]. If you just reflect the hopIR, there is no need to schematize it.
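On the earlier suggestion in this thread to move the logic into a different class abstraction instead of writing a class like AverageIR per operation, the following is a rough sketch of what a generic adapter might look like. The `Agg` trait, `Average` object, and `IrAgg` class are hypothetical and heavily simplified; they are not the project's SimpleAggregator API, and typed tuples are used here instead of Array[Any] only to keep the example short.

```scala
// Hypothetical minimal aggregator interface (an assumption, not the project's SimpleAggregator).
trait Agg[In, IR, Out] {
  def prepare(in: In): IR
  def merge(a: IR, b: IR): IR
  def finalizeIr(ir: IR): Out
}

// Average over Doubles; the intermediate result (IR) is a (sum, count) pair.
object Average extends Agg[Double, (Double, Long), Double] {
  def prepare(in: Double): (Double, Long) = (in, 1L)
  def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)
  def finalizeIr(ir: (Double, Long)): Double = ir._1 / ir._2
}

// Generic adapter: an aggregator whose *inputs* are already IRs (for example,
// daily IRs read back from an incremental table). It delegates to the base
// aggregator, so no per-operation "XxxIR" class is required.
final class IrAgg[In, IR, Out](base: Agg[In, IR, Out]) extends Agg[IR, IR, Out] {
  def prepare(in: IR): IR = in
  def merge(a: IR, b: IR): IR = base.merge(a, b)
  def finalizeIr(ir: IR): Out = base.finalizeIr(ir)
}

object IrAggDemo {
  def main(args: Array[String]): Unit = {
    // Two "daily" IRs built from raw values.
    val day1 = Seq(1.0, 2.0).map(v => Average.prepare(v)).reduce(Average.merge(_, _))
    val day2 = Seq(3.0).map(v => Average.prepare(v)).reduce(Average.merge(_, _))

    // Aggregate over the daily IRs rather than the raw rows.
    val overIrs = new IrAgg(Average)
    val merged = Seq(day1, day2).map(ir => overIrs.prepare(ir)).reduce(overIrs.merge(_, _))
    println(overIrs.finalizeIr(merged))
  }
}
```

This only addresses the per-operation duplication. How the IRs are serialized (Avro with a schema, or a reflection-based approach) is a separate choice that the sketch leaves open.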

3 participants