Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert bad commits #1755

Merged
merged 2 commits on May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,33 +455,33 @@ private Fetcher getFetcher(FetchEmitTuple t) {
}
}

protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple fetchEmitTuple, Fetcher fetcher) {
FetchKey fetchKey = fetchEmitTuple.getFetchKey();
Metadata fetchResponseMetadata = new Metadata();
Metadata fetchRequestMetadata = fetchEmitTuple.getMetadata();
protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
FetchKey fetchKey = t.getFetchKey();
if (fetchKey.hasRange()) {
if (!(fetcher instanceof RangeFetcher)) {
throw new IllegalArgumentException(
"fetch key has a range, but the fetcher is not a range fetcher");
}
Metadata metadata = new Metadata();
try (InputStream stream = ((RangeFetcher) fetcher).fetch(fetchKey.getFetchKey(),
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), fetchRequestMetadata, fetchResponseMetadata)) {
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
return parseWithStream(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + fetchEmitTuple.getId(), e);
LOG.error("security exception " + t.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
LOG.warn("fetch exception " + t.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
} else {
try (InputStream stream = fetcher.fetch(fetchEmitTuple.getFetchKey().getFetchKey(), fetchRequestMetadata, fetchResponseMetadata)) {
return parseWithStream(fetchEmitTuple, stream, fetchResponseMetadata);
Metadata metadata = new Metadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
return parseWithStream(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + fetchEmitTuple.getId(), e);
LOG.error("security exception " + t.getId(), e);
throw e;
} catch (TikaException | IOException e) {
LOG.warn("fetch exception " + fetchEmitTuple.getId(), e);
LOG.warn("fetch exception " + t.getId(), e);
write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public String getName() {
}

@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,5 @@ public interface Fetcher {

String getName();

default InputStream fetch(String fetchKey, Metadata fetchResponseMetadata)
throws TikaException, IOException {
return fetch(fetchKey, new Metadata(), fetchResponseMetadata);
}

InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
throws TikaException, IOException;
InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@
public interface RangeFetcher extends Fetcher {
//At some point, Tika 3.x?, we may want to add optional ranges to the fetchKey?

default InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchResponseMetadata)
throws TikaException, IOException {
return fetch(fetchKey, startOffset, endOffset, new Metadata(), fetchResponseMetadata);
}

InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
InputStream fetch(String fetchKey, long startOffset, long endOffset, Metadata metadata)
throws TikaException, IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static boolean isDescendant(Path root, Path descendant) {
}

@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {

if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("Path must not contain \u0000. " +
Expand All @@ -76,8 +76,8 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
p = Paths.get(fetchKey);
}

fetchRequestMetadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
updateFileSystemMetadata(p, fetchRequestMetadata);
metadata.set(TikaCoreProperties.SOURCE_PATH, fetchKey);
updateFileSystemMetadata(p, metadata);

if (!Files.isRegularFile(p)) {
if (basePath != null && !Files.isDirectory(basePath)) {
Expand All @@ -87,7 +87,7 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
}
}

return TikaInputStream.get(p, fetchRequestMetadata);
return TikaInputStream.get(p, metadata);
}

private void updateFileSystemMetadata(Path p, Metadata metadata) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class UrlFetcher extends AbstractFetcher {

@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("URL must not contain \u0000. " +
"Please review the life decisions that led you to requesting " +
Expand All @@ -46,7 +46,7 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
"The UrlFetcher does not fetch from file shares; " +
"please use the FileSystemFetcher");
}
return TikaInputStream.get(new URL(fetchKey), fetchRequestMetadata);
return TikaInputStream.get(new URL(fetchKey), metadata);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public String getName() {
}

@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
return new ByteArrayInputStream(BYTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void checkInitialization(InitializableProblemHandler problemHandler)


@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
return byteString == null ? new ByteArrayInputStream(new byte[0]) :
new ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class AZBlobFetcher extends AbstractFetcher implements Initializable {
private boolean spoolToTemp = true;

@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {

LOGGER.debug("about to fetch fetchkey={} from endpoint ({})", fetchKey, endpoint);

Expand All @@ -81,7 +81,7 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
BlobProperties properties = blobClient.getProperties();
if (properties.getMetadata() != null) {
for (Map.Entry<String, String> e : properties.getMetadata().entrySet()) {
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
}
Expand All @@ -94,7 +94,7 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
try (OutputStream os = Files.newOutputStream(tmp)) {
blobClient.download(os);
}
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class GCSFetcher extends AbstractFetcher implements Initializable {
private boolean spoolToTemp = true;

@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {

LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket);

Expand All @@ -65,7 +65,7 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
if (extractUserMetadata) {
if (blob.getMetadata() != null) {
for (Map.Entry<String, String> e : blob.getMetadata().entrySet()) {
fetchRequestMetadata.add(PREFIX + ":" + e.getKey(), e.getValue());
metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
}
}
}
Expand All @@ -76,7 +76,7 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
TemporaryResources tmpResources = new TemporaryResources();
Path tmp = tmpResources.createTempFile();
blob.downloadTo(tmp);
TikaInputStream tis = TikaInputStream.get(tmp, fetchRequestMetadata, tmpResources);
TikaInputStream tis = TikaInputStream.get(tmp, metadata, tmpResources);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
return tis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable, Range


@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws IOException, TikaException {
public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
HttpGet get = new HttpGet(fetchKey);
RequestConfig requestConfig =
RequestConfig.custom()
Expand All @@ -146,21 +146,21 @@ public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadat
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
return execute(get, fetchResponseMetadata, httpClient, true);
return execute(get, metadata, httpClient, true);
}

@Override
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
throws IOException {
HttpGet get = new HttpGet(fetchKey);
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
return execute(get, fetchResponseMetadata, httpClient, true);
return execute(get, metadata, httpClient, true);
}

private InputStream execute(HttpGet get, Metadata fetchRequestMetadata, HttpClient client,
private InputStream execute(HttpGet get, Metadata metadata, HttpClient client,
boolean retryOnBadLength) throws IOException {
HttpClientContext context = HttpClientContext.create();
HttpResponse response = null;
Expand All @@ -183,15 +183,15 @@ public void run() {
}
response = client.execute(get, context);

updateMetadata(get.getURI().toString(), response, context, fetchRequestMetadata);
updateMetadata(get.getURI().toString(), response, context, metadata);

int code = response.getStatusLine().getStatusCode();
if (code < 200 || code > 299) {
throw new IOException("bad status code: " + code + " :: " +
responseToString(response));
}
try (InputStream is = response.getEntity().getContent()) {
return spool(is, fetchRequestMetadata);
return spool(is, metadata);
}
} catch (ConnectionClosedException e) {

Expand All @@ -202,7 +202,7 @@ public void run() {
//and then compresses the stream. See HTTPCLIENT-2176
LOG.warn("premature end of content-length delimited message; retrying with " +
"content compression disabled for {}", get.getURI());
return execute(get, fetchRequestMetadata, noCompressHttpClient, false);
return execute(get, metadata, noCompressHttpClient, false);
}
throw e;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFe
private boolean pathStyleAccessEnabled = false;

@Override
public InputStream fetch(String fetchKey, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata) throws TikaException, IOException {
return fetch(fetchKey, -1, -1, fetchRequestMetadata);
public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
return fetch(fetchKey, -1, -1, metadata);
}

@Override
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata fetchRequestMetadata, Metadata fetchResponseMetadata)
public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
throws TikaException, IOException {
String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix + fetchKey;

Expand All @@ -129,7 +129,7 @@ public InputStream fetch(String fetchKey, long startRange, long endRange, Metada
do {
try {
long start = System.currentTimeMillis();
InputStream is = _fetch(theFetchKey, fetchResponseMetadata, startRange, endRange);
InputStream is = _fetch(theFetchKey, metadata, startRange, endRange);
long elapsed = System.currentTimeMillis() - start;
LOGGER.debug("total to fetch {}", elapsed);
return is;
Expand Down
Loading