Skip to content

refine settings javascript_max_memory_bytes and correct set hard limit #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
7 changes: 6 additions & 1 deletion base/base/getMemoryAmount.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ uint64_t getMemoryAmountOrZero()

}


uint64_t getMemoryAmount()
{
auto res = getMemoryAmountOrZero();
if (!res)
throw std::runtime_error("Cannot determine memory amount");
return res;
}

uint64_t getMemoryAmountOrZeroCached()
{
static auto res = getMemoryAmountOrZero();
return res;
}
2 changes: 2 additions & 0 deletions base/base/getMemoryAmount.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ uint64_t getMemoryAmountOrZero();
/** Throws exception if it cannot determine the size of physical memory.
*/
uint64_t getMemoryAmount();

uint64_t getMemoryAmountOrZeroCached();
2 changes: 1 addition & 1 deletion programs/server/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ settings:
part_commit_pool_size: 8 # Total shared thread pool size for building and committing parts for Stream
max_idempotent_ids: 1000 # Maximum idempotent IDs to keep in memory and on disk for idempotent data ingestion
_tp_internal_system_open_sesame: true # Control the access to system.* streams
javascript_max_memory_bytes: 104857600 #Maximum heap size of javascript UDA/UDF in bytes, default is 100*1024*1024 bytes
javascript_max_memory_bytes: 204857600 #Maximum heap size of javascript UDA/UDF in bytes, default is 200 MByte
recovery_policy: "strict" # Recovery policy for materialized view. strict or best_effort
recovery_retry_for_sn_failure: 3 # retry times for sn failure. this value only apply if the `recovery_policy` is `best_effort`
max_block_size: 65409 # 65536 - (PADDING_FOR_SIMD - 1)
Expand Down
2 changes: 1 addition & 1 deletion programs/server/embedded.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
<part_commit_pool_size>8</part_commit_pool_size>
<max_idempotent_ids>1000</max_idempotent_ids>
<_tp_internal_system_open_sesame>true</_tp_internal_system_open_sesame>
<javascript_max_memory_bytes>104857600</javascript_max_memory_bytes>
<javascript_max_memory_bytes>204857600</javascript_max_memory_bytes>
</global>
<stream>
<default_shards>1</default_shards>
Expand Down
15 changes: 10 additions & 5 deletions src/AggregateFunctions/AggregateFunctionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context,
bool is_changelog_input) const
/// proton: ends
{
Expand All @@ -97,8 +98,8 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
bool has_null_arguments = std::any_of(types_without_low_cardinality.begin(), types_without_low_cardinality.end(),
[](const auto & type) { return type->onlyNull(); });

AggregateFunctionPtr nested_function = getImpl(
name, nested_types, nested_parameters, out_properties, has_null_arguments, is_changelog_input);
AggregateFunctionPtr nested_function
= getImpl(name, nested_types, nested_parameters, out_properties, has_null_arguments, context, is_changelog_input);

// Pure window functions are not real aggregate functions. Applying
// combinators doesn't make sense for them, they must handle the
Expand All @@ -109,7 +110,8 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
return combinator->transformAggregateFunction(nested_function, out_properties, types_without_low_cardinality, parameters);
}

auto with_original_arguments = getImpl(name, types_without_low_cardinality, parameters, out_properties, false, is_changelog_input);
auto with_original_arguments
= getImpl(name, types_without_low_cardinality, parameters, out_properties, false, context, is_changelog_input);

if (!with_original_arguments)
throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -123,6 +125,7 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
const Array & parameters,
AggregateFunctionProperties & out_properties,
bool has_null_arguments,
ContextPtr context,
bool is_changelog_input) const
/// proton: ends
{
Expand Down Expand Up @@ -202,7 +205,8 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
}

/// proton: starts. Check user defined aggr function
auto aggr = UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties, is_changelog_input);
auto aggr
= UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties, context, is_changelog_input);
if (aggr)
return aggr;
/// proton: ends
Expand All @@ -225,12 +229,13 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet(
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context,
bool is_changelog_input) const
/// proton: ends
{
return isAggregateFunctionName(name)
/// proton: starts
? get(name, argument_types, parameters, out_properties, is_changelog_input)
? get(name, argument_types, parameters, out_properties, context, is_changelog_input)
/// proton: ends
: nullptr;
}
Expand Down
3 changes: 3 additions & 0 deletions src/AggregateFunctions/AggregateFunctionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context = nullptr,
bool is_changelog_input = false) const;

