Skip to content

Support Fields API in conditional ingest processors #131581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131581.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131581
summary: Support Fields API in conditional ingest processors
area: Infra/Core
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,150 @@ teardown:
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.conditional_field: "bar" }
- is_false: _source.bytes_target_field

---
"Test conditional processor with fields API":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body:
description: "_description"
processors:
- set:
if: "field('get.field').get('') == 'one'"
field: "one"
value: 1
- set:
if: "field('get.field').get('') == 'two'"
field: "missing"
value: "missing"
- set:
if: " /* avoid yaml stash */ $('get.field', 'one') == 'one'"
field: "dollar"
value: true
- set:
if: "field('missing.field').get('fallback') == 'fallback'"
field: "fallback"
value: "fallback"
- set:
if: "field('nested.array.get.with.index.field').get(1, null) == 'two'"
field: "two"
value: 2
- set:
if: "field('getName.field').getName() == 'getName.field'"
field: "three"
value: 3
- set:
if: "field('existing.field').exists()"
field: "four"
value: 4
- set:
if: "!field('empty.field').isEmpty()"
field: "missing"
value: "missing"
- set:
if: "field('size.field').size() == 2"
field: "five"
value: 5
- set:
if: >
def iterator = field('iterator.field').iterator();
def sum = 0;
while (iterator.hasNext()) {
sum += iterator.next();
}
return sum == 6;
field: "six"
value: 6
- set:
if: "field('hasValue.field').hasValue(v -> v == 'two')"
field: "seven"
value: 7
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body:
get.field: "one"
nested:
array:
get.with.index.field: ["one", "two", "three"]
getName.field: "my_name"
existing.field: "indeed"
empty.field: []
size.field: ["one", "two"]
iterator.field: [1, 2, 3]
hasValue.field: ["one", "two", "three"]

- do:
get:
index: test
id: "1"
- match: { _source.get\.field: "one" }
- match: { _source.one: 1 }
- is_false: _source.missing
- is_true: _source.dollar
- match: { _source.fallback: "fallback" }
- match: { _source.nested.array.get\.with\.index\.field: ["one", "two", "three"] }
- match: { _source.two: 2 }
- match: { _source.three: 3 }
- match: { _source.four: 4 }
- match: { _source.five: 5 }
- match: { _source.six: 6 }
- match: { _source.seven: 7 }

---
"Test fields iterator is unmodifiable":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body:
description: "_description"
processors:
- set:
if: >
def iterator = field('iterator.field').iterator();
def sum = 0;
while (iterator.hasNext()) {
sum += iterator.next();
iterator.remove();
}
return sum == 6;
field: "sum"
value: 6
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body:
test.field: [1, 2, 3]
- match: { error: null }

- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body:
iterator.field: [1, 2, 3]
catch: bad_request
- length: { error.root_cause: 1 }

- do:
get:
index: test
id: "1"
- match: { _source.test\.field: [1, 2, 3] }
- is_false: _source.sum

- do:
get:
index: test
id: "2"
catch: missing
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ class org.elasticsearch.script.IngestScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the "Elastic License
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
# Public License v 1"; you may not use this file except in compliance with, at
# your election, the "Elastic License 2.0", the "GNU Affero General Public
# License v3.0 only", or the "Server Side Public License, v 1".
#

# This file contains a whitelist for conditional ingest scripts

class org.elasticsearch.script.IngestConditionalScript {
SourceMapField field(String)
}

