Commit c33eb68

resolve conflict
1 parent b4fd3d9 commit c33eb68

8 files changed: +276 −6 lines changed

be/src/cloud/cloud_cumulative_compaction.cpp

Lines changed: 1 addition & 1 deletion
@@ -375,7 +375,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
     if (_input_rowsets.size() == 1) {
         DCHECK_EQ(_output_rowset->version(), _input_rowsets[0]->version());
         // MUST NOT move input rowset to stale path
-        cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock);
+        cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock, true);
     } else {
         cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
         cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);

be/src/cloud/cloud_schema_change_job.cpp

Lines changed: 35 additions & 4 deletions
@@ -32,6 +32,7 @@
 #include "olap/delete_handler.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/storage_engine.h"
@@ -217,6 +218,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
 
     SchemaChangeParams sc_params;
 
+    // cache schema change output to file cache
+    std::vector<RowsetSharedPtr> rowsets;
+    rowsets.resize(rs_splits.size());
+    std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(),
+                   [](RowSetSplits& split) { return split.rs_reader->rowset(); });
+    sc_params.output_to_file_cache = CloudSchemaChangeJob::_should_cache_sc_output(rowsets);
+
     RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
     sc_params.ref_rowset_readers.reserve(rs_splits.size());
     for (RowSetSplits& split : rs_splits) {
@@ -309,6 +317,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
     context.tablet_schema = _new_tablet->tablet_schema();
     context.newest_write_timestamp = rs_reader->newest_write_timestamp();
     context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id);
+    context.write_file_cache = sc_params.output_to_file_cache;
     if (!context.storage_resource) {
         return Status::InternalError("vault id not found, maybe not sync, vault id {}",
                                      sc_params.vault_id);
@@ -467,7 +476,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
     // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
     std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
     std::unique_lock wlock(_new_tablet->get_header_lock());
-    _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock);
+    _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
     _new_tablet->set_cumulative_layer_point(_output_cumulative_point);
     _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
                                          stats.num_rows(), stats.data_size());
@@ -503,7 +512,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
             std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
-        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
         // Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version
         tmp_tablet->set_alter_version(alter_version);
     }
@@ -521,7 +530,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
                     DBUG_BLOCK);
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
-        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
     }
     for (auto rowset : ret.rowsets) {
         RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -544,7 +553,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
                                       {max_version + 1, new_max_version}, CaptureRowsetOps {}));
     {
         std::unique_lock wlock(tmp_tablet->get_header_lock());
-        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
     }
     for (auto rowset : ret.rowsets) {
         RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -595,4 +604,26 @@ void CloudSchemaChangeJob::clean_up_on_failure() {
 }
 }
 
+bool CloudSchemaChangeJob::_should_cache_sc_output(
+        const std::vector<RowsetSharedPtr>& input_rowsets) {
+    int64_t total_size = 0;
+    int64_t cached_index_size = 0;
+    int64_t cached_data_size = 0;
+
+    for (const auto& rs : input_rowsets) {
+        const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta();
+        total_size += rs_meta->total_disk_size();
+        cached_index_size += rs->approximate_cache_index_size();
+        cached_data_size += rs->approximate_cached_data_size();
+    }
+
+    double input_hit_rate = static_cast<double>(cached_index_size + cached_data_size) / total_size;
+
+    if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) {
+        return true;
+    }
+
+    return false;
+}
+
 } // namespace doris
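
For intuition on the new heuristic: _should_cache_sc_output keeps the schema-change output in the file cache only when the already-cached share of the input rowsets exceeds config::file_cache_keep_schema_change_output_min_hit_ratio (0.7 by default, per the config.cpp change below). A minimal standalone sketch of the same ratio check, with made-up sizes and a hypothetical RowsetSizes struct standing in for the Rowset/RowsetMeta accessors:

// Sketch only (not Doris code): same decision as _should_cache_sc_output,
// with sizes supplied directly instead of read from Rowset/RowsetMeta.
#include <cstdint>
#include <iostream>
#include <vector>

struct RowsetSizes {               // hypothetical stand-in for a rowset's
    int64_t total_disk_size;       //   rowset_meta()->total_disk_size()
    int64_t cached_index_size;     //   approximate_cache_index_size()
    int64_t cached_data_size;      //   approximate_cached_data_size()
};

bool should_cache_sc_output(const std::vector<RowsetSizes>& rowsets, double min_hit_ratio) {
    int64_t total = 0;
    int64_t cached = 0;
    for (const auto& rs : rowsets) {
        total += rs.total_disk_size;
        cached += rs.cached_index_size + rs.cached_data_size;
    }
    // Guard against empty input here; the guard is part of this sketch only.
    return total > 0 && static_cast<double>(cached) / total > min_hit_ratio;
}

int main() {
    // Three 100 MB input rowsets, two fully cached and one cold:
    // hit rate = 200 / 300 ≈ 0.67, below the 0.7 default, so the
    // schema-change output would not be written to the file cache.
    std::vector<RowsetSizes> inputs = {{100 << 20, 10 << 20, 90 << 20},
                                       {100 << 20, 10 << 20, 90 << 20},
                                       {100 << 20, 0, 0}};
    std::cout << std::boolalpha << should_cache_sc_output(inputs, 0.7) << '\n';  // false
}

The rationale is that if most of the input was already hot in the cache, the converted data is likely to be read again soon and is worth keeping; the regression test at the bottom of this commit exercises both sides of the threshold (0.65 vs 0.75).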

be/src/cloud/cloud_schema_change_job.h

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ class CloudSchemaChangeJob {
     void clean_up_on_failure();
 
 private:
+    bool static _should_cache_sc_output(const std::vector<RowsetSharedPtr>& input_rowsets);
+
     Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
                                        cloud::TabletJobInfoPB& job);

be/src/cloud/cloud_tablet.cpp

Lines changed: 1 addition & 1 deletion
@@ -384,7 +384,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
 
     auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
         for (auto& rs : rowsets) {
-            if (version_overlap || warmup_delta_data) {
+            if (warmup_delta_data) {
 #ifndef BE_TEST
                 bool warm_up_state_updated = false;
                 // Warmup rowset data in background
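
Net effect of this hunk, combined with the new trailing argument at the add_rowsets call sites above: whether a rowset is warmed up into the file cache in the background is now decided solely by the explicit warmup_delta_data flag rather than inferred from version_overlap. A tiny sketch of just that boolean change (names mirror the diff; this is not the real CloudTablet code):

#include <iostream>

// Before: warm-up also triggered whenever rowsets were added with version overlap.
bool should_warm_up_before(bool version_overlap, bool warmup_delta_data) {
    return version_overlap || warmup_delta_data;
}

// After: only an explicit warm-up request triggers it.
bool should_warm_up_after(bool /*version_overlap*/, bool warmup_delta_data) {
    return warmup_delta_data;
}

int main() {
    // Schema-change call sites now pass warmup_delta_data = false while still
    // adding with version_overlap = true, so their output is no longer warmed
    // up unconditionally; cumulative compaction's single-rowset path passes true.
    std::cout << std::boolalpha
              << should_warm_up_before(/*version_overlap=*/true, /*warmup_delta_data=*/false) << ' '   // true
              << should_warm_up_after(/*version_overlap=*/true, /*warmup_delta_data=*/false) << '\n';  // false
}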

be/src/common/config.cpp

Lines changed: 1 addition & 0 deletions
@@ -1154,6 +1154,7 @@ DEFINE_mBool(enable_file_cache_adaptive_write, "true");
 DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
 // if difference below this threshold, we consider cache's progressive upgrading (2.0->3.0) successful
 DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
+DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");
 
 DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");

be/src/common/config.h

Lines changed: 1 addition & 0 deletions
@@ -1176,6 +1176,7 @@ DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
 DECLARE_mBool(enable_file_cache_adaptive_write);
 DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
 DECLARE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold);
+DECLARE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio);
 DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);

be/src/olap/schema_change.h

Lines changed: 1 addition & 0 deletions
@@ -280,6 +280,7 @@ struct SchemaChangeParams {
     ObjectPool pool;
     int32_t be_exec_version;
     std::string vault_id;
+    bool output_to_file_cache;
 };
 
 class SchemaChangeJob {
Lines changed: 234 additions & 0 deletions
@@ -0,0 +1,234 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.doris.regression.suite.ClusterOptions
import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.OutputUtils

@groovy.transform.Immutable
class RowsetInfo {
    int startVersion
    int endVersion
    String id
    String originalString
}

suite("test_filecache_with_alter_table", "docker") {
    def options = new ClusterOptions()
    options.cloudMode = true
    options.setFeNum(1)
    options.setBeNum(1)

    options.beConfigs.add('enable_flush_file_cache_async=false')
    options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
    options.beConfigs.add('enable_evict_file_cache_in_advance=false')
    options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')

    def baseTestTable = "test_filecache_with_alter_table"
    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    def backendId_to_backendBrpcPort = [:]
    def csvPathPrefix = "/tmp/temp_csv_data"
    def loadBatchNum = 20

    def generateCsvData = {
        def rowsPerFile = 32768
        def columnsPerRow = 4
        def headers = 'col1,col2,col3,col4'

        def dir = new File(csvPathPrefix)
        if (!dir.exists()) {
            dir.mkdirs()
        } else {
            dir.eachFile { it.delete() }
        }

        long currentNumber = 1L
        (1..loadBatchNum).each { fileIndex ->
            def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
            def csvFile = new File(fileName)

            csvFile.withWriter('UTF-8') { writer ->
                writer.writeLine(headers)
                (1..rowsPerFile).each { rowIndex ->
                    def row = (1..columnsPerRow).collect { currentNumber++ }
                    writer.writeLine(row.join(','))
                }
            }
        }
        logger.info("Successfully generated ${loadBatchNum} CSV files in ${csvPathPrefix}")
    }

    def getTabletStatus = { tablet ->
        String tabletId = tablet.TabletId
        String backendId = tablet.BackendId
        def beHost = backendId_to_backendIP[backendId]
        def beHttpPort = backendId_to_backendHttpPort[backendId]

        String command = "curl -s -X GET http://${beHost}:${beHttpPort}/api/compaction/show?tablet_id=${tabletId}"

        logger.info("Executing: ${command}")
        def process = command.execute()
        def exitCode = process.waitFor()
        def output = process.getText()

        logger.info("Get tablet status response: code=${exitCode}, out=${output}")
        assertEquals(0, exitCode, "Failed to get tablet status.")

        return parseJson(output.trim())
    }

    def waitForAlterJobToFinish = { tableName, timeoutMillis ->
        def pollInterval = 1000
        def timeElapsed = 0
        while (timeElapsed <= timeoutMillis) {
            def alterResult = sql_return_maparray """SHOW ALTER TABLE COLUMN WHERE TableName = "${tableName}" ORDER BY CreateTime DESC LIMIT 1;"""
            logger.info("Checking ALTER status for table '${tableName}': ${alterResult}")
            if (alterResult && alterResult[0].State == "FINISHED") {
                sleep(3000)
                logger.info("ALTER job on table '${tableName}' finished. Details: ${alterResult[0]}")
                return
            }
            sleep(pollInterval)
            timeElapsed += pollInterval
        }
        fail("Wait for ALTER job on table '${tableName}' to finish timed out after ${timeoutMillis}ms.")
    }

    def runSchemaChangeCacheTest = { String testTable, double inputCacheRatio, boolean expectOutputCached ->
        logger.info("==================================================================================")
        logger.info("Running Test Case on Table '${testTable}': Input Cache Ratio = ${inputCacheRatio * 100}%, Expect Output Cached = ${expectOutputCached}")
        logger.info("==================================================================================")

        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
        sql """
            CREATE TABLE IF NOT EXISTS ${testTable} (
                col1 bigint,
                col2 bigint,
                col3 bigint,
                col4 bigint
            )
            UNIQUE KEY(col1)
            DISTRIBUTED BY HASH(col1) BUCKETS 1
            PROPERTIES (
                "replication_num" = "1",
                "disable_auto_compaction" = "true"
            )
        """

        (1..loadBatchNum).each { fileIndex ->
            def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
            streamLoad {
                logger.info("Stream loading file index ${fileIndex} into table ${testTable}")
                set "column_separator", ","
                table testTable
                file fileName
                time 3000
                check { res, exception, startTime, endTime ->
                    if (exception != null) throw exception
                    def json = parseJson(res)
                    assertEquals("success", json.Status.toLowerCase())
                }
            }
        }
        sql """ SELECT COUNT(col1) from ${testTable} """

        def tablets = sql_return_maparray "show tablets from ${testTable};"
        assertEquals(1, tablets.size(), "Expected to find exactly one tablet.")
        def tablet = tablets[0]
        def beHost = backendId_to_backendIP[tablet.BackendId]
        def beHttpPort = backendId_to_backendHttpPort[tablet.BackendId]

        def tabletStatus = getTabletStatus(tablet)
        List<RowsetInfo> originalRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
            def parts = rowsetStr.split(" ")
            def versionParts = parts[0].replace('[', '').replace(']', '').split("-")
            new RowsetInfo(
                startVersion: versionParts[0].toInteger(),
                endVersion: versionParts[1].toInteger(),
                id: parts[4],
                originalString: rowsetStr
            )
        }.findAll { it.startVersion != 0 }.sort { it.startVersion }

        int numToClear = Math.round(originalRowsetInfos.size() * (1 - inputCacheRatio)).toInteger()
        logger.info("Total data rowsets: ${originalRowsetInfos.size()}. Clearing cache for ${numToClear} rowsets to achieve ~${inputCacheRatio * 100}% hit ratio.")

        originalRowsetInfos.take(numToClear).each { rowset ->
            Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=clear&sync=true&value=${rowset.id}_0.dat", true)
        }

        def cachedInputRowsets = originalRowsetInfos.findAll { rowset ->
            def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
            data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
        }

        def actualCachedRatio = cachedInputRowsets.size() / (double) originalRowsetInfos.size()
        logger.info("Verification: Cached input rowsets: ${cachedInputRowsets.size()}. Actual cache ratio: ${actualCachedRatio * 100}%")
        assertTrue(Math.abs(inputCacheRatio - actualCachedRatio) < 0.01, "Actual cache ratio does not match expected ratio.")

        logger.info("Triggering ALTER TABLE on ${testTable}")
        sql """ALTER TABLE ${testTable} MODIFY COLUMN col2 VARCHAR(255)"""
        waitForAlterJobToFinish(testTable, 60000)

        tablets = sql_return_maparray "show tablets from ${testTable};"
        tablet = tablets[0]
        tabletStatus = getTabletStatus(tablet)

        def newRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
            def parts = rowsetStr.split(" ")
            def version_pair = parts[0].replace('[', '').replace(']', '').split('-')
            new RowsetInfo(
                startVersion: version_pair[0].toInteger(),
                endVersion: version_pair[1].toInteger(),
                id: parts[4],
                originalString: rowsetStr
            )
        }.findAll { it.startVersion != 0 }.sort { it.startVersion }

        def cachedOutputRowsets = newRowsetInfos.findAll { rowset ->
            def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
            data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
        }

        logger.info("After ALTER, found ${cachedOutputRowsets.size()} cached output rowsets out of ${newRowsetInfos.size()}.")

        if (expectOutputCached) {
            assertTrue(cachedOutputRowsets.size() > 0, "Expected output rowsets to be cached, but none were found.")
        } else {
            assertEquals(0, cachedOutputRowsets.size(), "Expected output rowsets NOT to be cached, but some were found.")
        }
        logger.info("Test Case Passed: Input Ratio ${inputCacheRatio * 100}%, Output Cached Check: ${expectOutputCached}")

        sql """ DROP TABLE IF EXISTS ${testTable} force;"""
    }

    docker(options) {
        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);

        sql """ set global enable_auto_analyze = false;"""

        generateCsvData()

        runSchemaChangeCacheTest("${baseTestTable}_0", 0.0, false)
        runSchemaChangeCacheTest("${baseTestTable}_65", 0.65, false)
        runSchemaChangeCacheTest("${baseTestTable}_75", 0.75, true)
        runSchemaChangeCacheTest("${baseTestTable}_100", 1.0, true)
    }
}
