
[GLUTEN-8492][CH] Offload RangeExec #8518

Open · wants to merge 7 commits into main
Conversation

@taiyang-li (Contributor) commented Jan 13, 2025

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

(Fixes: #8492)

How was this patch tested?

All existing UTs that use the range function. The UT below is excluded because its assertion that the plan contains a RangeExec operator would fail once RangeExec is offloaded to Gluten.

  • "SPARK-27439: Explain result should match collected result after view change"

@github-actions github-actions bot added CORE works for Gluten Core VELOX CLICKHOUSE labels Jan 13, 2025

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename the commit message and pull request title to the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

Run Gluten Clickhouse CI on x86
@taiyang-li taiyang-li changed the title [WIP] Support range functions [CH] Support range functions Jan 13, 2025

@taiyang-li taiyang-li changed the title [CH] Support range functions [GLUTEN-8492][CH] Support range functions Jan 14, 2025
@taiyang-li taiyang-li marked this pull request as ready for review January 14, 2025 04:14

#8492

@taiyang-li taiyang-li changed the title [GLUTEN-8492][CH] Support range functions [GLUTEN-8492][CH] Offload RangeExec Jan 14, 2025
@taiyang-li taiyang-li requested review from zhztheplayer and PHILO-HE and removed request for PHILO-HE February 11, 2025 04:18
@taiyang-li (Contributor Author) commented:

For performance comparison, refer to #8492 (comment)


@taiyang-li taiyang-li added VELOX and removed VELOX labels Feb 11, 2025
@taiyang-li (Contributor Author) commented:

@CodiumAI-Agent /review

@CodiumAI-Agent commented Feb 11, 2025

PR Reviewer Guide 🔍

(Review updated until commit febd663)

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis 🔶

8492 - Partially compliant

Compliant requirements:

  • Investigate and address the performance issue with the sum function using a filter in the ClickHouse backend compared to vanilla Spark.
  • Implement necessary changes to offload RangeExec to the Gluten backend.
  • Ensure compatibility and correctness of the solution with existing functionalities.

Non-compliant requirements:

  • Update relevant tests to reflect the changes.

Requires further human verification:

  • Validate the performance improvement of the sum function with a filter in real-world scenarios.
  • Ensure the correctness of the offloaded RangeExec through extensive testing.
⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

The doTransform method in CHRangeExecTransformer introduces a new transformation logic. Ensure that the generated Substrait plan is correct and compatible with the ClickHouse backend.

override def doTransform(context: SubstraitContext): TransformContext = {
  val output = outputAttributes
  val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
  val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
  val columnTypeNodes = JavaConverters
    .seqAsJavaListConverter(
      output.map(attr => new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL)))
    .asJava

  val optimizationContent = s"isRange=1\n"
  val optimization =
    BackendsApiManager.getTransformerApiInstance.packPBMessage(
      StringValue.newBuilder.setValue(optimizationContent).build)
  val extensionNode = ExtensionBuilder.makeAdvancedExtension(optimization, null)
  val readNode = RelBuilder.makeReadRel(
    typeNodes,
    nameList,
    columnTypeNodes,
    null,
    extensionNode,
    context,
    context.nextOperatorId(this.nodeName))

  TransformContext(output, readNode)
}

Performance Concern

The SourceFromRange implementation may have performance implications due to its handling of large ranges and potential overflows. Verify its efficiency and correctness.

SourceFromRange::SourceFromRange(const DB::Block & header_, Int64 start_, Int64 end_, Int64 step_, Int32 num_slices_, Int32 slice_index_, size_t max_block_size_)
    : DB::ISource(header_)
    , start(start_)
    , end(end_)
    , step(step_)
    , num_slices(num_slices_)
    , slice_index(slice_index_)
    , max_block_size(max_block_size_)
    , num_elements(getNumElements())
    , is_empty_range(start == end)
{
    const auto & header = getOutputs().front().getHeader();
    if (header.columns() != 1)
        throw Exception(ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH, "Expected 1 column, got {}", header.columns());
    if (!header.getByPosition(0).type->equals(DataTypeInt64()))
        throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected Int64 column, got {}", header.getByPosition(0).type->getName());
    if (step == 0)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Step cannot be zero");


    Int128 partition_start = (slice_index * num_elements) / num_slices * step + start;
    Int128 partition_end = (((slice_index + 1) * num_elements) / num_slices) * step + start;

    auto get_safe_margin = [](Int128 bi) -> Int64
    {
        if (bi <= std::numeric_limits<Int64>::max() && bi >= std::numeric_limits<Int64>::min())
            return static_cast<Int64>(bi);
        else if (bi > 0)
            return std::numeric_limits<Int64>::max();
        else
            return std::numeric_limits<Int64>::min();
    };

    safe_partition_start = get_safe_margin(partition_start);
    safe_partition_end = get_safe_margin(partition_end);
    current = safe_partition_start;
    previous = 0;
    overflow = false;
}

Int128 SourceFromRange::getNumElements() const
{
    const auto safe_start = static_cast<Int128>(start);
    const auto safe_end = static_cast<Int128>(end);
    if ((safe_end - safe_start) % step == 0 || (safe_end > safe_start) != (step > 0))
    {
        return (safe_end - safe_start) / step;
    }
    else
    {
        // the remainder has the same sign with range, could add 1 more
        return (safe_end - safe_start) / step + 1;
    }
}


DB::Chunk SourceFromRange::generate()
{
    if (is_empty_range)
        return {};

    if (overflow || (step > 0 && current >= safe_partition_end) || (step < 0 && current <= safe_partition_end))
        return {};


    auto column = DB::ColumnInt64::create();
    auto & data = column->getData();
    data.resize_exact(max_block_size);

    size_t row_i = 0;
    if (step > 0)
    {
        for (; current < safe_partition_end && !overflow && row_i < max_block_size; ++row_i)
        {
            previous = current;
            data[row_i] = current;
            current += step;
            overflow = current < previous;
        }
    }
    else
    {
        for (; current > safe_partition_end && !overflow && row_i < max_block_size; ++row_i)
        {
            previous = current;
            data[row_i] = current;
            current += step;
            overflow = current > previous;
        }
    }
    data.resize_exact(row_i);

    DB::Columns columns;
    columns.push_back(std::move(column));
    return DB::Chunk(std::move(columns), row_i);
}
Serialization Logic

The new makeExtensionTable method for range handling introduces JSON serialization. Ensure that the serialization and deserialization processes are robust and error-free.

public static ExtensionTableNode makeExtensionTable(
    Long start, Long end, Long step, Integer numSlices, Integer sliceIndex)
    throws JsonProcessingException {
  ObjectMapper mapper = new ObjectMapper();
  ObjectNode json = mapper.createObjectNode();
  json.put("start", start);
  json.put("end", end);
  json.put("step", step);
  json.put("numSlices", numSlices);
  json.put("sliceIndex", sliceIndex);

  String result = mapper.writeValueAsString(json);
  return new ExtensionTableNode(
      Collections.emptyList(), result, (Seq<String>) Seq$.MODULE$.<String>empty());
}
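The payload produced by makeExtensionTable is plain JSON with five fields, so the round-trip the reviewer asks about is easy to express. A minimal sketch (illustrative, not code from the PR; field names copied from the Java snippet above):

```python
import json

# Build the same object makeExtensionTable serializes, then round-trip it.
payload = {"start": 0, "end": 1000, "step": 1, "numSlices": 4, "sliceIndex": 2}
encoded = json.dumps(payload)
decoded = json.loads(encoded)

assert decoded == payload
# The consuming side must see all five fields with their original values.
assert sorted(decoded) == ["end", "numSlices", "sliceIndex", "start", "step"]
```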


@CodiumAI-Agent commented:

Persistent review updated to latest commit febd663

@taiyang-li (Contributor Author) commented:

Run Gluten Clickhouse CI on x86

@PHILO-HE (Contributor) left a comment:

Mainly reviewed the common code. Basically looks good!

@PHILO-HE (Contributor) commented:

Maybe we can move the existing GlutenSQLRangeExecSuite.scala from the spark33 UT folder to the gluten-ut/test folder, so that it gets tested for all Spark versions and both backends.


@taiyang-li (Contributor Author) commented:

> Maybe, we can move the existing GlutenSQLRangeExecSuite.scala from spark33 ut folder to gluten-ut/test folder, which can get it tested for all Spark versions and both backends.

Done.

Labels
CLICKHOUSE CORE works for Gluten Core VELOX
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[CH] sum with filter bad performance compared to vanilla spark
3 participants