Optimize get_json_object by calling the main kernel only once #2129

Merged 19 commits on Jun 14, 2024
204 changes: 102 additions & 102 deletions src/main/cpp/src/get_json_object.cu
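
At a high level, this PR replaces the previous two-pass flow (one kernel launch to compute sizes, another to write output) with a single launch that writes into an oversized scratch buffer, falling back to a second launch only when an out-of-bound write is detected. A simplified host-side sketch, reconstructed from the diff below (names abbreviated; not verbatim code):

// Sketch only: simplified from the diff below.
auto scratch = rmm::device_uvector<char>(padded_size, stream);  // input chars size + padding
auto views = rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>(num_rows, stream);
auto oob = rmm::device_scalar<bool>(false, stream);
get_json_object_kernel<block_size><<<num_blocks, block_size, 0, stream.value()>>>(
  *d_input, in_offsets, path_commands, views.data(), scratch.data(), oob.data());
if (!oob.value(stream)) {
  // Common case: one launch suffices; gather the (pointer, size) views directly.
  return cudf::make_strings_column(views, stream, mr);
}
// Rare case: exact sizes are now known in `views`; build offsets from them, allocate an
// exact-size chars buffer, and launch the kernel a second time to write at final positions.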
@@ -24,6 +24,7 @@
#include <cudf/detail/offsets_iterator_factory.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/detail/utilities.hpp>
@@ -37,6 +38,7 @@
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cuda/functional>
#include <thrust/pair.h>
#include <thrust/tuple.h>

@@ -830,28 +832,19 @@ rmm::device_uvector<path_instruction> construct_path_commands(
* @param path_commands_size The command buffer size.
* @param out_buf Buffer used to store the results of the query
* (nullptr in the size computation step)
* @param out_buf_size Size of the output buffer
* @returns A pair containing the result status and the output size.
*/
__device__ thrust::pair<bool, size_t> get_json_object_single(
char_range input,
cudf::device_span<path_instruction const> path_commands,
char* out_buf,
size_t out_buf_size)
__device__ thrust::pair<bool, cudf::size_type> get_json_object_single(
char_range input, cudf::device_span<path_instruction const> path_commands, char* out_buf)
{
json_parser j_parser(input);
j_parser.next_token();
// JSON validation check
if (json_token::ERROR == j_parser.get_current_token()) { return {false, 0}; }

// First pass: preprocess sizes.
// Second pass: writes output.
// The generator automatically determines which pass based on `out_buf`.
// If `out_buf_size` is zero, pass in `nullptr` to avoid the generator writing trash output.
json_generator generator((out_buf_size == 0) ? nullptr : out_buf);
json_generator generator(out_buf);

bool const success = evaluate_path(
j_parser, generator, write_style::RAW, {path_commands.data(), path_commands.size()});
bool const success = evaluate_path(j_parser, generator, write_style::RAW, path_commands);

if (!success) {
// generator may contain trash output, e.g.: generator writes some output,
@@ -860,7 +853,7 @@ __device__ thrust::pair<bool, size_t> get_json_object_single(
generator.set_output_len_zero();
}

return {success, generator.get_output_len()};
return {success, static_cast<cudf::size_type>(generator.get_output_len())};
}

/**
@@ -890,60 +883,32 @@ template <int block_size>
// of registers over spilling.
__launch_bounds__(block_size, 1) CUDF_KERNEL
void get_json_object_kernel(cudf::column_device_view col,
cudf::detail::input_offsetalator d_offsets,
cudf::device_span<path_instruction const> path_commands,
cudf::size_type* d_sizes,
cudf::detail::input_offsetalator output_offsets,
thrust::pair<char const*, cudf::size_type>* out_stringviews,
char* out_buf,
cudf::bitmask_type* out_validity,
cudf::size_type* out_valid_count)
bool* has_out_of_bound)
{
auto tid = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();
for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < col.size(); tid += stride) {
char* const dst = out_buf + d_offsets[tid];
bool is_valid = false;
cudf::size_type out_size = 0;

cudf::size_type warp_valid_count{0};

auto active_threads = __ballot_sync(0xffff'ffffu, tid < col.size());
while (tid < col.size()) {
bool is_valid = false;
cudf::string_view const str = col.element<cudf::string_view>(tid);
auto const str = col.element<cudf::string_view>(tid);
if (str.size_bytes() > 0) {
char* dst = out_buf != nullptr ? out_buf + output_offsets[tid] : nullptr;
size_t const dst_size =
out_buf != nullptr ? output_offsets[tid + 1] - output_offsets[tid] : 0;

// process one single row
auto [result, output_size] =
get_json_object_single(str, {path_commands.data(), path_commands.size()}, dst, dst_size);
if (result) { is_valid = true; }

// filled in only during the precompute step. during the compute step, the
// offsets are fed back in so we do -not- want to write them out
if (out_buf == nullptr) { d_sizes[tid] = static_cast<cudf::size_type>(output_size); }
} else {
// valid JSON length is always greater than 0
// if `str` size is zero, output len is 0 and `is_valid` is false
if (out_buf == nullptr) { d_sizes[tid] = 0; }
}
auto const in_size = d_offsets[tid + 1] - d_offsets[tid];

// validity filled in only during the output step
if (out_validity != nullptr) {
uint32_t mask = __ballot_sync(active_threads, is_valid);
// 0th lane of the warp writes the validity
if (!(tid % cudf::detail::warp_size)) {
out_validity[cudf::word_index(tid)] = mask;
warp_valid_count += __popc(mask);
}
// If `in_size == 0`, do not pass in the dst pointer to prevent writing garbage data.
thrust::tie(is_valid, out_size) =
get_json_object_single(str, path_commands, in_size != 0 ? dst : nullptr);
if (out_size > in_size) { *has_out_of_bound = true; }
}

tid += stride;
active_threads = __ballot_sync(active_threads, tid < col.size());
}

// sum the valid counts across the whole block
if (out_valid_count != nullptr) {
cudf::size_type block_valid_count =
cudf::detail::single_lane_block_sum_reduce<block_size, 0>(warp_valid_count);
if (threadIdx.x == 0) { atomicAdd(out_valid_count, block_valid_count); }
// Write out `nullptr` in the output string_view to indicate that the output is a null.
// The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
// second time due to out-of-bound write in the first launch.
if (out_stringviews) { out_stringviews[tid] = {is_valid ? dst : nullptr, out_size}; }
}
}

@@ -953,64 +918,99 @@ std::unique_ptr<cudf::column> get_json_object(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (input.is_empty()) return cudf::make_empty_column(cudf::type_id::STRING);

if (instructions.size() > max_path_depth) { CUDF_FAIL("JSONPath query exceeds maximum depth"); }
if (input.is_empty()) { return cudf::make_empty_column(cudf::type_id::STRING); }

// get a string buffer to store all the names and convert to device
std::string all_names;
for (auto const& inst : instructions) {
all_names += std::get<1>(inst);
}
cudf::string_scalar all_names_scalar(all_names, true, stream);
// parse the json_path into a command buffer
auto path_commands = construct_path_commands(
auto const all_names_scalar = cudf::string_scalar(all_names, true, stream);
auto const path_commands = construct_path_commands(
instructions, all_names_scalar, stream, rmm::mr::get_current_device_resource());
auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
auto const in_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets());

// A buffer to store the output strings without knowing their sizes in advance.
// Since we do not know the sizes, we allocate the buffer a bit larger than the input size
// so that we will not write output strings into an out-of-bound position.
// Out-of-bound checking must still be performed in the main kernel to guarantee that we do
// not have data corruption.
constexpr auto padding_ratio = 1.01;
Collaborator commented:

Question: How do we get this ratio? I think we have some cases that will make the output larger than input_size * 1.01, such as number normalization, adding quotes under some styles, and escaping some characters.

@ttnghia (Collaborator, Author) commented on Jun 11, 2024:

This is the total size, not one row's size. If we have 1M rows, this will be the size of approximately 1'010'000 rows. In other words, we have around 10'000 extra rows to avoid the invalid memory access issue. We don't care if data of row n is written across its boundary into row n+1, which causes corruption, since we will discard the entire buffer if such overflow is detected. The extra 10'000 rows at the end just make sure that data of the last row will not be written out-of-bound of the entire buffer.

@ttnghia (Collaborator, Author) commented on Jun 11, 2024:

The extra 10'000 rows seem to be too much. We can turn this padding ratio into a dynamic value. For example: the padding should always be the average size of 100 rows. That should be large enough to avoid invalid memory access when writing the last row, but it is not guaranteed.

Collaborator commented:

One extreme case:

scala> val d = Seq("['\n']", "['\n\n\n\n\n\n\n\n\n\n']")
scala> val df = d.toDF
scala> df.createOrReplaceTempView("tab")
scala> spark.sql("select length(value) from tab").show()
+-------------+
|length(value)|
+-------------+
|             5|
|           17|
+-------------+

scala> spark.sql("select length(get_json_object(value, '$')) from tab").show()
+---------------------------------+
|length(get_json_object(value, $))|
+---------------------------------+
|                                 6|
|                               30|
+---------------------------------+

This case will cause invalid memory access:
rounding up, 2 * 1.01 = 3 rows, the avg size is 11, so the total allocated size is 33, but the total write size will be 36 (6 + 30).
It causes overlapped writing as expected (which is not an issue), but the trailing write causes invalid memory access.

One option:
Add a method in json_generator like:

// Bounds-checked write: always count the byte, but only store it when it fits.
void write_byte(char c)
{
  if (curr_size < max_allowed_size) { *curr_pos++ = c; }
  // Keep counting past the limit so the caller can detect how much was truncated.
  curr_size++;
}

@ttnghia (Collaborator, Author) commented:

In my latest code, I pad the buffer by 100 * average_row_size. In this case, we have avg_row_size = 22 / 2 = 11, so the buffer size will be 22 + 100 * 11 = 1122: no overflow.
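
For illustration only, that padding computation might look like this (assumed names; not necessarily the PR's exact code):

// Illustrative sketch: pad the scratch buffer by the average size of 100 rows.
std::size_t padded_scratch_size(std::size_t total_chars, std::size_t num_rows)
{
  auto const avg_row_size = num_rows == 0 ? std::size_t{0} : total_chars / num_rows;
  // Example from above: total_chars = 22, num_rows = 2 -> 22 + 100 * 11 = 1122.
  return total_chars + 100 * avg_row_size;
}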

@ttnghia (Collaborator, Author) commented on Jun 12, 2024:

I'll add that code and check the benchmark to see how it will affect the overall performance.
Edit: I did attempt to implement such a bounds-checked write, but gave up as it requires modifying the whole json generator, json parser, and ftos converter. In the meantime, I have an idea to further optimize writing with shared memory (#2130), so let this be future work.

@ttnghia (Collaborator, Author) commented on Jun 13, 2024:

I updated the padding, which is now computed using the max row size. There is some overhead with this, but it is small. This approach avoids invalid memory access in normal situations and also in extreme cases, such as when the column has all null rows except one very long row.
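
A possible sketch of that max-row-size computation from the input offsets (illustrative names; the actual PR code may differ):

#include <thrust/functional.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform_reduce.h>

// Illustrative sketch: pad the scratch buffer by the size of the longest input row.
std::size_t padded_scratch_size(cudf::detail::input_offsetalator offsets,
                                cudf::size_type num_rows,
                                std::size_t total_chars,
                                rmm::cuda_stream_view stream)
{
  auto const max_row_size = thrust::transform_reduce(
    rmm::exec_policy(stream),
    thrust::make_counting_iterator<cudf::size_type>(0),
    thrust::make_counting_iterator<cudf::size_type>(num_rows),
    [offsets] __device__(cudf::size_type i) -> int64_t { return offsets[i + 1] - offsets[i]; },
    int64_t{0},
    thrust::maximum<int64_t>{});
  return total_chars + static_cast<std::size_t>(max_row_size);
}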

auto output_scratch = rmm::device_uvector<char>(
static_cast<std::size_t>(input.chars_size(stream) * padding_ratio), stream);
auto out_stringviews = rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>{
static_cast<std::size_t>(input.size()), stream};
auto has_out_of_bound = rmm::device_scalar<bool>{false, stream};

constexpr int blocks_per_SM = 1;
constexpr int block_size = 256;
auto const num_blocks = [&] {
int device_id{};
cudaDeviceProp props{};
CUDF_CUDA_TRY(cudaGetDevice(&device_id));
CUDF_CUDA_TRY(cudaGetDeviceProperties(&props, device_id));
return props.multiProcessorCount * blocks_per_SM;
}();

// compute output sizes
auto sizes = rmm::device_uvector<cudf::size_type>(
input.size(), stream, rmm::mr::get_current_device_resource());
auto d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets());

constexpr int block_size = 512;
cudf::detail::grid_1d const grid{input.size(), block_size};
auto d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
// preprocess sizes (returned in the offsets buffer)
get_json_object_kernel<block_size>
<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
*d_input_ptr, path_commands, sizes.data(), d_offsets, nullptr, nullptr, nullptr);

// convert sizes to offsets
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr,
in_offsets,
path_commands,
out_stringviews.data(),
output_scratch.data(),
has_out_of_bound.data());

// If we didn't see any out-of-bound write, everything is good so far.
// Just gather the output strings and return.
if (!has_out_of_bound.value(stream)) {
return cudf::make_strings_column(out_stringviews, stream, mr);
}
// Reaching here means we had an out-of-bound write. Although this is very rare, it may still happen.

// This scratch buffer is no longer needed.
output_scratch = rmm::device_uvector<char>{0, stream};

// The string sizes computed in the previous kernel call will be used to allocate a new char
// buffer to store the output.
auto const size_it = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[string_pairs = out_stringviews.data()] __device__(auto const idx) {
return string_pairs[idx].second;
}));
auto [offsets, output_size] =
cudf::strings::detail::make_offsets_child_column(sizes.begin(), sizes.end(), stream, mr);
d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view());
cudf::strings::detail::make_offsets_child_column(size_it, size_it + input.size(), stream, mr);

// allocate output string column
rmm::device_uvector<char> chars(output_size, stream, mr);
// Also compute the null mask using the stored char pointers.
auto const validator = [] __device__(thrust::pair<char const*, cudf::size_type> const item) {
return item.first != nullptr;
};
auto [null_mask, null_count] =
cudf::detail::valid_if(out_stringviews.begin(), out_stringviews.end(), validator, stream, mr);

// potential optimization : if we know that all outputs are valid, we could
// skip creating the validity mask altogether
rmm::device_buffer validity =
cudf::detail::create_null_mask(input.size(), cudf::mask_state::UNINITIALIZED, stream, mr);
// No longer needed from here; free its memory now.
out_stringviews = rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>{0, stream};

// compute results
rmm::device_scalar<cudf::size_type> d_valid_count{0, stream};
auto chars = rmm::device_uvector<char>(output_size, stream, mr);
auto const out_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view());

has_out_of_bound.set_value_to_zero_async(stream);
get_json_object_kernel<block_size>
<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
*d_input_ptr,
path_commands,
sizes.data(),
d_offsets,
chars.data(),
static_cast<cudf::bitmask_type*>(validity.data()),
d_valid_count.data());

return make_strings_column(input.size(),
std::move(offsets),
chars.release(),
input.size() - d_valid_count.value(stream),
std::move(validity));
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr,
out_offsets,
path_commands,
nullptr /*out_stringviews*/,
chars.data(),
has_out_of_bound.data());

// This kernel call should not see any out-of-bound write. If one is still detected,
// something must have gone wrong.
CUDF_EXPECTS(!has_out_of_bound.value(stream),
"Unexpected out-of-bound write in get_json_object kernel.");

return cudf::make_strings_column(
input.size(), std::move(offsets), chars.release(), null_count, std::move(null_mask));
}

} // namespace detail