Enable reading multiple files in HDFS connection
Enable reading multiple files in HDFS connection without using a list file
chris9692 authored Feb 19, 2022
2 parents 8ab41ed + c55601c commit 73a07de
Showing 3 changed files with 48 additions and 3 deletions.
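
At its core, the change keeps an iterator over the matched file paths and hands out one file per pagination round instead of reading only the first match. Below is a minimal, self-contained sketch of that pattern; the class, method, and path names are illustrative only and are not part of the CDI API.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch only: hand out one file path per "pagination" call,
// and signal the end of the list with null.
public class FilePaginationSketch {
  private Iterator<String> fileListIterator = null;

  // The first call seeds the iterator; subsequent calls consume it.
  public String nextFile(List<String> files) {
    if (fileListIterator == null) {
      fileListIterator = files.iterator();
    }
    return fileListIterator.hasNext() ? fileListIterator.next() : null;
  }

  public static void main(String[] args) {
    FilePaginationSketch sketch = new FilePaginationSketch();
    List<String> files = Arrays.asList("/data/part-0.avro", "/data/part-1.avro");  // hypothetical paths
    for (String file = sketch.nextFile(files); file != null; file = sketch.nextFile(files)) {
      System.out.println("would read: " + file);
    }
  }
}

The null return plays the same role as the null buffer in the diff below: it tells the caller that pagination is over.
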
@@ -15,6 +15,7 @@
 import com.linkedin.cdi.util.WorkUnitStatus;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.gobblin.configuration.State;
@@ -33,6 +34,7 @@
  */
 public class HdfsConnection extends MultistageConnection {
   private static final Logger LOG = LoggerFactory.getLogger(HdfsConnection.class);
+  private Iterator<String> fileListIterator = null;

   public HdfsKeys getHdfsKeys() {
     return hdfsKeys;
@@ -77,6 +79,19 @@ public HdfsConnection(State state, JobKeys jobKeys, ExtractorKeys extractorKeys)
   @Override
   public WorkUnitStatus execute(final WorkUnitStatus status) {
     Preconditions.checkNotNull(hdfsKeys.getSourceUri(), "ms.source.uri is missing or of wrong format");
+
+    // If pagination has started, continue paginating through the files.
+    // If there are no more files to process, return directly; that stops the
+    // pagination process.
+    if (fileListIterator != null) {
+      if (fileListIterator.hasNext()) {
+        status.setBuffer(readSingleFile(fileListIterator.next()));
+      } else {
+        status.setBuffer(null);
+      }
+      return status;
+    }
+
     URI uri = URI.create(getWorkUnitSpecificString(hdfsKeys.getSourceUri(),
         getExtractorKeys().getDynamicParameters()));

@@ -89,8 +104,9 @@ public WorkUnitStatus execute(final WorkUnitStatus status) {
           readFileList(uri.getPath(), uri.getQuery().substring(URI_REGEXP_PATTERN.length()))));
     } else {
       List<String> files = readFileList(uri.getPath(), ".*");
-      if (files.size() > 0) {
-        status.setBuffer(readSingleFile(files.get(0)));
+      fileListIterator = files.iterator();
+      if (fileListIterator.hasNext()) {
+        status.setBuffer(readSingleFile(fileListIterator.next()));
       }
     }
     return status;
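
For context on the else branch above: when the source URI carries a file-name pattern in its query string, that pattern filters the listing; otherwise every file under the path is matched (".*") and, with this change, iterated one by one. A rough sketch of the query parsing follows, assuming the query takes the form pattern=<regexp>; the actual URI_REGEXP_PATTERN constant is defined elsewhere in HdfsConnection and may differ.

import java.net.URI;

// Rough sketch of the query handling: assuming the query looks like "pattern=<regexp>",
// strip the prefix to get the file-name regexp; fall back to matching every file.
public class UriPatternSketch {
  private static final String URI_REGEXP_PATTERN = "pattern=";  // assumed value, not the CDI constant

  public static void main(String[] args) {
    URI uri = URI.create("hdfs:///data/incoming?pattern=.*avro");  // hypothetical source URI
    String query = uri.getQuery();
    String pattern = (query != null && query.startsWith(URI_REGEXP_PATTERN))
        ? query.substring(URI_REGEXP_PATTERN.length())
        : ".*";
    System.out.println("list " + uri.getPath() + " with pattern " + pattern);
  }
}
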
@@ -128,6 +144,29 @@ public WorkUnitStatus executeFirst(final WorkUnitStatus workUnitStatus) throws R
     return execute(status);
   }

+  /**
+   * Execute the HDFS read command (getFileStream)
+   * @param workUnitStatus prior work unit status
+   * @return the updated work unit status
+   * @throws RetriableAuthenticationException if retry is needed
+   */
+  @Override
+  public WorkUnitStatus executeNext(final WorkUnitStatus workUnitStatus) throws RetriableAuthenticationException {
+    WorkUnitStatus status = super.executeNext(workUnitStatus);
+
+    // If pagination has started already but there are no more files to process,
+    // return directly to stop the process.
+    if (fileListIterator != null && !fileListIterator.hasNext()) {
+      workUnitStatus.setBuffer(null);
+      return workUnitStatus;
+    }
+
+    if (fsHelper == null) {
+      fsHelper = getHdfsClient();
+    }
+    return execute(status);
+  }
+
   /**
    * Read a list of files based on the given pattern
    * @param path base path of files
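
The body of readFileList is not part of this diff; as a rough illustration of what listing files by name pattern on HDFS can look like, here is a sketch against the plain Hadoop FileSystem API. The connection itself goes through its fsHelper, so this is not the CDI implementation.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Rough illustration only: list the files under a base path whose names match a regexp.
public final class ReadFileListSketch {
  public static List<String> readFileList(String basePath, String pattern) throws IOException {
    List<String> files = new ArrayList<>();
    FileSystem fs = FileSystem.get(new Configuration());
    for (FileStatus status : fs.listStatus(new Path(basePath))) {
      if (status.isFile() && status.getPath().getName().matches(pattern)) {
        files.add(status.getPath().toString());
      }
    }
    return files;
  }
}
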
@@ -148,7 +148,8 @@ private int getBaseRowCount(WorkUnitState workUnitState) {
       } else {
         rowCount.addAndGet(records.size());
       }
-    } return rowCount.get();
+    }
+    return rowCount.get();
   }

   private void updateFailureCount(GenericRecord record) {
@@ -147,6 +147,11 @@ public GenericRecord readRecord(GenericRecord reuse) {
         row = avroSchemaBasedFilter.filter(row);
       }
       return addDerivedFields(row);
+    } else {
+      connection.closeStream();
+      if (hasNextPage() && processInputStream(avroExtractorKeys.getProcessedCount())) {
+        return readRecord(reuse);
+      }
     }

     if (!this.eof && extractorKeys.getExplictEof()) {
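
The extractor-side change above follows the same idea: when the current Avro stream runs dry, close it and, if another page (file) is available, pull the next input stream and keep reading; only when no page remains does extraction end. A generic sketch of that control flow, with made-up helper structures standing in for the extractor internals:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Generic sketch: read records across multiple "pages" (files), advancing to the
// next page when the current one runs out, and returning null at the very end.
public class MultiPageReaderSketch {
  private final Iterator<List<String>> pages;  // each page stands in for one file's records
  private Iterator<String> current;

  public MultiPageReaderSketch(List<List<String>> allPages) {
    this.pages = allPages.iterator();
    this.current = pages.hasNext() ? pages.next().iterator() : null;
  }

  public String readRecord() {
    if (current != null && current.hasNext()) {
      return current.next();            // normal case: next record of the current page
    }
    if (pages.hasNext()) {              // current page exhausted: move on
      current = pages.next().iterator();  // stands in for closeStream() + processInputStream()
      return readRecord();              // then retry, as the extractor does
    }
    return null;                        // no more pages: end of extraction
  }

  public static void main(String[] args) {
    MultiPageReaderSketch reader = new MultiPageReaderSketch(
        Arrays.asList(Arrays.asList("r1", "r2"), Arrays.asList("r3")));
    for (String r = reader.readRecord(); r != null; r = reader.readRecord()) {
      System.out.println(r);
    }
  }
}
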
