@@ -33,18 +33,18 @@ use snafu::{OptionExt, ResultExt};
33
33
use store_api:: metadata:: RegionMetadata ;
34
34
use store_api:: storage:: { ColumnId , RegionId } ;
35
35
36
- use super :: INDEX_BLOB_TYPE ;
37
36
use crate :: cache:: file_cache:: { FileCacheRef , FileType , IndexKey } ;
38
37
use crate :: cache:: index:: bloom_filter_index:: {
39
38
BloomFilterIndexCacheRef , CachedBloomFilterIndexBlobReader ,
40
39
} ;
41
40
use crate :: error:: {
42
- ApplyBloomFilterIndexSnafu , ColumnNotFoundSnafu , ConvertValueSnafu , MetadataSnafu ,
41
+ ApplyBloomFilterIndexSnafu , ColumnNotFoundSnafu , ConvertValueSnafu , Error , MetadataSnafu ,
43
42
PuffinBuildReaderSnafu , PuffinReadBlobSnafu , Result ,
44
43
} ;
45
44
use crate :: metrics:: INDEX_APPLY_ELAPSED ;
46
45
use crate :: row_converter:: SortField ;
47
46
use crate :: sst:: file:: FileId ;
47
+ use crate :: sst:: index:: bloom_filter:: INDEX_BLOB_TYPE ;
48
48
use crate :: sst:: index:: codec:: IndexValueCodec ;
49
49
use crate :: sst:: index:: puffin_manager:: { BlobReader , PuffinManagerFactory } ;
50
50
use crate :: sst:: index:: TYPE_BLOOM_FILTER_INDEX ;
@@ -118,28 +118,21 @@ impl BloomFilterIndexApplier {
118
118
. start_timer ( ) ;
119
119
120
120
for ( column_id, predicates) in & self . filters {
121
- let mut blob = match self . cached_blob_reader ( file_id, * column_id) . await {
122
- Ok ( Some ( puffin_reader) ) => puffin_reader,
123
- other => {
124
- if let Err ( err) = other {
125
- warn ! ( err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file." )
126
- }
127
- self . remote_blob_reader ( file_id, * column_id, file_size_hint)
128
- . await ?
129
- }
121
+ let mut blob = match self
122
+ . blob_reader ( file_id, * column_id, file_size_hint)
123
+ . await ?
124
+ {
125
+ Some ( blob) => blob,
126
+ None => continue ,
130
127
} ;
131
128
132
129
// Create appropriate reader based on whether we have caching enabled
133
130
if let Some ( bloom_filter_cache) = & self . bloom_filter_index_cache {
134
- let file_size = if let Some ( file_size) = file_size_hint {
135
- file_size
136
- } else {
137
- blob. metadata ( ) . await . context ( MetadataSnafu ) ?. content_length
138
- } ;
131
+ let blob_size = blob. metadata ( ) . await . context ( MetadataSnafu ) ?. content_length ;
139
132
let reader = CachedBloomFilterIndexBlobReader :: new (
140
133
file_id,
141
134
* column_id,
142
- file_size ,
135
+ blob_size ,
143
136
BloomFilterReaderImpl :: new ( blob) ,
144
137
bloom_filter_cache. clone ( ) ,
145
138
) ;
@@ -157,6 +150,43 @@ impl BloomFilterIndexApplier {
157
150
Ok ( ( ) )
158
151
}
159
152
153
+ /// Creates a blob reader from the cached or remote index file.
154
+ ///
155
+ /// Returus `None` if the column does not have an index.
156
+ async fn blob_reader (
157
+ & self ,
158
+ file_id : FileId ,
159
+ column_id : ColumnId ,
160
+ file_size_hint : Option < u64 > ,
161
+ ) -> Result < Option < BlobReader > > {
162
+ let reader = match self . cached_blob_reader ( file_id, column_id) . await {
163
+ Ok ( Some ( puffin_reader) ) => puffin_reader,
164
+ other => {
165
+ if let Err ( err) = other {
166
+ // Blob not found means no index for this column
167
+ if is_blob_not_found ( & err) {
168
+ return Ok ( None ) ;
169
+ }
170
+ warn ! ( err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file." )
171
+ }
172
+ let res = self
173
+ . remote_blob_reader ( file_id, column_id, file_size_hint)
174
+ . await ;
175
+ if let Err ( err) = res {
176
+ // Blob not found means no index for this column
177
+ if is_blob_not_found ( & err) {
178
+ return Ok ( None ) ;
179
+ }
180
+ return Err ( err) ;
181
+ }
182
+
183
+ res?
184
+ }
185
+ } ;
186
+
187
+ Ok ( Some ( reader) )
188
+ }
189
+
160
190
/// Creates a blob reader from the cached index file
161
191
async fn cached_blob_reader (
162
192
& self ,
@@ -242,6 +272,16 @@ impl BloomFilterIndexApplier {
242
272
}
243
273
}
244
274
275
+ fn is_blob_not_found ( err : & Error ) -> bool {
276
+ matches ! (
277
+ err,
278
+ Error :: PuffinBuildReader {
279
+ source: puffin:: error:: Error :: BlobNotFound { .. } ,
280
+ ..
281
+ }
282
+ )
283
+ }
284
+
245
285
pub struct BloomFilterIndexApplierBuilder < ' a > {
246
286
region_dir : String ,
247
287
object_store : ObjectStore ,
0 commit comments