diff --git a/base/base/getMemoryAmount.cpp b/base/base/getMemoryAmount.cpp index a46e964c5a3..87cd537f663 100644 --- a/base/base/getMemoryAmount.cpp +++ b/base/base/getMemoryAmount.cpp @@ -56,7 +56,6 @@ uint64_t getMemoryAmountOrZero() } - uint64_t getMemoryAmount() { auto res = getMemoryAmountOrZero(); @@ -64,3 +63,9 @@ uint64_t getMemoryAmount() throw std::runtime_error("Cannot determine memory amount"); return res; } + +uint64_t getMemoryAmountOrZeroCached() +{ + static auto res = getMemoryAmountOrZero(); + return res; +} diff --git a/base/base/getMemoryAmount.h b/base/base/getMemoryAmount.h index 7ebd92a8bcf..af1b5718efc 100644 --- a/base/base/getMemoryAmount.h +++ b/base/base/getMemoryAmount.h @@ -10,3 +10,5 @@ uint64_t getMemoryAmountOrZero(); /** Throws exception if it cannot determine the size of physical memory. */ uint64_t getMemoryAmount(); + +uint64_t getMemoryAmountOrZeroCached(); diff --git a/programs/server/config.yaml b/programs/server/config.yaml index ab9cf8bc33a..eb9a141c81b 100644 --- a/programs/server/config.yaml +++ b/programs/server/config.yaml @@ -980,7 +980,7 @@ settings: part_commit_pool_size: 8 # Total shared thread pool size for building and committing parts for Stream max_idempotent_ids: 1000 # Maximum idempotent IDs to keep in memory and on disk for idempotent data ingestion _tp_internal_system_open_sesame: true # Control the access to system.* streams - javascript_max_memory_bytes: 104857600 #Maximum heap size of javascript UDA/UDF in bytes, default is 100*1024*1024 bytes + javascript_max_memory_bytes: 204857600 #Maximum heap size of javascript UDA/UDF in bytes, default is 200 MByte recovery_policy: "strict" # Recovery policy for materialized view. strict or best_effort recovery_retry_for_sn_failure: 3 # retry times for sn failure. this value only apply if the `recovery_policy` is `best_effort` max_block_size: 65409 # 65536 - (PADDING_FOR_SIMD - 1) diff --git a/programs/server/embedded.xml b/programs/server/embedded.xml index c7a3b39e212..f45a64d0cb3 100644 --- a/programs/server/embedded.xml +++ b/programs/server/embedded.xml @@ -141,7 +141,7 @@ 8 1000 <_tp_internal_system_open_sesame>true - 104857600 + 204857600 1 diff --git a/src/AggregateFunctions/AggregateFunctionFactory.cpp b/src/AggregateFunctions/AggregateFunctionFactory.cpp index ee6e212fe54..b098c96958d 100644 --- a/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -76,6 +76,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get( const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties, + ContextPtr context, bool is_changelog_input) const /// proton: ends { @@ -97,8 +98,8 @@ AggregateFunctionPtr AggregateFunctionFactory::get( bool has_null_arguments = std::any_of(types_without_low_cardinality.begin(), types_without_low_cardinality.end(), [](const auto & type) { return type->onlyNull(); }); - AggregateFunctionPtr nested_function = getImpl( - name, nested_types, nested_parameters, out_properties, has_null_arguments, is_changelog_input); + AggregateFunctionPtr nested_function + = getImpl(name, nested_types, nested_parameters, out_properties, has_null_arguments, context, is_changelog_input); // Pure window functions are not real aggregate functions. Applying // combinators doesn't make sense for them, they must handle the @@ -109,7 +110,8 @@ AggregateFunctionPtr AggregateFunctionFactory::get( return combinator->transformAggregateFunction(nested_function, out_properties, types_without_low_cardinality, parameters); } - auto with_original_arguments = getImpl(name, types_without_low_cardinality, parameters, out_properties, false, is_changelog_input); + auto with_original_arguments + = getImpl(name, types_without_low_cardinality, parameters, out_properties, false, context, is_changelog_input); if (!with_original_arguments) throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR); @@ -123,6 +125,7 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl( const Array & parameters, AggregateFunctionProperties & out_properties, bool has_null_arguments, + ContextPtr context, bool is_changelog_input) const /// proton: ends { @@ -202,7 +205,8 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl( } /// proton: starts. Check user defined aggr function - auto aggr = UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties, is_changelog_input); + auto aggr + = UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties, context, is_changelog_input); if (aggr) return aggr; /// proton: ends @@ -225,12 +229,13 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet( const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties, + ContextPtr context, bool is_changelog_input) const /// proton: ends { return isAggregateFunctionName(name) /// proton: starts - ? get(name, argument_types, parameters, out_properties, is_changelog_input) + ? get(name, argument_types, parameters, out_properties, context, is_changelog_input) /// proton: ends : nullptr; } diff --git a/src/AggregateFunctions/AggregateFunctionFactory.h b/src/AggregateFunctions/AggregateFunctionFactory.h index cef128a0878..fc281e1e15a 100644 --- a/src/AggregateFunctions/AggregateFunctionFactory.h +++ b/src/AggregateFunctions/AggregateFunctionFactory.h @@ -69,6 +69,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties, + ContextPtr context = nullptr, bool is_changelog_input = false) const; /// Returns nullptr if not found. @@ -77,6 +78,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties, + ContextPtr context = nullptr, bool is_changelog_input = false) const; /// proton: ends @@ -97,6 +99,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto const Array & parameters, AggregateFunctionProperties & out_properties, bool has_null_arguments, + ContextPtr context, bool is_changelog_input = false) const; /// proton: ends diff --git a/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp b/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp index fd09a57f4a9..096b1cac55a 100644 --- a/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp +++ b/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp @@ -5,7 +5,9 @@ #include #include #include +#include #include + #include namespace DB @@ -23,10 +25,17 @@ JavaScriptBlueprint::JavaScriptBlueprint(const String & name, const String & sou { /// FIXME, create isolate from V8::V8 global isolates pool v8::Isolate::CreateParams isolate_params; + size_t max_heap_size_in_bytes = static_cast(getMemoryAmountOrZeroCached() * 0.6); + size_t max_old_gen_size_in_bytes = static_cast(getMemoryAmountOrZeroCached() * 0.6); + + isolate_params.constraints.ConfigureDefaultsFromHeapSize(0, max_heap_size_in_bytes); + isolate_params.constraints.set_max_old_generation_size_in_bytes(max_old_gen_size_in_bytes); + isolate_params.array_buffer_allocator_shared = std::shared_ptr(v8::ArrayBuffer::Allocator::NewDefaultAllocator()); isolate = std::unique_ptr(v8::Isolate::New(isolate_params), IsolateDeleter()); + auto * logger = &Poco::Logger::get("JavaScriptAggregateFunction"); /// Analyze if this UDA's definition to initialize the blueprint @@ -275,7 +284,9 @@ AggregateFunctionJavaScriptAdapter::AggregateFunctionJavaScriptAdapter( , is_changelog_input(is_changelog_input_) , max_v8_heap_size_in_bytes(max_v8_heap_size_in_bytes_) , blueprint(config->name, config->source) + , logger(&Poco::Logger::get("JavaScriptAggregateFunction")) { + LOG_INFO(logger, "udf name={}, javascript_max_memory_bytes={}", config->name, max_v8_heap_size_in_bytes); } String AggregateFunctionJavaScriptAdapter::getName() const diff --git a/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.h b/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.h index 86f0e493322..79f25ee8aa2 100644 --- a/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.h +++ b/src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.h @@ -91,6 +91,7 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper bool is_changelog_input = false; size_t max_v8_heap_size_in_bytes; JavaScriptBlueprint blueprint; + Poco::Logger * logger; public: AggregateFunctionJavaScriptAdapter( diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 8b1a5d1eda0..7040afeb853 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -845,7 +845,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, aysnc_ingest_max_outstanding_blocks, 10000, "Max outstanding blocks to be committed during async ingestion", 0) \ M(Bool, _tp_internal_system_open_sesame, true, "Control the access to system.* streams", 0) \ M(Bool, is_internal, false, "Control the statistics of select query", 0) \ - M(UInt64, javascript_max_memory_bytes, 100 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \ + M(UInt64, javascript_max_memory_bytes, 200 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \ M(Bool, enable_dependency_check, true, "Enable the dependency check of view/materialized view", 0) \ M(RecoveryPolicy, recovery_policy, RecoveryPolicy::Strict, "Default recovery policy for materialized view when inner query failed. 'strict': always recover from checkpointed; 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \ M(UInt64, recovery_retry_for_sn_failure, 3, "Default retry times for sn failure. only apply for 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \ diff --git a/src/Functions/UserDefined/JavaScriptUserDefinedFunction.cpp b/src/Functions/UserDefined/JavaScriptUserDefinedFunction.cpp index ea197fde3dc..01ca3d7b4d6 100644 --- a/src/Functions/UserDefined/JavaScriptUserDefinedFunction.cpp +++ b/src/Functions/UserDefined/JavaScriptUserDefinedFunction.cpp @@ -1,6 +1,9 @@ #include + #include #include +#include + #include namespace DB diff --git a/src/Functions/UserDefined/UserDefinedFunctionFactory.cpp b/src/Functions/UserDefined/UserDefinedFunctionFactory.cpp index 457946306bb..42f595e3070 100644 --- a/src/Functions/UserDefined/UserDefinedFunctionFactory.cpp +++ b/src/Functions/UserDefined/UserDefinedFunctionFactory.cpp @@ -32,6 +32,10 @@ extern const int UNKNOWN_FUNCTION; /// proton: ends } +UserDefinedFunctionFactory::UserDefinedFunctionFactory() : logger(&Poco::Logger::get("UserDefinedFunctionFactory")) +{ +} + UserDefinedFunctionFactory & UserDefinedFunctionFactory::instance() { static UserDefinedFunctionFactory result; @@ -68,6 +72,7 @@ AggregateFunctionPtr UserDefinedFunctionFactory::getAggregateFunction( const DataTypes & types, const Array & parameters, AggregateFunctionProperties & /*properties*/, + ContextPtr context, bool is_changelog_input) { const auto & loader = ExternalUserDefinedFunctionsLoader::instance(nullptr); @@ -107,18 +112,14 @@ AggregateFunctionPtr UserDefinedFunctionFactory::getAggregateFunction( size_t num_of_args = config->arguments.size(); validate_arguments(types.back()->getName() == "int8" ? num_of_args : num_of_args - 1); - ContextPtr query_context; - if (CurrentThread::isInitialized()) - query_context = CurrentThread::get().getQueryContext(); - - if (!query_context || !query_context->getSettingsRef().javascript_max_memory_bytes) + if (!context || !context->getSettingsRef().javascript_max_memory_bytes) { - LOG_ERROR(&Poco::Logger::get("UserDefinedFunctionFactory"), "query_context is invalid"); + LOG_ERROR(instance().getLogger(), "query_context is invalid"); return nullptr; } return std::make_shared( - config, types, parameters, is_changelog_input, query_context->getSettingsRef().javascript_max_memory_bytes); + config, types, parameters, is_changelog_input, context->getSettingsRef().javascript_max_memory_bytes); } return nullptr; @@ -160,7 +161,7 @@ FunctionOverloadResolverPtr UserDefinedFunctionFactory::tryGet(const String & fu /// proton: starts try { - return get(function_name,std::move(context)); + return get(function_name, std::move(context)); } catch (Exception &) { @@ -196,11 +197,7 @@ std::vector UserDefinedFunctionFactory::getRegisteredNames(ContextPtr co /// proton: starts bool UserDefinedFunctionFactory::registerFunction( - ContextPtr context, - const String & function_name, - Poco::JSON::Object::Ptr json_func, - bool throw_if_exists, - bool replace_if_exists) + ContextPtr context, const String & function_name, Poco::JSON::Object::Ptr json_func, bool throw_if_exists, bool replace_if_exists) { Streaming::validateUDFName(function_name); diff --git a/src/Functions/UserDefined/UserDefinedFunctionFactory.h b/src/Functions/UserDefined/UserDefinedFunctionFactory.h index a8d040feb93..31f5f95e0bb 100644 --- a/src/Functions/UserDefined/UserDefinedFunctionFactory.h +++ b/src/Functions/UserDefined/UserDefinedFunctionFactory.h @@ -31,6 +31,7 @@ class UserDefinedFunctionFactory const DataTypes & types, const Array & parameters, AggregateFunctionProperties & properties, + ContextPtr context, /// whether input of aggregation function is changelog, aggregate function does not pass _tp_delta column to UDA if it is false bool is_changelog_input = false); @@ -53,6 +54,12 @@ class UserDefinedFunctionFactory static bool has(const String & function_name, ContextPtr context); static std::vector getRegisteredNames(ContextPtr context); + + Poco::Logger * getLogger() const { return logger; } + +private: + UserDefinedFunctionFactory(); + Poco::Logger * logger; }; } diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index e125975bca1..c0953576b7d 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -281,9 +281,9 @@ AggregateFunctionPtr getAggregateFunction( /// Examples: Translate `quantile(x, 0.5)` to `quantile(0.5)(x)` tryTranslateToParametricAggregateFunction(node, types, parameters, argument_names, context); if (throw_if_empty) - return AggregateFunctionFactory::instance().get(node->name, types, parameters, properties, is_changelog_input); + return AggregateFunctionFactory::instance().get(node->name, types, parameters, properties, context, is_changelog_input); else - return AggregateFunctionFactory::instance().tryGet(node->name, types, parameters, properties, is_changelog_input); + return AggregateFunctionFactory::instance().tryGet(node->name, types, parameters, properties, context, is_changelog_input); } /// proton: ends. } diff --git a/src/V8/Utils.cpp b/src/V8/Utils.cpp index 100eef47e5a..eb5cdd9b687 100644 --- a/src/V8/Utils.cpp +++ b/src/V8/Utils.cpp @@ -340,8 +340,8 @@ void validateFunctionSource( std::function &, v8::TryCatch &, v8::Local &)> func) { /// FIXME, switch to global isolate allocation / pooling - UInt64 max_heap_size_in_bytes = 10 * 1024 * 1024; - UInt64 max_old_gen_size_in_bytes = 8 * 1024 * 1024; + UInt64 max_heap_size_in_bytes = static_cast(getMemoryAmountOrZeroCached() * 0.6); + UInt64 max_old_gen_size_in_bytes = static_cast(getMemoryAmountOrZeroCached() * 0.6); v8::Isolate::CreateParams isolate_params; isolate_params.array_buffer_allocator_shared @@ -431,26 +431,63 @@ void validateStatelessFunctionSource(const std::string & func_name, const std::s validateFunctionSource(func_name, source, validate_function); } +std::string getHeapStatisticsString(v8::HeapStatistics & heap_statistics) +{ + return fmt::format( + "total_heap_size: {}\t" + "total_heap_size_executable: {}\t" + "total_physical_size: {}\t" + "total_available_size: {}\t" + "used_heap_size: {}\t" + "heap_size_limit: {}\t" + "malloced_memory: {}\t" + "external_memory: {}\t" + "peak_malloced_memory: {}\t" + "does_zap_garbage: {}\t" + "number_of_native_contexts: {}\t" + "number_of_detached_contexts: {}\t" + "total_global_handles_size: {}\t" + "used_global_handles_size: {}", + heap_statistics.total_heap_size(), + heap_statistics.total_heap_size_executable(), + heap_statistics.total_physical_size(), + heap_statistics.total_available_size(), + heap_statistics.used_heap_size(), + heap_statistics.heap_size_limit(), + heap_statistics.malloced_memory(), + heap_statistics.external_memory(), + heap_statistics.peak_malloced_memory(), + heap_statistics.does_zap_garbage(), + heap_statistics.number_of_native_contexts(), + heap_statistics.number_of_detached_contexts(), + heap_statistics.total_global_handles_size(), + heap_statistics.used_global_handles_size()); +} + void checkHeapLimit(v8::Isolate * isolate, size_t max_v8_heap_size_in_bytes) { v8::Locker locker(isolate); v8::Isolate::Scope isolate_scope(isolate); v8::HandleScope handle_scope(isolate); v8::HandleScope scope(isolate); + /// TODO(qijun): add v8 heap stat metrics to system profile event capture v8::HeapStatistics heap_statistics; isolate->GetHeapStatistics(&heap_statistics); - auto used = heap_statistics.used_heap_size(); - auto total = heap_statistics.total_available_size(); - auto limit = std::min(static_cast(0.9 * total), max_v8_heap_size_in_bytes); + size_t used = heap_statistics.used_heap_size(); + size_t total = heap_statistics.heap_size_limit(); + size_t limit = std::min(total, max_v8_heap_size_in_bytes); + if (used > limit) throw Exception( ErrorCodes::UDF_MEMORY_THRESHOLD_EXCEEDED, - "Current V8 heap size used={} bytes, total={} bytes, javascript_max_memory_bytes={}, exceed the limit={} bytes", + "Current V8 heap size used={} bytes, total={} bytes, javascript_max_memory_bytes={}, exceed the limit={} bytes, v8 heap " + "stat={{{}}}", used, total, max_v8_heap_size_in_bytes, - limit); + limit, + V8::getHeapStatisticsString(heap_statistics)); } } } diff --git a/src/V8/Utils.h b/src/V8/Utils.h index 7a956735407..0407cf5dfff 100644 --- a/src/V8/Utils.h +++ b/src/V8/Utils.h @@ -72,5 +72,7 @@ void validateStatelessFunctionSource(const std::string & func_name, const std::s /// Check v8 heap size and throw exception if exceeds limit void checkHeapLimit(v8::Isolate * isolate, size_t max_v8_heap_size_in_bytes); + +std::string getHeapStatisticsString(v8::HeapStatistics & heap_statistics); } } diff --git a/src/V8/V8.cpp b/src/V8/V8.cpp index e661a194df5..32c024af9ac 100644 --- a/src/V8/V8.cpp +++ b/src/V8/V8.cpp @@ -1,5 +1,6 @@ #include #include +#include #include namespace DB @@ -87,7 +88,17 @@ v8::Isolate * V8::createIsolate() isolate_params.array_buffer_allocator = allocator.get(); if (v8_max_heap_bytes > 0) + { isolate_params.constraints.set_max_old_generation_size_in_bytes(v8_max_heap_bytes); + } + else + { + size_t max_heap_size_in_bytes = static_cast(getMemoryAmountOrZeroCached() * 0.6); + size_t max_old_gen_size_in_bytes = static_cast(getMemoryAmountOrZeroCached() * 0.6); + + isolate_params.constraints.ConfigureDefaultsFromHeapSize(0, max_heap_size_in_bytes); + isolate_params.constraints.set_max_old_generation_size_in_bytes(max_old_gen_size_in_bytes); + } auto * isolate = v8::Isolate::New(isolate_params); assert(isolate); diff --git a/tests/queries_ported/0_stateless/99010_javascript_max_memory_settings.reference b/tests/queries_ported/0_stateless/99010_javascript_max_memory_settings.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries_ported/0_stateless/99010_javascript_max_memory_settings.sql b/tests/queries_ported/0_stateless/99010_javascript_max_memory_settings.sql new file mode 100644 index 00000000000..e743238ed2c --- /dev/null +++ b/tests/queries_ported/0_stateless/99010_javascript_max_memory_settings.sql @@ -0,0 +1,65 @@ +CREATE STREAM IF NOT EXISTS 99010_udf_types(`f32` float); + +CREATE AGGREGATE FUNCTION test_sec_large_99010(value float32) RETURNS float32 LANGUAGE JAVASCRIPT AS $$ + { + initialize: function() { + this.max = -1.0; + this.sec = -1.0 + }, + process: function(values) { + for (let i = 0; i < values.length; i++) { + if (values[i] > this.max) { + this.sec = this.max; + this.max = values[i] + } + if (values[i] < this.max && values[i] > this.sec) + this.sec = values[i]; + } + }, + finalize: function() { + return this.sec + }, + serialize: function() { + let s = { + 'max': this.max, + 'sec': this.sec + }; + return JSON.stringify(s) + }, + deserialize: function(state_str) { + let s = JSON.parse(state_str); + this.max = s['max']; + this.sec = s['sec'] + }, + merge: function(state_str) { + let s = JSON.parse(state_str); + if (s['sec'] >= this.max) { + this.max = s['max']; + this.sec = s['sec'] + } else if (s['max'] >= this.max) { + this.sec = this.max; + this.max = s['max'] + } else if (s['max'] > this.sec) { + this.sec = s['max'] + } + } + } + $$; + +select sleep(1) FORMAT Null; +insert into 99010_udf_types(f32) values(2.0); +select sleep(1) FORMAT Null; +insert into 99010_udf_types(f32) values(2.0); +select sleep(1) FORMAT Null; +insert into 99010_udf_types(f32) values(2.0); +select sleep(1) FORMAT Null; + + +select test_sec_large_99010(f32) from table(99010_udf_types) settings javascript_max_memory_bytes=2; --- { serverError UDF_MEMORY_THRESHOLD_EXCEEDED } + +with acc as (select test_sec_large_99010(f32) as res from table(99010_udf_types) settings javascript_max_memory_bytes=2) select min(res) as re from acc; --- { serverError UDF_MEMORY_THRESHOLD_EXCEEDED } + + + +DROP STREAM 99010_udf_types; +DROP FUNCTION test_sec_large_99010;