Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize get_json_object by calling the main kernel only once #2129

Merged
merged 19 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 130 additions & 120 deletions src/main/cpp/src/get_json_object.cu
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cudf/detail/offsets_iterator_factory.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/detail/utilities.hpp>
Expand All @@ -37,7 +38,11 @@
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cuda/functional>
#include <thrust/functional.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/pair.h>
#include <thrust/transform_reduce.h>
#include <thrust/tuple.h>

namespace spark_rapids_jni {
Expand Down Expand Up @@ -825,33 +830,21 @@ rmm::device_uvector<path_instruction> construct_path_commands(
*
*
* @param input The incoming json string
* @param input_len Size of the incoming json string
* @param path_commands_ptr The command buffer to be applied to the string.
* @param path_commands_size The command buffer size.
* @param out_buf Buffer used to store the results of the query
* (nullptr in the size computation step)
* @param out_buf_size Size of the output buffer
* @returns A pair containing the result code and the output buffer.
* @param path_commands The command buffer to be applied to the string
* @param out_buf Buffer used to store the string resulting from the query
* @returns A pair containing the result code and the output buffer
*/
__device__ thrust::pair<bool, size_t> get_json_object_single(
char_range input,
cudf::device_span<path_instruction const> path_commands,
char* out_buf,
size_t out_buf_size)
__device__ thrust::pair<bool, cudf::size_type> get_json_object_single(
char_range input, cudf::device_span<path_instruction const> path_commands, char* out_buf)
{
json_parser j_parser(input);
j_parser.next_token();
// JSON validation check
if (json_token::ERROR == j_parser.get_current_token()) { return {false, 0}; }

// First pass: preprocess sizes.
// Second pass: writes output.
// The generator automatically determines which pass based on `out_buf`.
// If `out_buf_size` is zero, pass in `nullptr` to avoid generator writing trash output.
json_generator generator((out_buf_size == 0) ? nullptr : out_buf);
json_generator generator(out_buf);

bool const success = evaluate_path(
j_parser, generator, write_style::RAW, {path_commands.data(), path_commands.size()});
bool const success = evaluate_path(j_parser, generator, write_style::RAW, path_commands);

if (!success) {
// generator may contain trash output, e.g.: generator writes some output,
Expand All @@ -860,25 +853,23 @@ __device__ thrust::pair<bool, size_t> get_json_object_single(
generator.set_output_len_zero();
}

return {success, generator.get_output_len()};
return {success, static_cast<cudf::size_type>(generator.get_output_len())};
}

/**
* @brief Kernel for running the JSONPath query.
*
* This kernel operates in a 2-pass way. On the first pass it computes the
* output sizes. On the second pass, it fills in the provided output buffers
* (chars and validity).
* This kernel writes out the output strings and their lengths at the same time. If any output
* length exceeds its buffer size limit, a boolean flag is set to inform the caller. In that
* situation, another (larger) output buffer is allocated and the kernel is launched again.
* Otherwise, launching this kernel only once is sufficient to produce the desired output.
*
* @param col Device view of the incoming string
* @param input The input JSON strings stored in a strings column
* @param offsets Offsets to the output locations in the output buffer
* @param path_commands JSONPath command buffer
* @param d_sizes a buffer used to write the output sizes in the first pass,
* and is read back in on the second pass to compute offsets.
* @param output_offsets Buffer used to store the string offsets for the results
* of the query
* @param out_buf Buffer used to store the results of the query
* @param out_validity Output validity buffer
* @param out_valid_count Output count of # of valid bits
* @param out_stringviews The output array to store pointers to the output strings and their sizes
* @param out_buf Buffer used to store the strings resulting from the query
* @param has_out_of_bound Flag indicating whether any output string is longer than its buffer
*/
template <int block_size>
// We have 1 for the minBlocksPerMultiprocessor in the launch bounds to avoid spilling from
Expand All @@ -889,61 +880,33 @@ template <int block_size>
// the performance is really bad. This essentially tells NVCC to prefer using lots
// of registers over spilling.
__launch_bounds__(block_size, 1) CUDF_KERNEL
void get_json_object_kernel(cudf::column_device_view col,
void get_json_object_kernel(cudf::column_device_view input,
cudf::detail::input_offsetalator offsets,
cudf::device_span<path_instruction const> path_commands,
cudf::size_type* d_sizes,
cudf::detail::input_offsetalator output_offsets,
thrust::pair<char const*, cudf::size_type>* out_stringviews,
char* out_buf,
cudf::bitmask_type* out_validity,
cudf::size_type* out_valid_count)
bool* has_out_of_bound)
{
auto tid = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();
for (auto tid = cudf::detail::grid_1d::global_thread_id(); tid < input.size(); tid += stride) {
char* const dst = out_buf + offsets[tid];
bool is_valid = false;
cudf::size_type out_size = 0;

cudf::size_type warp_valid_count{0};

auto active_threads = __ballot_sync(0xffff'ffffu, tid < col.size());
while (tid < col.size()) {
bool is_valid = false;
cudf::string_view const str = col.element<cudf::string_view>(tid);
auto const str = input.element<cudf::string_view>(tid);
if (str.size_bytes() > 0) {
char* dst = out_buf != nullptr ? out_buf + output_offsets[tid] : nullptr;
size_t const dst_size =
out_buf != nullptr ? output_offsets[tid + 1] - output_offsets[tid] : 0;

// process one single row
auto [result, output_size] =
get_json_object_single(str, {path_commands.data(), path_commands.size()}, dst, dst_size);
if (result) { is_valid = true; }

// filled in only during the precompute step. during the compute step, the
// offsets are fed back in so we do -not- want to write them out
if (out_buf == nullptr) { d_sizes[tid] = static_cast<cudf::size_type>(output_size); }
} else {
// valid JSON length is always greater than 0
// if `str` size len is zero, output len is 0 and `is_valid` is false
if (out_buf == nullptr) { d_sizes[tid] = 0; }
}
auto const max_size = offsets[tid + 1] - offsets[tid];

// validity filled in only during the output step
if (out_validity != nullptr) {
uint32_t mask = __ballot_sync(active_threads, is_valid);
// 0th lane of the warp writes the validity
if (!(tid % cudf::detail::warp_size)) {
out_validity[cudf::word_index(tid)] = mask;
warp_valid_count += __popc(mask);
}
// If `max_size == 0`, do not pass in the dst pointer to prevent writing garbage data.
thrust::tie(is_valid, out_size) =
get_json_object_single(str, path_commands, max_size != 0 ? dst : nullptr);
if (out_size > max_size) { *has_out_of_bound = true; }
}

tid += stride;
active_threads = __ballot_sync(active_threads, tid < col.size());
}

// sum the valid counts across the whole block
if (out_valid_count != nullptr) {
cudf::size_type block_valid_count =
cudf::detail::single_lane_block_sum_reduce<block_size, 0>(warp_valid_count);
if (threadIdx.x == 0) { atomicAdd(out_valid_count, block_valid_count); }
// Write out `nullptr` in the output string_view to indicate that the output is a null.
// The situation `out_stringviews == nullptr` should only happen if the kernel is launched a
// second time due to out-of-bound write in the first launch.
if (out_stringviews) { out_stringviews[tid] = {is_valid ? dst : nullptr, out_size}; }
}
}

Expand All @@ -953,64 +916,111 @@ std::unique_ptr<cudf::column> get_json_object(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (input.is_empty()) return cudf::make_empty_column(cudf::type_id::STRING);

if (instructions.size() > max_path_depth) { CUDF_FAIL("JSONPath query exceeds maximum depth"); }
if (input.is_empty()) { return cudf::make_empty_column(cudf::type_id::STRING); }

// get a string buffer to store all the names and convert to device
std::string all_names;
for (auto const& inst : instructions) {
all_names += std::get<1>(inst);
}
cudf::string_scalar all_names_scalar(all_names, true, stream);
// parse the json_path into a command buffer
auto path_commands = construct_path_commands(
auto const all_names_scalar = cudf::string_scalar(all_names, true, stream);
auto const path_commands = construct_path_commands(
instructions, all_names_scalar, stream, rmm::mr::get_current_device_resource());
auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
auto const in_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets());

// A buffer to store the output strings without knowing their sizes.
// Since we do not know their sizes, we need to allocate the buffer a bit larger than the input
// size so that we will not write output strings into an out-of-bound position.
// Checking out-of-bound needs to be performed in the main kernel to make sure we will not have
// data corruption.
auto const scratch_size = [&] {
auto const max_row_size = thrust::transform_reduce(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.size()),
cuda::proclaim_return_type<int64_t>(
[in_offsets] __device__(auto const idx) { return in_offsets[idx + 1] - in_offsets[idx]; }),
int64_t{0},
thrust::maximum{});

// Pad the scratch buffer by an additional size that is a multiple of max row size.
auto constexpr padding_rows = 10;
return input.chars_size(stream) + max_row_size * padding_rows;
}();
auto output_scratch = rmm::device_uvector<char>(scratch_size, stream);
auto out_stringviews = rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>{
static_cast<std::size_t>(input.size()), stream};
auto has_out_of_bound = rmm::device_scalar<bool>{false, stream};

constexpr int blocks_per_SM = 1;
constexpr int block_size = 256;
auto const num_blocks = [&] {
int device_id{};
cudaDeviceProp props{};
CUDF_CUDA_TRY(cudaGetDevice(&device_id));
CUDF_CUDA_TRY(cudaGetDeviceProperties(&props, device_id));
return props.multiProcessorCount * blocks_per_SM;
}();

// compute output sizes
auto sizes = rmm::device_uvector<cudf::size_type>(
input.size(), stream, rmm::mr::get_current_device_resource());
auto d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets());

constexpr int block_size = 512;
cudf::detail::grid_1d const grid{input.size(), block_size};
auto d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
// preprocess sizes (returned in the offsets buffer)
get_json_object_kernel<block_size>
<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
*d_input_ptr, path_commands, sizes.data(), d_offsets, nullptr, nullptr, nullptr);

// convert sizes to offsets
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr,
in_offsets,
path_commands,
out_stringviews.data(),
output_scratch.data(),
has_out_of_bound.data());

// If we didn't see any out-of-bound write, everything is good so far.
// Just gather the output strings and return.
if (!has_out_of_bound.value(stream)) {
return cudf::make_strings_column(out_stringviews, stream, mr);
}
// From here on, an out-of-bound write was detected. This is very rare, but it can still happen.

// This scratch buffer is no longer needed.
output_scratch = rmm::device_uvector<char>{0, stream};

// The string sizes computed in the previous kernel call will be used to allocate a new char
// buffer to store the output.
auto const size_it = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[string_pairs = out_stringviews.data()] __device__(auto const idx) {
return string_pairs[idx].second;
}));
auto [offsets, output_size] =
cudf::strings::detail::make_offsets_child_column(sizes.begin(), sizes.end(), stream, mr);
d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view());
cudf::strings::detail::make_offsets_child_column(size_it, size_it + input.size(), stream, mr);