/// Returns nullptr if not found.
Expand All @@ -77,6 +78,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context = nullptr,
bool is_changelog_input = false) const;
/// proton: ends

Expand All @@ -97,6 +99,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
const Array & parameters,
AggregateFunctionProperties & out_properties,
bool has_null_arguments,
ContextPtr context,
bool is_changelog_input = false) const;
/// proton: ends

Expand Down
11 changes: 11 additions & 0 deletions src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include <Functions/UserDefined/UserDefinedFunctionConfiguration.h>
#include <V8/ConvertDataTypes.h>
#include <V8/Utils.h>
#include <base/getMemoryAmount.h>
#include <Common/logger_useful.h>

#include <span>

namespace DB
Expand All @@ -23,10 +25,17 @@ JavaScriptBlueprint::JavaScriptBlueprint(const String & name, const String & sou
{
/// FIXME, create isolate from V8::V8 global isolates pool
v8::Isolate::CreateParams isolate_params;
size_t max_heap_size_in_bytes = static_cast<size_t>(getMemoryAmountOrZeroCached() * 0.6);
size_t max_old_gen_size_in_bytes = static_cast<size_t>(getMemoryAmountOrZeroCached() * 0.6);

isolate_params.constraints.ConfigureDefaultsFromHeapSize(0, max_heap_size_in_bytes);
isolate_params.constraints.set_max_old_generation_size_in_bytes(max_old_gen_size_in_bytes);

isolate_params.array_buffer_allocator_shared
= std::shared_ptr<v8::ArrayBuffer::Allocator>(v8::ArrayBuffer::Allocator::NewDefaultAllocator());
isolate = std::unique_ptr<v8::Isolate, IsolateDeleter>(v8::Isolate::New(isolate_params), IsolateDeleter());


auto * logger = &Poco::Logger::get("JavaScriptAggregateFunction");

/// Analyze if this UDA's definition to initialize the blueprint
Expand Down Expand Up @@ -275,7 +284,9 @@ AggregateFunctionJavaScriptAdapter::AggregateFunctionJavaScriptAdapter(
, is_changelog_input(is_changelog_input_)
, max_v8_heap_size_in_bytes(max_v8_heap_size_in_bytes_)
, blueprint(config->name, config->source)
, logger(&Poco::Logger::get("JavaScriptAggregateFunction"))
{
LOG_INFO(logger, "udf name={}, javascript_max_memory_bytes={}", config->name, max_v8_heap_size_in_bytes);
}

String AggregateFunctionJavaScriptAdapter::getName() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper
bool is_changelog_input = false;
size_t max_v8_heap_size_in_bytes;
JavaScriptBlueprint blueprint;
Poco::Logger * logger;

public:
AggregateFunctionJavaScriptAdapter(
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, aysnc_ingest_max_outstanding_blocks, 10000, "Max outstanding blocks to be committed during async ingestion", 0) \
M(Bool, _tp_internal_system_open_sesame, true, "Control the access to system.* streams", 0) \
M(Bool, is_internal, false, "Control the statistics of select query", 0) \
M(UInt64, javascript_max_memory_bytes, 100 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \
M(UInt64, javascript_max_memory_bytes, 200 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \
M(Bool, enable_dependency_check, true, "Enable the dependency check of view/materialized view", 0) \
M(RecoveryPolicy, recovery_policy, RecoveryPolicy::Strict, "Default recovery policy for materialized view when inner query failed. 'strict': always recover from checkpointed; 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \
M(UInt64, recovery_retry_for_sn_failure, 3, "Default retry times for sn failure. only apply for 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \
Expand Down
3 changes: 3 additions & 0 deletions src/Functions/UserDefined/JavaScriptUserDefinedFunction.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include <Functions/UserDefined/JavaScriptUserDefinedFunction.h>

#include <V8/ConvertDataTypes.h>
#include <V8/Utils.h>
#include <base/getMemoryAmount.h>

#include <span>

namespace DB
Expand Down
23 changes: 10 additions & 13 deletions src/Functions/UserDefined/UserDefinedFunctionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ extern const int UNKNOWN_FUNCTION;
/// proton: ends
}

UserDefinedFunctionFactory::UserDefinedFunctionFactory() : logger(&Poco::Logger::get("UserDefinedFunctionFactory"))
{
}

UserDefinedFunctionFactory & UserDefinedFunctionFactory::instance()
{
static UserDefinedFunctionFactory result;
Expand Down Expand Up @@ -68,6 +72,7 @@ AggregateFunctionPtr UserDefinedFunctionFactory::getAggregateFunction(
const DataTypes & types,
const Array & parameters,
AggregateFunctionProperties & /*properties*/,
ContextPtr context,
bool is_changelog_input)
{
const auto & loader = ExternalUserDefinedFunctionsLoader::instance(nullptr);
Expand Down Expand Up @@ -107,18 +112,14 @@ AggregateFunctionPtr UserDefinedFunctionFactory::getAggregateFunction(
size_t num_of_args = config->arguments.size();
validate_arguments(types.back()->getName() == "int8" ? num_of_args : num_of_args - 1);

ContextPtr query_context;
if (CurrentThread::isInitialized())
query_context = CurrentThread::get().getQueryContext();

if (!query_context || !query_context->getSettingsRef().javascript_max_memory_bytes)
if (!context || !context->getSettingsRef().javascript_max_memory_bytes)
{
LOG_ERROR(&Poco::Logger::get("UserDefinedFunctionFactory"), "query_context is invalid");
LOG_ERROR(instance().getLogger(), "query_context is invalid");
return nullptr;
}

return std::make_shared<AggregateFunctionJavaScriptAdapter>(
config, types, parameters, is_changelog_input, query_context->getSettingsRef().javascript_max_memory_bytes);
config, types, parameters, is_changelog_input, context->getSettingsRef().javascript_max_memory_bytes);
}

return nullptr;
Expand Down Expand Up @@ -160,7 +161,7 @@ FunctionOverloadResolverPtr UserDefinedFunctionFactory::tryGet(const String & fu
/// proton: starts
try
{
return get(function_name,std::move(context));
return get(function_name, std::move(context));
}
catch (Exception &)
{
Expand Down Expand Up @@ -196,11 +197,7 @@ std::vector<String> UserDefinedFunctionFactory::getRegisteredNames(ContextPtr co

/// proton: starts
bool UserDefinedFunctionFactory::registerFunction(
ContextPtr context,
const String & function_name,
Poco::JSON::Object::Ptr json_func,
bool throw_if_exists,
bool replace_if_exists)
ContextPtr context, const String & function_name, Poco::JSON::Object::Ptr json_func, bool throw_if_exists, bool replace_if_exists)
{
Streaming::validateUDFName(function_name);

Expand Down
7 changes: 7 additions & 0 deletions src/Functions/UserDefined/UserDefinedFunctionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class UserDefinedFunctionFactory
const DataTypes & types,
const Array & parameters,
AggregateFunctionProperties & properties,
ContextPtr context,
/// whether input of aggregation function is changelog, aggregate function does not pass _tp_delta column to UDA if it is false
bool is_changelog_input = false);

Expand All @@ -53,6 +54,12 @@ class UserDefinedFunctionFactory
static bool has(const String & function_name, ContextPtr context);

static std::vector<String> getRegisteredNames(ContextPtr context);

Poco::Logger * getLogger() const { return logger; }

private:
UserDefinedFunctionFactory();
Poco::Logger * logger;
};

}
4 changes: 2 additions & 2 deletions src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ AggregateFunctionPtr getAggregateFunction(
/// Examples: Translate `quantile(x, 0.5)` to `quantile(0.5)(x)`
tryTranslateToParametricAggregateFunction(node, types, parameters, argument_names, context);
if (throw_if_empty)
return AggregateFunctionFactory::instance().get(node->name, types, parameters, properties, is_changelog_input);
return AggregateFunctionFactory::instance().get(node->name, types, parameters, properties, context, is_changelog_input);
else
return AggregateFunctionFactory::instance().tryGet(node->name, types, parameters, properties, is_changelog_input);
return AggregateFunctionFactory::instance().tryGet(node->name, types, parameters, properties, context, is_changelog_input);
}
/// proton: ends.
}
Expand Down
51 changes: 44 additions & 7 deletions src/V8/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ void validateFunctionSource(
std::function<void(v8::Isolate *, v8::Local<v8::Context> &, v8::TryCatch &, v8::Local<v8::Value> &)> func)
{
/// FIXME, switch to global isolate allocation / pooling
UInt64 max_heap_size_in_bytes = 10 * 1024 * 1024;
UInt64 max_old_gen_size_in_bytes = 8 * 1024 * 1024;
UInt64 max_heap_size_in_bytes = static_cast<size_t>(getMemoryAmountOrZeroCached() * 0.6);
UInt64 max_old_gen_size_in_bytes = static_cast<size_t>(getMemoryAmountOrZeroCached() * 0.6);

v8::Isolate::CreateParams isolate_params;
isolate_params.array_buffer_allocator_shared
Expand Down Expand Up @@ -431,26 +431,63 @@ void validateStatelessFunctionSource(const std::string & func_name, const std::s
validateFunctionSource(func_name, source, validate_function);
}

std::string getHeapStatisticsString(v8::HeapStatistics & heap_statistics)
{
return fmt::format(
"total_heap_size: {}\t"
"total_heap_size_executable: {}\t"
"total_physical_size: {}\t"
"total_available_size: {}\t"
"used_heap_size: {}\t"
"heap_size_limit: {}\t"
"malloced_memory: {}\t"
"external_memory: {}\t"
"peak_malloced_memory: {}\t"
"does_zap_garbage: {}\t"
"number_of_native_contexts: {}\t"
"number_of_detached_contexts: {}\t"
"total_global_handles_size: {}\t"
"used_global_handles_size: {}",
heap_statistics.total_heap_size(),
heap_statistics.total_heap_size_executable(),
heap_statistics.total_physical_size(),
heap_statistics.total_available_size(),
heap_statistics.used_heap_size(),
heap_statistics.heap_size_limit(),
heap_statistics.malloced_memory(),
heap_statistics.external_memory(),
heap_statistics.peak_malloced_memory(),
heap_statistics.does_zap_garbage(),
heap_statistics.number_of_native_contexts(),
heap_statistics.number_of_detached_contexts(),
heap_statistics.total_global_handles_size(),
heap_statistics.used_global_handles_size());
}

void checkHeapLimit(v8::Isolate * isolate, size_t max_v8_heap_size_in_bytes)
{
v8::Locker locker(isolate);
v8::Isolate::Scope isolate_scope(isolate);
v8::HandleScope handle_scope(isolate);
v8::HandleScope scope(isolate);
/// TODO(qijun): add v8 heap stat metrics to system profile event capture
v8::HeapStatistics heap_statistics;
isolate->GetHeapStatistics(&heap_statistics);

auto used = heap_statistics.used_heap_size();
auto total = heap_statistics.total_available_size();
auto limit = std::min(static_cast<size_t>(0.9 * total), max_v8_heap_size_in_bytes);
size_t used = heap_statistics.used_heap_size();
size_t total = heap_statistics.heap_size_limit();
size_t limit = std::min(total, max_v8_heap_size_in_bytes);

if (used > limit)
throw Exception(
ErrorCodes::UDF_MEMORY_THRESHOLD_EXCEEDED,
"Current V8 heap size used={} bytes, total={} bytes, javascript_max_memory_bytes={}, exceed the limit={} bytes",
"Current V8 heap size used={} bytes, total={} bytes, javascript_max_memory_bytes={}, exceed the limit={} bytes, v8 heap "
"stat={{{}}}",
used,
total,
max_v8_heap_size_in_bytes,
limit);
limit,
V8::getHeapStatisticsString(heap_statistics));
}
}
}
2 changes: 2 additions & 0 deletions src/V8/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,7 @@ void validateStatelessFunctionSource(const std::string & func_name, const std::s

/// Check v8 heap size and throw exception if exceeds limit
void checkHeapLimit(v8::Isolate * isolate, size_t max_v8_heap_size_in_bytes);

std::string getHeapStatisticsString(v8::HeapStatistics & heap_statistics);
}
}
Loading
Loading