Skip to content

Commit

Permalink
TIKA-4252: add request metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicholas DiPiazza committed May 10, 2024
1 parent 32baf23 commit 5f4ce3d
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 42 deletions.
24 changes: 12 additions & 12 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,33 +455,33 @@ private Fetcher getFetcher(FetchEmitTuple t) {
}
}

protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
FetchKey fetchKey = t.getFetchKey();
protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple fetchEmitTuple, Fetcher fetcher) {
FetchKey fetchKey = fetchEmitTuple.getFetchKey();
Metadata fetchResponseMetadata = new Metadata();
Metadata fetchRequestMetadata = fetchEmitTuple.getMetadata();
if (fetchKey.hasRange()) {
if (!(fetcher instanceof RangeFetcher)) {
throw new IllegalArgumentException(
"fetch key has a range, but the fetcher is not a range fetcher");
}
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
try (InputStream stream = ((RangeFetcher) fetcher).fetch(fetchKey.getFetchKey(),
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
return parseWithStream(t, stream, metadata);
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), fetchRequestMetadata, fetchResponseMetadata)) {
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
LOG.error("security exception " + fetchEmitTuple.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + t.getId(), e);
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
} else {
Metadata metadata = t.getMetadata() == null ? new Metadata() : t.getMetadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
return parseWithStream(t, stream, metadata);
try (InputStream stream = fetcher.fetch(fetchEmitTuple.getFetchKey().getFetchKey(), fetchRequestMetadata, fetchResponseMetadata)) {
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
LOG.error("security exception " + fetchEmitTuple.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + t.getId(), e);
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public String getName() {
}

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,11 @@ public interface Fetcher {

String getName();

InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException;
default InputStream fetch(String fetchKey, Metadata fetchResponseMetadata)
throws TikaException, IOException {
return fetch(fetchKey, new Metadata(), fetchResponseMetadata);
}

InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
public interface RangeFetcher extends Fetcher {
//At some point, Tika 3.x?, we may want to add optional ranges to the fetchKey?

InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata metadata)
throws TikaException, IOException;
default InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchResponseMetadata)
throws TikaException, IOException {
return fetch(fetchKey, startOffset, endOffset, new Metadata(), fetchResponseMetadata);
}

InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static boolean isDescendant(Path root, Path descendant) {
}

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {

if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("Path must not contain \u0000. " +
Expand All @@ -76,8 +76,8 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
p = Paths.get(fetchKey);
}

metadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
updateFileSystemMetadata(p, metadata);
fetchRequestMetadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
updateFileSystemMetadata(p, fetchRequestMetadata);

if (!Files.isRegularFile(p)) {
if (basePath != null && !Files.isDirectory(basePath)) {
Expand All @@ -87,7 +87,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
}
}

return TikaInputStream.get(p, metadata);
return TikaInputStream.get(p, fetchRequestMetadata);
}

private void updateFileSystemMetadata(Path p, Metadata metadata) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class UrlFetcher extends AbstractFetcher {

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("URL must not contain \u0000. " +
"Please review the life decisions that led you to requesting " +
Expand All @@ -46,7 +46,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
"The UrlFetcher does not fetch from file shares; " +
"please use the FileSystemFetcher");
}
return TikaInputStream.get(new URL(fetchKey), metadata);
return TikaInputStream.get(new URL(fetchKey), fetchRequestMetadata);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public String getName() {
}

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return new ByteArrayInputStream(BYTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void checkInitialization(InitializableProblemHandler problemHandler)


@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return byteString == null ? new ByteArrayInputStream(new byte[0]) :
new ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class AZBlobFetcher extends AbstractFetcher implements Initializable {
private boolean spoolToTemp = true;

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {

LOGGER.debug("about to fetch fetchkey={} from endpoint ({})", fetchKey, endpoint);

Expand All @@ -81,7 +81,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
BlobProperties properties = blobClient.getProperties();
if (properties.getMetadata() != null) {
for (Map.Entry<String, String> e : properties.getMetadata().entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
}
Expand All @@ -94,7 +94,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
try (OutputStream os = Files.newOutputStream(tmp)) {
blobClient.download(os);
}
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class GCSFetcher extends AbstractFetcher implements Initializable {
private boolean spoolToTemp = true;

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {

LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket);

Expand All @@ -65,7 +65,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
if (extractUserMetadata) {
if (blob.getMetadata() != null) {
for (Map.Entry<String, String> e : blob.getMetadata().entrySet()) {
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
}
Expand All @@ -76,7 +76,7 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws TikaExceptio
TemporaryResources tmpResources = new TemporaryResources();
Path tmp = tmpResources.createTempFile();
blob.downloadTo(tmp);
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range


@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
HttpGet get = new HttpGet(fetchKey);
RequestConfig requestConfig =
RequestConfig.custom()
Expand All @@ -146,21 +146,21 @@ public InputStream fetch(String fetchKey, Metadata metadata) throws IOException,
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
return execute(get, metadata, httpClient, true);
return execute(get, fetchResponseMetadata, httpClient, true);
}

@Override
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws IOException {
HttpGet get = new HttpGet(fetchKey);
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
return execute(get, metadata, httpClient, true);
return execute(get, fetchResponseMetadata, httpClient, true);
}

private InputStream execute(HttpGet get, Metadata metadata, HttpClient client,
private InputStream execute(HttpGet get, Metadata fetchRequestMetadata, HttpClient client,
boolean retryOnBadLength) throws IOException {
HttpClientContext context = HttpClientContext.create();
HttpResponse response = null;
Expand All @@ -183,15 +183,15 @@ public void run() {
}
response = client.execute(get, context);

updateMetadata(get.getURI().toString(), response, context, metadata);
updateMetadata(get.getURI().toString(), response, context, fetchRequestMetadata);

int code = response.getStatusLine().getStatusCode();
if (code < 200 || code > 299) {
throw new IOException("bad status code: " + code + " :: " +
responseToString(response));
}
try (InputStream is = response.getEntity().getContent()) {
return spool(is, metadata);
return spool(is, fetchRequestMetadata);
}
} catch (ConnectionClosedException e) {

Expand All @@ -202,7 +202,7 @@ public void run() {
//and then compresses the stream. See HTTPCLIENT-2176
LOG.warn("premature end of content-length delimited message; retrying with " +
"content compression disabled for {}", get.getURI());
return execute(get, metadata, noCompressHttpClient, false);
return execute(get, fetchRequestMetadata, noCompressHttpClient, false);
}
throw e;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
private boolean pathStyleAccessEnabled = false;

@Override
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
return fetch(fetchKey, -1, -1, metadata);
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return fetch(fetchKey, -1, -1, fetchRequestMetadata);
}

@Override
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException {
String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;

Expand All @@ -129,7 +129,7 @@ public InputStream fetch(String fetchKey, long startRange, long endRange, Metada
do {
try {
long start = System.currentTimeMillis();
InputStream is = _fetch(theFetchKey, metadata, startRange, endRange);
InputStream is = _fetch(theFetchKey, fetchResponseMetadata, startRange, endRange);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("total to fetch {}", elapsed);
return is;
Expand Down

0 comments on commit 5f4ce3d

Please sign in to comment.