// allocate output string column
rmm::device_uvector<char> chars(output_size, stream, mr);
// Also compute the null mask using the stored char pointers.
auto const validator = [] __device__(thrust::pair<char const*, cudf::size_type> const item) {
return item.first != nullptr;
};
auto [null_mask, null_count] =
cudf::detail::valid_if(out_stringviews.begin(), out_stringviews.end(), validator, stream, mr);

// potential optimization : if we know that all outputs are valid, we could
// skip creating the validity mask altogether
rmm::device_buffer validity =
cudf::detail::create_null_mask(input.size(), cudf::mask_state::UNINITIALIZED, stream, mr);
// No longer need it from here. Free up memory for now.
out_stringviews = rmm::device_uvector<thrust::pair<char const*, cudf::size_type>>{0, stream};

// compute results
rmm::device_scalar<cudf::size_type> d_valid_count{0, stream};
auto chars = rmm::device_uvector<char>(output_size, stream, mr);
auto const out_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view());

has_out_of_bound.set_value_to_zero_async(stream);
get_json_object_kernel<block_size>
<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
*d_input_ptr,
path_commands,
sizes.data(),
d_offsets,
chars.data(),
static_cast<cudf::bitmask_type*>(validity.data()),
d_valid_count.data());

return make_strings_column(input.size(),
std::move(offsets),
chars.release(),
input.size() - d_valid_count.value(stream),
std::move(validity));
<<<num_blocks, block_size, 0, stream.value()>>>(*d_input_ptr,
out_offsets,
path_commands,
nullptr /*out_stringviews*/,
chars.data(),
has_out_of_bound.data());

