Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TIKA-4252: add request metadata #1753

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,33 +455,33 @@ private Fetcher getFetcher(FetchEmitTuple t) {
}
}

protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
FetchKey fetchKey = t.getFetchKey();
protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple fetchEmitTuple, Fetcher fetcher) {
FetchKey fetchKey = fetchEmitTuple.getFetchKey();
Metadata fetchResponseMetadata = new Metadata();
Copy link
Contributor

@tballison tballison May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadata that goes in the fetchemittuple was envisioned to be user-injected metadata that was injected after the parse and then emitted (e.g. provenance metadata).

I think we need to put both metadatas on the fetchemittuple.

This is what I'm thinking...let me know what you think.

So, there will be three metadatas in play. The fetchemit tuple will have a fetchRequestMetadata (???) and a userMetadata (???). At parse time, we'll create a fresh metadata object, which we'll call "responseMetadata" in the following call: fetcher.fetch(requestMetadata, responseMetadata).

The parse will then use the responseMetadata and, after the parse, inject the userMetadata from the fetchEmitTuple.

The fetcher may use the fetchRequestMetadata to carry out its request, but info from that one should not make it into the "responseMetadata" nor make it into the emit data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nddipiazza any chance you can revert this in main so that we have a working build? Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shoot i didn't realize i was deplying broken builds! reverted. i'll make this change and make a new pr

Metadata fetchRequestMetadata = fetchEmitTuple.getMetadata();
if (fetchKey.hasRange()) {
if (!(fetcher instanceof RangeFetcher)) {
throw new IllegalArgumentException(
"fetch key has a range, but the fetcher is not a range fetcher");
}
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
try (InputStream stream = ((RangeFetcher) fetcher).fetch(fetchKey.getFetchKey(),
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
return parseWithStream(t, stream, metadata);
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), fetchRequestMetadata, fetchResponseMetadata)) {
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
LOG.error("security exception " + fetchEmitTuple.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + t.getId(), e);
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
} else {
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
return parseWithStream(t, stream, metadata);
try (InputStream stream = fetcher.fetch(fetchEmitTuple.getFetchKey().getFetchKey(), fetchRequestMetadata, fetchResponseMetadata)) {
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
LOG.error("security exception " + fetchEmitTuple.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + t.getId(), e);
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public String getName() {
}

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,11 @@ public interface Fetcher {

String getName();

InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException;
default InputStream fetch(String fetchKey, Metadata fetchResponseMetadata)
throws TikaException, IOException {
return fetch(fetchKey, new Metadata(), fetchResponseMetadata);
}

InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
public interface RangeFetcher extends Fetcher {
//At some point, Tika 3.x?, we may want to add optional ranges to the fetchKey?

InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata metadata)
throws TikaException, IOException;
default InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchResponseMetadata)
throws TikaException, IOException {
return fetch(fetchKey, startOffset, endOffset, new Metadata(), fetchResponseMetadata);
}

InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static boolean isDescendant(Path root, Path descendant) {
}

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {

if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("Path must not contain \u0000. " +
Expand All @@ -76,8 +76,8 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
p = Paths.get(fetchKey);
}

metadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
updateFileSystemMetadata(p, metadata);
fetchRequestMetadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
updateFileSystemMetadata(p, fetchRequestMetadata);

if (!Files.isRegularFile(p)) {
if (basePath != null && !Files.isDirectory(basePath)) {
Expand All @@ -87,7 +87,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
}
}

return TikaInputStream.get(p, metadata);
return TikaInputStream.get(p, fetchRequestMetadata);
}

private void updateFileSystemMetadata(Path p, Metadata metadata) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class UrlFetcher extends AbstractFetcher {

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("URL must not contain \u0000. " +
"Please review the life decisions that led you to requesting " +
Expand All @@ -46,7 +46,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
"The UrlFetcher does not fetch from file shares; " +
"please use the FileSystemFetcher");
}
return TikaInputStream.get(new URL(fetchKey), metadata);
return TikaInputStream.get(new URL(fetchKey), fetchRequestMetadata);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public String getName() {
}

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return new ByteArrayInputStream(BYTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void checkInitialization(InitializableProblemHandler problemHandler)


@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return byteString == null ? new ByteArrayInputStream(new byte[0]) :
new ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class AZBlobFetcher extends AbstractFetcher implements Initializable {
private boolean spoolToTemp = true;

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {

LOGGER.debug("about to fetch fetchkey={} from endpoint ({})", fetchKey, endpoint);

Expand All @@ -81,7 +81,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
BlobProperties properties = blobClient.getProperties();
if (properties.getMetadata() != null) {
for (Map.Entry<String, String> e : properties.getMetadata().entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
}
Expand All @@ -94,7 +94,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
try (OutputStream os = Files.newOutputStream(tmp)) {
blobClient.download(os);
}
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class GCSFetcher extends AbstractFetcher implements Initializable {
private boolean spoolToTemp = true;

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {

LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket);

Expand All @@ -65,7 +65,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
if (extractUserMetadata) {
if (blob.getMetadata() != null) {
for (Map.Entry<String, String> e : blob.getMetadata().entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
}
Expand All @@ -76,7 +76,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
TemporaryResources tmpResources = new TemporaryResources();
Path tmp = tmpResources.createTempFile();
blob.downloadTo(tmp);
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range


@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
HttpGet get = new HttpGet(fetchKey);
RequestConfig requestConfig =
RequestConfig.custom()
Expand All @@ -146,21 +146,21 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
return execute(get, metadata, httpClient, true);
return execute(get, fetchResponseMetadata, httpClient, true);
}

@Override
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws IOException {
HttpGet get = new HttpGet(fetchKey);
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
return execute(get, metadata, httpClient, true);
return execute(get, fetchResponseMetadata, httpClient, true);
}

private InputStream execute(HttpGet get, Metadata metadata, HttpClient client,
private InputStream execute(HttpGet get, Metadata fetchRequestMetadata, HttpClient client,
boolean retryOnBadLength) throws IOException {
HttpClientContext context = HttpClientContext.create();
HttpResponse response = null;
Expand All @@ -183,15 +183,15 @@ public void run() {
}
response = client.execute(get, context);

updateMetadata(get.getURI().toString(), response, context, metadata);
updateMetadata(get.getURI().toString(), response, context, fetchRequestMetadata);

int code = response.getStatusLine().getStatusCode();
if (code < 200 || code > 299) {
throw new IOException("bad status code: " + code + " :: " +
responseToString(response));
}
try (InputStream is = response.getEntity().getContent()) {
return spool(is, metadata);
return spool(is, fetchRequestMetadata);
}
} catch (ConnectionClosedException e) {

Expand All @@ -202,7 +202,7 @@ public void run() {
//and then compresses the stream. See HTTPCLIENT-2176
LOG.warn("premature end of content-length delimited message; retrying with " +
"content compression disabled for {}", get.getURI());
return execute(get, metadata, noCompressHttpClient, false);
return execute(get, fetchRequestMetadata, noCompressHttpClient, false);
}
throw e;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
private boolean pathStyleAccessEnabled = false;

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
return fetch(fetchKey, -1, -1, metadata);
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return fetch(fetchKey, -1, -1, fetchRequestMetadata);
}

@Override
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException {
String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;

Expand All @@ -129,7 +129,7 @@ public InputStream fetch(String fetchKey, long startRange, long endRange, Metada
do {
try {
long start = System.currentTimeMillis();
InputStream is = _fetch(theFetchKey, metadata, startRange, endRange);
InputStream is = _fetch(theFetchKey, fetchResponseMetadata, startRange, endRange);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("total to fetch {}", elapsed);
return is;
Expand Down
Loading