class org.elasticsearch.script.field.SourceMapField {
boolean exists()
Iterator iterator()
def get(def)
def get(int, def)
boolean hasValue(Predicate)
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,12 @@ class org.elasticsearch.script.ReindexScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ class org.elasticsearch.script.UpdateScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ class org.elasticsearch.script.UpdateByQueryScript {
}

class org.elasticsearch.script.field.WriteField {
String getName()
boolean exists()
WriteField move(def)
WriteField overwrite(def)
void remove()
WriteField set(def)
WriteField append(def)
boolean isEmpty()
int size()
Iterator iterator()
def get(def)
def get(int, def)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
private final Processor processor;
private final IngestMetric metric;
private final LongSupplier relativeTimeProvider;
private final IngestConditionalScript precompiledConditionScript;
private final IngestConditionalScript.Factory precompiledConditionalScriptFactory;

ConditionalProcessor(String tag, String description, Script script, ScriptService scriptService, Processor processor) {
this(tag, description, script, scriptService, processor, System::nanoTime);
Expand All @@ -78,12 +78,11 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
this.relativeTimeProvider = relativeTimeProvider;

try {
final IngestConditionalScript.Factory factory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
if (ScriptType.INLINE.equals(script.getType())) {
precompiledConditionScript = factory.newInstance(script.getParams());
precompiledConditionalScriptFactory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
} else {
// stored script, so will have to compile at runtime
precompiledConditionScript = null;
precompiledConditionalScriptFactory = null;
}
} catch (ScriptException e) {
throw newConfigurationException(TYPE, tag, null, e);
Expand Down Expand Up @@ -141,12 +140,14 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
}

boolean evaluate(IngestDocument ingestDocument) {
IngestConditionalScript script = precompiledConditionScript;
if (script == null) {
IngestConditionalScript.Factory factory = scriptService.compile(condition, IngestConditionalScript.CONTEXT);
script = factory.newInstance(condition.getParams());
}
return script.execute(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
IngestConditionalScript.Factory factory = precompiledConditionalScriptFactory;
if (factory == null) {
factory = scriptService.compile(condition, IngestConditionalScript.CONTEXT);
}
return factory.newInstance(
condition.getParams(),
new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS))
).execute();
}

public Processor getInnerProcessor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
/**
* A script used by {@link org.elasticsearch.ingest.ConditionalProcessor}.
*/
public abstract class IngestConditionalScript {
public abstract class IngestConditionalScript extends SourceMapFieldScript {

public static final String[] PARAMETERS = { "ctx" };
public static final String[] PARAMETERS = {};

/** The context used to compile {@link IngestConditionalScript} factories. */
/**
* The context used to compile {@link IngestConditionalScript} factories.
* */
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>(
"processor_conditional",
Factory.class,
Expand All @@ -30,21 +32,27 @@ public abstract class IngestConditionalScript {
true
);

/** The generic runtime parameters for the script. */
/**
* The generic runtime parameters for the script.
*/
private final Map<String, Object> params;

public IngestConditionalScript(Map<String, Object> params) {
public IngestConditionalScript(Map<String, Object> params, Map<String, Object> ctxMap) {
super(ctxMap);
this.params = params;
}

/** Return the parameters for this script. */
/**
* Return the parameters for this script.
* @return a map of parameters
*/
public Map<String, Object> getParams() {
return params;
}

public abstract boolean execute(Map<String, Object> ctx);
public abstract boolean execute();

public interface Factory {
IngestConditionalScript newInstance(Map<String, Object> params);
IngestConditionalScript newInstance(Map<String, Object> params, Map<String, Object> ctxMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.script;

import org.elasticsearch.script.field.SourceMapField;

import java.util.Map;

/**
* Abstract base class for scripts that read field values.
* These scripts provide {@code ctx} for backwards compatibility and expose {@link Metadata}.
*/
public abstract class SourceMapFieldScript {
protected final Map<String, Object> ctxMap;

public SourceMapFieldScript(Map<String, Object> ctxMap) {
this.ctxMap = ctxMap;
}

/**
* Provides backwards compatibility access to ctx
* @return the context map containing the source data
*/
public Map<String, Object> getCtx() {
return ctxMap;
}

/**
* Expose the {@link SourceMapField field} API
*
* @param path the path to the field in the source map
* @return a new {@link SourceMapField} instance for the specified path
*/
public SourceMapField field(String path) {
return new SourceMapField(path, () -> ctxMap);
}
}
Loading
Loading