// This kernel call should not see any out-of-bound write. If one is still detected, something
// must have gone wrong.
CUDF_EXPECTS(!has_out_of_bound.value(stream),
"Unexpected out-of-bound write in get_json_object kernel.");

return cudf::make_strings_column(
input.size(), std::move(offsets), chars.release(), null_count, std::move(null_mask));
}

} // namespace detail
Expand Down
25 changes: 21 additions & 4 deletions src/test/java/com/nvidia/spark/rapids/jni/GetJsonObjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ void getJsonObjectTest() {
namedPath("k") };
try (ColumnVector jsonCv = ColumnVector.fromStrings(
"{\"k\": \"v\"}");
ColumnVector expected = ColumnVector.fromStrings(
"v");
ColumnVector actual = JSONUtils.getJsonObject(jsonCv, query)) {
ColumnVector expected = ColumnVector.fromStrings(
"v");
ColumnVector actual = JSONUtils.getJsonObject(jsonCv, query)) {
assertColumnsAreEqual(expected, actual);
}
}
Expand Down Expand Up @@ -169,7 +169,7 @@ void getJsonObjectTest_Escape() {
String JSON4 = "['a','b','\"C\"']";
// \\u4e2d\\u56FD is 中国
String JSON5 = "'\\u4e2d\\u56FD\\\"\\'\\\\\\/\\b\\f\\n\\r\\t\\b'";
String JSON6 = "['\\u4e2d\\u56FD\\\"\\'\\\\\\/\\b\\f\\n\\r\\t\\b']";
String JSON6 = "['\\u4e2d\\u56FD\\\"\\'\\\\\\/\\b\\f\\n\\r\\t\\b']";

String expectedStr1 = "{\"a\":\"A\"}";
String expectedStr2 = "{\"a\":\"A\\\"\"}";
Expand Down Expand Up @@ -600,6 +600,23 @@ void getJsonObjectTest_15() {
}
}

