Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix merge tree dml #137

Merged
merged 2 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Core/ServerSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ namespace DB
M(UInt64, concurrent_threads_soft_limit_num, 0, "Sets how many concurrent thread can be allocated before applying CPU pressure. Zero means unlimited.", 0) \
M(UInt64, concurrent_threads_soft_limit_ratio_to_cores, 0, "Same as concurrent_threads_soft_limit_num, but with ratio to cores.", 0) \
\
M(UInt64, background_pool_size, 0, "The maximum number of threads what will be used for merging or mutating data parts for *MergeTree-engine tables in a background.", 0) \
M(UInt64, background_pool_size, 2, "The maximum number of threads what will be used for merging or mutating data parts for *MergeTree-engine tables in a background.", 0) \
M(Float, background_merges_mutations_concurrency_ratio, 2, "The number of part mutation tasks that can be executed concurrently by each thread in background pool.", 0) \
M(String, background_merges_mutations_scheduling_policy, "round_robin", "The policy on how to perform a scheduling for background merges and mutations. Possible values are: `round_robin` and `shortest_task_first`. ", 0) \
M(UInt64, background_move_pool_size, 0, "The maximum number of threads that will be used for moving data parts to another disk or volume for *MergeTree-engine tables in a background.", 0) \
M(UInt64, background_fetches_pool_size, 0, "The maximum number of threads that will be used for fetching data parts from another replica for *MergeTree-engine tables in a background.", 0) \
M(UInt64, background_common_pool_size, 0, "The maximum number of threads that will be used for performing a variety of operations (mostly garbage collection) for *MergeTree-engine tables in a background.", 0) \
M(UInt64, background_buffer_flush_schedule_pool_size, 0, "The maximum number of threads that will be used for performing flush operations for Buffer-engine tables in a background.", 0) \
M(UInt64, background_schedule_pool_size, 0, "The maximum number of threads that will be used for constantly executing some lightweight periodic operations.", 0) \
M(UInt64, background_move_pool_size, 1, "The maximum number of threads that will be used for moving data parts to another disk or volume for *MergeTree-engine tables in a background.", 0) \
M(UInt64, background_fetches_pool_size, 1, "The maximum number of threads that will be used for fetching data parts from another replica for *MergeTree-engine tables in a background.", 0) \
M(UInt64, background_common_pool_size, 1, "The maximum number of threads that will be used for performing a variety of operations (mostly garbage collection) for *MergeTree-engine tables in a background.", 0) \
M(UInt64, background_buffer_flush_schedule_pool_size, 2, "The maximum number of threads that will be used for performing flush operations for Buffer-engine tables in a background.", 0) \
M(UInt64, background_schedule_pool_size, 1, "The maximum number of threads that will be used for constantly executing some lightweight periodic operations.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 0, "The maximum number of threads that will be used for executing background operations for message streaming.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 0, "The maximum number of threads that will be used for executing distributed sends.", 0) \
M(Bool, display_secrets_in_show_and_select, false, "Allow showing secrets in SHOW and SELECT queries via a format setting and a grant", 0) \
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ class IColumn;
M(Bool, optimize_count_from_files, true, "Optimize counting rows from files in supported input formats", 0) \
M(Bool, use_cache_for_count_from_files, true, "Use cache to count the number of rows in files", 0) \
M(Bool, optimize_respect_aliases, true, "If it is set to true, it will respect aliases in WHERE/GROUP BY/ORDER BY, that will help with partition pruning/secondary indexes/optimize_aggregation_in_order/optimize_read_in_order/optimize_trivial_count", 0) \
M(UInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \
M(UInt64, mutations_sync, 1, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \
M(Bool, enable_lightweight_delete, true, "Enable lightweight DELETE mutations for mergetree tables.", 0) ALIAS(allow_experimental_lightweight_delete) \
M(Bool, apply_deleted_mask, true, "Enables filtering out rows deleted with lightweight DELETE. If disabled, a query will be able to read those rows. This is useful for debugging and \"undelete\" scenarios", 0) \
M(Bool, optimize_move_functions_out_of_any, false, "Move functions out of aggregate functions 'any', 'anyLast'.", 0) \
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ struct SharedPtrContextHolder
{
~SharedPtrContextHolder();
SharedPtrContextHolder();
SharedPtrContextHolder(ContextSharedPart *);
explicit SharedPtrContextHolder(ContextSharedPart *);
SharedPtrContextHolder(SharedPtrContextHolder &&) noexcept;

SharedPtrContextHolder & operator=(SharedPtrContextHolder &&) noexcept;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ MergeTreeBackgroundExecutor<Queue>::MergeTreeBackgroundExecutor(
active.set_capacity(max_tasks_count);

pool->setMaxThreads(std::max(1UL, threads_count));
pool->setMaxFreeThreads(std::max(1UL, threads_count));
pool->setMaxFreeThreads(0);
pool->setQueueSize(std::max(1UL, threads_count));

for (size_t number = 0; number < threads_count; ++number)
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct Settings;
M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 0, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(UInt64, max_number_of_mutations_for_replica, 0, "Limit the number of part mutations per replica to the specified amount. Zero means no limit on the number of mutations per replica (the execution can still be constrained by other settings).", 0) \
M(UInt64, max_number_of_merges_with_ttl_in_pool, 2, "When there is more than specified number of merges with TTL entries in pool, do not assign new merge with TTL. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(Seconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \
M(Seconds, old_parts_lifetime, 0, "How many seconds to keep obsolete parts.", 0) \
M(Seconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories. You should not lower this value because merges and mutations may not be able to work with low value of this setting.", 0) \
M(Seconds, lock_acquire_timeout_for_background_operations, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "For background operations like merges, mutations etc. How many seconds before failing to acquire table locks.", 0) \
M(UInt64, min_rows_to_fsync_after_merge, 0, "Minimal number of rows to do fsync for part after merge (0 - disabled)", 0) \
Expand Down
98 changes: 98 additions & 0 deletions tests/test_issue104.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!python3

import os
import time
import shutil
import unittest
import psutil
from chdb import session as chs

tmp_dir = ".state_tmp_auxten_issue104"


class TestIssue104(unittest.TestCase):
def setUp(self) -> None:
# shutil.rmtree(tmp_dir, ignore_errors=True)
return super().setUp()

def tearDown(self):
# shutil.rmtree(tmp_dir, ignore_errors=True)
return super().tearDown()

def test_issue104(self):
sess = chs.Session(tmp_dir)

sess.query("CREATE DATABASE IF NOT EXISTS test_db ENGINE = Atomic;")
# sess.query("CREATE DATABASE IF NOT EXISTS test_db ENGINE = Atomic;", "Debug")
sess.query("CREATE TABLE IF NOT EXISTS test_db.test_table (x String, y String) ENGINE = MergeTree ORDER BY tuple()")
sess.query("INSERT INTO test_db.test_table (x, y) VALUES ('A', 'B'), ('C', 'D');")

# show final thread count
print("Final thread count:", len(psutil.Process().threads()))

print("Original values:")
ret = sess.query("SELECT * FROM test_db.test_table", "Debug")
print(ret)
# self.assertEqual(str(ret), '"A","B"\n"C","D"\n')

# show final thread count
print("Final thread count:", len(psutil.Process().threads()))

print('Values after ALTER UPDATE in same query expected:')
ret = sess.query(
"ALTER TABLE test_db.test_table UPDATE y = 'updated1' WHERE x = 'A';"
"SELECT * FROM test_db.test_table WHERE x = 'A';")
print(ret)
self.assertEqual(str(ret), '"A","updated1"\n')

# show final thread count
print("Final thread count:", len(psutil.Process().threads()))

# print("Values after UPDATE in same query (expected 'A', 'updated'):")
# ret = sess.query(
# "UPDATE test_db.test_table SET y = 'updated2' WHERE x = 'A';"
# "SELECT * FROM test_db.test_table WHERE x = 'A';")
# print(ret)
# self.assertEqual(str(ret), '"A","updated2"\n')

print('Values after UPDATE expected:')
sess.query("ALTER TABLE test_db.test_table UPDATE y = 'updated2' WHERE x = 'A';"
"ALTER TABLE test_db.test_table UPDATE y = 'updated3' WHERE x = 'A'")
ret = sess.query("SELECT * FROM test_db.test_table WHERE x = 'A'")
print(ret)
self.assertEqual(str(ret), '"A","updated3"\n')

# show final thread count
print("Final thread count:", len(psutil.Process().threads()))

print("Values after DELETE expected:")
sess.query("ALTER TABLE test_db.test_table DELETE WHERE x = 'A'")
ret = sess.query("SELECT * FROM test_db.test_table")
print(ret)
self.assertEqual(str(ret), '"C","D"\n')

# show final thread count
print("Final thread count:", len(psutil.Process().threads()))

print("Values after ALTER then OPTIMIZE expected:")
sess.query("ALTER TABLE test_db.test_table DELETE WHERE x = 'C'; OPTIMIZE TABLE test_db.test_table FINAL")
ret = sess.query("SELECT * FROM test_db.test_table")
print(ret)
self.assertEqual(str(ret), "")

print("Inserting 1000 rows")
sess.query("INSERT INTO test_db.test_table (x, y) SELECT toString(number), toString(number) FROM numbers(1000);")
ret = sess.query("SELECT count() FROM test_db.test_table", "Debug")
count = str(ret).count("\n")
print("Number of newline characters:", count)

# show final thread count
print("Final thread count:", len(psutil.Process().threads()))

time.sleep(3)
print("Final thread count after 3s:", len(psutil.Process().threads()))
self.assertEqual(len(psutil.Process().threads()), 1)


if __name__ == "__main__":
unittest.main()
Loading