Skip to content

Commit 6ca3ac2

Browse files
moschepgomulka
andauthored
Track raw ingest and storage size separately to support updates by doc (elastic#111179)
This PR starts tracking raw ingest and storage size separately for updates by document. This is done capturing the ingest size when initially parsing the update, and storage size when parsing the final, merged document. Additionally this renames DocumentSizeObserver to XContentParserDecorator / XContentMeteringParserDecorator for better reasoning about the code. More renaming will have to follow. --------- Co-authored-by: Przemyslaw Gomulka <[email protected]>
1 parent 65c57b3 commit 6ca3ac2

File tree

27 files changed

+180
-181
lines changed

27 files changed

+180
-181
lines changed
Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.bytes.BytesArray;
1515
import org.elasticsearch.common.bytes.BytesReference;
1616
import org.elasticsearch.index.mapper.MapperService;
17+
import org.elasticsearch.index.mapper.ParsedDocument;
1718
import org.elasticsearch.ingest.common.IngestCommonPlugin;
1819
import org.elasticsearch.plugins.IngestPlugin;
1920
import org.elasticsearch.plugins.Plugin;
@@ -32,7 +33,7 @@
3233
import static org.hamcrest.Matchers.equalTo;
3334

3435
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
35-
public class DocumentSizeObserverWithPipelinesIT extends ESIntegTestCase {
36+
public class XContentMeteringParserDecoratorWithPipelinesIT extends ESIntegTestCase {
3637

3738
private static String TEST_INDEX_NAME = "test-index-name";
3839
// the assertions are done in plugin which is static and will be created by ES server.
@@ -90,13 +91,13 @@ public DocumentParsingProvider getDocumentParsingProvider() {
9091
// returns a static instance, because we want to assert that the wrapping is called only once
9192
return new DocumentParsingProvider() {
9293
@Override
93-
public <T> DocumentSizeObserver newDocumentSizeObserver(DocWriteRequest<T> request) {
94+
public <T> XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest<T> request) {
9495
if (request instanceof IndexRequest indexRequest && indexRequest.getNormalisedBytesParsed() > 0) {
9596
long normalisedBytesParsed = indexRequest.getNormalisedBytesParsed();
9697
providedFixedSize.set(normalisedBytesParsed);
97-
return new TestDocumentSizeObserver(normalisedBytesParsed);
98+
return new TestXContentMeteringParserDecorator(normalisedBytesParsed);
9899
}
99-
return new TestDocumentSizeObserver(0L);
100+
return new TestXContentMeteringParserDecorator(0L);
100101
}
101102

102103
@Override
@@ -111,17 +112,15 @@ public DocumentSizeReporter newDocumentSizeReporter(
111112
}
112113
}
113114

114-
public static class TestDocumentSizeObserver implements DocumentSizeObserver {
115+
public static class TestXContentMeteringParserDecorator implements XContentMeteringParserDecorator {
115116
long mapCounter = 0;
116-
long wrapperCounter = 0;
117117

118-
public TestDocumentSizeObserver(long mapCounter) {
118+
public TestXContentMeteringParserDecorator(long mapCounter) {
119119
this.mapCounter = mapCounter;
120120
}
121121

122122
@Override
123-
public XContentParser wrapParser(XContentParser xContentParser) {
124-
wrapperCounter++;
123+
public XContentParser decorate(XContentParser xContentParser) {
125124
hasWrappedParser = true;
126125
return new FilterXContentParserWrapper(xContentParser) {
127126

@@ -134,10 +133,9 @@ public Map<String, Object> map() throws IOException {
134133
}
135134

136135
@Override
137-
public long normalisedBytesParsed() {
138-
return mapCounter;
136+
public ParsedDocument.DocumentSize meteredDocumentSize() {
137+
return new ParsedDocument.DocumentSize(mapCounter, 0);
139138
}
140-
141139
}
142140

143141
}

server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverIT.java renamed to server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import static org.hamcrest.Matchers.equalTo;
3535

3636
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
37-
public class DocumentSizeObserverIT extends ESIntegTestCase {
37+
public class XContentMeteringParserDecoratorIT extends ESIntegTestCase {
3838

3939
private static String TEST_INDEX_NAME = "test-index-name";
4040

@@ -125,8 +125,8 @@ public TestDocumentParsingProviderPlugin() {}
125125
public DocumentParsingProvider getDocumentParsingProvider() {
126126
return new DocumentParsingProvider() {
127127
@Override
128-
public <T> DocumentSizeObserver newDocumentSizeObserver(DocWriteRequest<T> request) {
129-
return new TestDocumentSizeObserver(0L);
128+
public <T> XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest<T> request) {
129+
return new TestXContentMeteringParserDecorator(0L);
130130
}
131131

132132
@Override
@@ -151,20 +151,23 @@ public TestDocumentSizeReporter(String indexName) {
151151

152152
@Override
153153
public void onIndexingCompleted(ParsedDocument parsedDocument) {
154-
COUNTER.addAndGet(parsedDocument.getDocumentSizeObserver().normalisedBytesParsed());
154+
long delta = parsedDocument.getNormalizedSize().ingestedBytes();
155+
if (delta > 0) {
156+
COUNTER.addAndGet(delta);
157+
}
155158
assertThat(indexName, equalTo(TEST_INDEX_NAME));
156159
}
157160
}
158161

159-
public static class TestDocumentSizeObserver implements DocumentSizeObserver {
162+
public static class TestXContentMeteringParserDecorator implements XContentMeteringParserDecorator {
160163
long counter = 0;
161164

162-
public TestDocumentSizeObserver(long counter) {
165+
public TestXContentMeteringParserDecorator(long counter) {
163166
this.counter = counter;
164167
}
165168

166169
@Override
167-
public XContentParser wrapParser(XContentParser xContentParser) {
170+
public XContentParser decorate(XContentParser xContentParser) {
168171
hasWrappedParser = true;
169172
return new FilterXContentParserWrapper(xContentParser) {
170173

@@ -177,9 +180,8 @@ public Token nextToken() throws IOException {
177180
}
178181

179182
@Override
180-
public long normalisedBytesParsed() {
181-
return counter;
183+
public ParsedDocument.DocumentSize meteredDocumentSize() {
184+
return new ParsedDocument.DocumentSize(counter, counter);
182185
}
183-
184186
}
185187
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ static TransportVersion def(int id) {
181181
public static final TransportVersion SEGMENT_LEVEL_FIELDS_STATS = def(8_711_00_0);
182182
public static final TransportVersion ML_ADD_DETECTION_RULE_PARAMS = def(8_712_00_0);
183183
public static final TransportVersion FIX_VECTOR_SIMILARITY_INNER_HITS = def(8_713_00_0);
184+
public static final TransportVersion INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN = def(8_714_00_0);
184185

185186
/*
186187
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.index.engine.Engine;
1919
import org.elasticsearch.index.shard.IndexShard;
2020
import org.elasticsearch.index.translog.Translog;
21-
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
2221

2322
import java.util.Arrays;
2423
import java.util.List;
@@ -63,7 +62,6 @@ enum ItemProcessingState {
6362
private BulkItemResponse executionResult;
6463
private int updateRetryCounter;
6564
private long noopMappingUpdateRetryForMappingVersion;
66-
private DocumentSizeObserver documentSizeObserver = DocumentSizeObserver.EMPTY_INSTANCE;
6765

6866
BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
6967
this.request = request;
@@ -369,12 +367,4 @@ private boolean assertInvariants(ItemProcessingState... expectedCurrentState) {
369367
}
370368
return true;
371369
}
372-
373-
public void setDocumentSizeObserver(DocumentSizeObserver documentSizeObserver) {
374-
this.documentSizeObserver = documentSizeObserver;
375-
}
376-
377-
public DocumentSizeObserver getDocumentSizeObserver() {
378-
return documentSizeObserver;
379-
}
380370
}

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
import org.elasticsearch.indices.SystemIndices;
6060
import org.elasticsearch.node.NodeClosedException;
6161
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
62-
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
62+
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
6363
import org.elasticsearch.threadpool.ThreadPool;
6464
import org.elasticsearch.transport.TransportRequestOptions;
6565
import org.elasticsearch.transport.TransportService;
@@ -364,16 +364,14 @@ static boolean executeBulkItemRequest(
364364
} else {
365365
final IndexRequest request = context.getRequestToExecute();
366366

367-
DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver(request);
368-
369-
context.setDocumentSizeObserver(documentSizeObserver);
367+
XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(request);
370368
final SourceToParse sourceToParse = new SourceToParse(
371369
request.id(),
372370
request.source(),
373371
request.getContentType(),
374372
request.routing(),
375373
request.getDynamicTemplates(),
376-
documentSizeObserver
374+
meteringParserDecorator
377375
);
378376
result = primary.applyIndexOperationOnPrimary(
379377
version,

server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.elasticsearch.indices.SystemIndices;
3232
import org.elasticsearch.ingest.IngestService;
3333
import org.elasticsearch.ingest.SimulateIngestService;
34-
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
34+
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
3535
import org.elasticsearch.tasks.Task;
3636
import org.elasticsearch.threadpool.ThreadPool;
3737
import org.elasticsearch.transport.TransportService;
@@ -122,7 +122,7 @@ private Exception validateMappings(IndexRequest request) {
122122
request.getContentType(),
123123
request.routing(),
124124
request.getDynamicTemplates(),
125-
DocumentSizeObserver.EMPTY_INSTANCE
125+
XContentMeteringParserDecorator.NOOP
126126
);
127127

128128
ClusterState state = clusterService.state();

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.elasticsearch.index.mapper.MapperService;
3939
import org.elasticsearch.index.shard.ShardId;
4040
import org.elasticsearch.ingest.IngestService;
41-
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
41+
import org.elasticsearch.plugins.internal.XContentParserDecorator;
4242
import org.elasticsearch.xcontent.XContentBuilder;
4343
import org.elasticsearch.xcontent.XContentFactory;
4444
import org.elasticsearch.xcontent.XContentType;
@@ -147,6 +147,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
147147
private Object rawTimestamp;
148148
private long normalisedBytesParsed = -1;
149149
private boolean originatesFromUpdateByScript;
150+
private boolean originatesFromUpdateByDoc;
150151

151152
public IndexRequest(StreamInput in) throws IOException {
152153
this(null, in);
@@ -204,6 +205,12 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
204205
} else {
205206
originatesFromUpdateByScript = false;
206207
}
208+
209+
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
210+
originatesFromUpdateByDoc = in.readBoolean();
211+
} else {
212+
originatesFromUpdateByDoc = false;
213+
}
207214
}
208215

209216
public IndexRequest() {
@@ -407,8 +414,8 @@ public Map<String, Object> sourceAsMap() {
407414
return XContentHelper.convertToMap(source, false, contentType).v2();
408415
}
409416

410-
public Map<String, Object> sourceAsMap(DocumentSizeObserver documentSizeObserver) {
411-
return XContentHelper.convertToMap(source, false, contentType, documentSizeObserver).v2();
417+
public Map<String, Object> sourceAsMap(XContentParserDecorator parserDecorator) {
418+
return XContentHelper.convertToMap(source, false, contentType, parserDecorator).v2();
412419
}
413420

414421
/**
@@ -768,6 +775,10 @@ private void writeBody(StreamOutput out) throws IOException {
768775
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
769776
out.writeBoolean(originatesFromUpdateByScript);
770777
}
778+
779+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
780+
out.writeBoolean(originatesFromUpdateByDoc);
781+
}
771782
}
772783

773784
@Override
@@ -931,15 +942,6 @@ public IndexRequest setNormalisedBytesParsed(long normalisedBytesParsed) {
931942
return this;
932943
}
933944

934-
/**
935-
* when observing document size while parsing, this method indicates that this request should not be recorded.
936-
* @return an index request
937-
*/
938-
public IndexRequest noParsedBytesToReport() {
939-
this.normalisedBytesParsed = 0;
940-
return this;
941-
}
942-
943945
/**
944946
* Adds the pipeline to the list of executed pipelines, if listExecutedPipelines is true
945947
*
@@ -977,6 +979,15 @@ public IndexRequest setOriginatesFromUpdateByScript(boolean originatesFromUpdate
977979
}
978980

979981
public boolean originatesFromUpdateByScript() {
980-
return this.originatesFromUpdateByScript;
982+
return originatesFromUpdateByScript;
983+
}
984+
985+
public boolean originatesFromUpdateByDoc() {
986+
return originatesFromUpdateByDoc;
987+
}
988+
989+
public IndexRequest setOriginatesFromUpdateByDoc(boolean originatesFromUpdateByDoc) {
990+
this.originatesFromUpdateByDoc = originatesFromUpdateByDoc;
991+
return this;
981992
}
982993
}

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.elasticsearch.index.shard.IndexShard;
2828
import org.elasticsearch.index.shard.ShardId;
2929
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
30-
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
30+
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
3131
import org.elasticsearch.script.Script;
3232
import org.elasticsearch.script.ScriptService;
3333
import org.elasticsearch.script.UpdateCtxMap;
@@ -181,14 +181,14 @@ static String calculateRouting(GetResult getResult, @Nullable IndexRequest updat
181181
Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
182182
final IndexRequest currentRequest = request.doc();
183183
final String routing = calculateRouting(getResult, currentRequest);
184-
final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver(request);
184+
final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(request);
185185
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
186186
final XContentType updateSourceContentType = sourceAndContent.v1();
187187
final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
188188

189189
final boolean noop = XContentHelper.update(
190190
updatedSourceAsMap,
191-
currentRequest.sourceAsMap(documentSizeObserver),
191+
currentRequest.sourceAsMap(meteringParserDecorator),
192192
detectNoop
193193
) == false;
194194

@@ -226,8 +226,8 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
226226
.waitForActiveShards(request.waitForActiveShards())
227227
.timeout(request.timeout())
228228
.setRefreshPolicy(request.getRefreshPolicy())
229-
.setNormalisedBytesParsed(documentSizeObserver.normalisedBytesParsed());
230-
229+
.setOriginatesFromUpdateByDoc(true);
230+
finalIndexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
231231
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
232232
}
233233
}

server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.elasticsearch.core.CheckedFunction;
2121
import org.elasticsearch.core.Nullable;
2222
import org.elasticsearch.core.Tuple;
23-
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
23+
import org.elasticsearch.plugins.internal.XContentParserDecorator;
2424
import org.elasticsearch.xcontent.DeprecationHandler;
2525
import org.elasticsearch.xcontent.NamedXContentRegistry;
2626
import org.elasticsearch.xcontent.ToXContent;
@@ -152,14 +152,14 @@ public static Tuple<XContentType, Map<String, Object>> convertToMap(
152152
BytesReference bytes,
153153
boolean ordered,
154154
XContentType xContentType,
155-
DocumentSizeObserver documentSizeObserver
155+
XContentParserDecorator parserDecorator
156156
) {
157157
return parseToType(
158158
ordered ? XContentParser::mapOrdered : XContentParser::map,
159159
bytes,
160160
xContentType,
161161
XContentParserConfiguration.EMPTY,
162-
documentSizeObserver
162+
parserDecorator
163163
);
164164
}
165165

@@ -207,19 +207,19 @@ public static <T> Tuple<XContentType, T> parseToType(
207207
@Nullable XContentType xContentType,
208208
@Nullable XContentParserConfiguration config
209209
) throws ElasticsearchParseException {
210-
return parseToType(extractor, bytes, xContentType, config, DocumentSizeObserver.EMPTY_INSTANCE);
210+
return parseToType(extractor, bytes, xContentType, config, XContentParserDecorator.NOOP);
211211
}
212212

213213
public static <T> Tuple<XContentType, T> parseToType(
214214
CheckedFunction<XContentParser, T, IOException> extractor,
215215
BytesReference bytes,
216216
@Nullable XContentType xContentType,
217217
@Nullable XContentParserConfiguration config,
218-
DocumentSizeObserver documentSizeObserver
218+
XContentParserDecorator parserDecorator
219219
) throws ElasticsearchParseException {
220220
config = config != null ? config : XContentParserConfiguration.EMPTY;
221221
try (
222-
XContentParser parser = documentSizeObserver.wrapParser(
222+
XContentParser parser = parserDecorator.decorate(
223223
xContentType != null ? createParser(config, bytes, xContentType) : createParser(config, bytes)
224224
)
225225
) {

0 commit comments

Comments
 (0)