/**
 * Covers the case in which the JNI kernel is called twice. This happens when the output JSON
 * strings have lengths that are larger than their corresponding input.
 */
@Test
void getJsonObjectTest_JNIKernelCalledTwice() {
  // An empty instruction list is equivalent to the path '$'.
  JSONUtils.PathInstructionJni[] rootPath = new JSONUtils.PathInstructionJni[0];

  // Two JSON arrays holding raw newline characters, followed by eight empty rows.
  String[] inputRows = {
      "['\n']", "['\n\n\n\n\n\n\n\n\n\n']",
      "", "", "", "", "", "", "", ""
  };
  // The newlines come back escaped; every empty input row is expected to yield null.
  String[] expectedRows = {
      "[\"\\n\"]", "[\"\\n\\n\\n\\n\\n\\n\\n\\n\\n\\n\"]",
      null, null, null, null, null, null, null, null
  };

  try (ColumnVector jsonColumn = ColumnVector.fromStrings(inputRows);
       ColumnVector expectedColumn = ColumnVector.fromStrings(expectedRows);
       ColumnVector resultColumn = JSONUtils.getJsonObject(jsonColumn, rootPath)) {
    assertColumnsAreEqual(expectedColumn, resultColumn);
  }
}

private JSONUtils.PathInstructionJni wildcardPath() {
return new JSONUtils.PathInstructionJni(JSONUtils.PathInstructionType.WILDCARD, "", -1);
Expand Down
Loading