Exactly Once Support
This page outlines the design for exactly-once support in Gobblin.
Currently the flow of publishing data in Gobblin is:
1. `DataWriter` writes to the staging folder.
2. `DataWriter` moves files from the staging folder to the task output folder.
3. Publisher moves files from the task output folder to the job output folder.
4. Persist checkpoints (watermarks) to the state store.
5. Delete the staging folder and the task output folder.
This flow does not theoretically guarantee exactly-once delivery; rather, it guarantees at-least-once. If something bad happens in step 4, or between steps 3 and 4, it is possible that data is published but checkpoints are not, and the next run will re-extract and re-publish those records.
To guarantee exactly-once delivery, steps 3 and 4 should be atomic.
The idea is similar to write-ahead logging: before performing the atomic steps (i.e., steps 3 and 4), first write all of these steps (referred to as `CommitStep`s) into a `CommitStepStore`. In this way, if a failure happens during the atomic steps, the next run can complete the remaining steps before ingesting more data for this dataset.
Example: Suppose we have a Kafka-HDFS ingestion job, where each Kafka topic is a dataset. Suppose a task generates three output files for topic `MyTopic`:

- `task-output/MyTopic/2015-12-09/1.avro`
- `task-output/MyTopic/2015-12-09/2.avro`
- `task-output/MyTopic/2015-12-10/1.avro`

which should be published to

- `job-output/MyTopic/2015-12-09/1.avro`
- `job-output/MyTopic/2015-12-09/2.avro`
- `job-output/MyTopic/2015-12-10/1.avro`

And suppose this topic has two partitions, and their checkpoints, i.e., the actual high watermarks, are `offset=100` and `offset=200`.
In this case, there will be 5 `CommitStep`s for this dataset:

- `FsRenameCommitStep`: rename `task-output/MyTopic/2015-12-09/1.avro` to `job-output/MyTopic/2015-12-09/1.avro`
- `FsRenameCommitStep`: rename `task-output/MyTopic/2015-12-09/2.avro` to `job-output/MyTopic/2015-12-09/2.avro`
- `FsRenameCommitStep`: rename `task-output/MyTopic/2015-12-10/1.avro` to `job-output/MyTopic/2015-12-10/1.avro`
- `HighWatermarkCommitStep`: set the high watermark for partition `MyTopic:0` to `100`
- `HighWatermarkCommitStep`: set the high watermark for partition `MyTopic:1` to `200`
If all of these `CommitStep`s succeed, we can proceed with deleting the task-output folder and deleting the above `CommitStep`s from the `CommitStepStore`. If any of these steps fails, the steps will not be deleted. When the next run starts, for each dataset it will check whether there are `CommitStep`s for this dataset in the `CommitStepStore`. If there are, the previous run may not have successfully executed some of these steps, so it will verify whether each step has been done and re-do the step if not. If the re-do fails a certain number of times, this dataset will be skipped. Thus the `CommitStep` interface has two methods: `verify()` and `execute()`, and a sketch of the recovery check is shown below.
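As a rough illustration of this recovery path, the sketch below shows how a run might process leftover `CommitStep`s for one dataset before ingesting new data. It is a minimal sketch assuming the `CommitStep` and `CommitStepStore` APIs outlined further down this page; the class name, `MAX_RETRIES`, and the `deserialize()` helper are hypothetical.

```java
import java.io.IOException;

public class CommitStepRecovery {

  private static final int MAX_RETRIES = 3; // assumed retry limit; not specified by the design

  /** Returns true if all leftover steps for the dataset were completed (or none existed). */
  public static boolean recover(CommitStepStore store, String storeName, String datasetUrn)
      throws IOException {
    for (byte[] serialized : store.get(storeName, datasetUrn)) {
      CommitStep step = deserialize(serialized); // hypothetical helper wrapping CommitStep.get()
      int attempts = 0;
      while (!step.verify()) {                   // has this step already been done?
        if (attempts++ >= MAX_RETRIES) {
          return false;                          // skip this dataset for the current run
        }
        step.execute();                          // re-do the step left over from the previous run
      }
    }
    store.remove(storeName, datasetUrn);         // all steps done; safe to ingest more data
    return true;
  }

  // Placeholder for turning stored bytes back into a concrete CommitStep.
  private static CommitStep deserialize(byte[] serialized) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}
```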
The above approach potentially affects scalability for two reasons:
1. The driver needs to write all `CommitStep`s to the `CommitStepStore` for each dataset, once it determines that all tasks for the dataset have finished. This may cause scalability issues if there are too many `CommitStep`s, too many datasets, or too many tasks.
2. Upon the start of the next run, the driver needs to verify all `CommitStep`s and re-do the `CommitStep`s that the previous run failed to do. This may also cause scalability issues if there are too many `CommitStep`s.
Both issues can be resolved by moving the majority of the work to an MR job, rather than doing it in the driver.
For #1, in addition to the MR job pulling the data, we can launch a separate MR job in which each container is responsible for writing `CommitStep`s for a subset of the datasets. Each container will keep polling the `TaskStateStore` to determine whether all tasks for each dataset it is responsible for have finished, and if so, it writes the `CommitStep`s for that dataset to the `CommitStepStore`.
#2 can also be easily parallelized by making each container responsible for a subset of the datasets.
`CommitStep`:

```java
import java.io.DataInput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

import com.google.common.base.Throwables;

/**
 * A step during committing in a Gobblin job that should be atomically executed with other steps.
 */
public abstract class CommitStep implements Writable {

  protected CommitStep() {
  }

  /**
   * Verify whether the CommitStep has been done.
   */
  public abstract boolean verify() throws IOException;

  /**
   * Execute a CommitStep.
   */
  public abstract boolean execute() throws IOException;

  public static CommitStep get(DataInput in, Class<? extends CommitStep> clazz) throws IOException {
    try {
      CommitStep step = clazz.newInstance();
      step.readFields(in);
      return step;
    } catch (InstantiationException | IllegalAccessException e) {
      throw Throwables.propagate(e);
    }
  }
}
```
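To make the abstract class concrete, here is a minimal sketch of what an `FsRenameCommitStep` like the one in the example above could look like. This is not the actual Gobblin implementation; the field names and the use of Hadoop's `FileSystem.rename()` are assumptions for illustration.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsRenameCommitStep extends CommitStep {

  private Path src;
  private Path dst;

  public FsRenameCommitStep() {
    // required for Writable deserialization
  }

  public FsRenameCommitStep(Path src, Path dst) {
    this.src = src;
    this.dst = dst;
  }

  /** The step is considered done if the destination file already exists. */
  @Override
  public boolean verify() throws IOException {
    FileSystem fs = this.dst.getFileSystem(new Configuration());
    return fs.exists(this.dst);
  }

  /** Rename the task-output file to its job-output location. */
  @Override
  public boolean execute() throws IOException {
    FileSystem fs = this.src.getFileSystem(new Configuration());
    return fs.rename(this.src, this.dst);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(this.src.toString());
    out.writeUTF(this.dst.toString());
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    this.src = new Path(in.readUTF());
    this.dst = new Path(in.readUTF());
  }
}
```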
`CommitStepStore`:

```java
import java.io.IOException;
import java.util.Collection;

/**
 * A store for {@link CommitStep}s.
 */
public interface CommitStepStore {

  /**
   * Create a store with the given name.
   */
  public boolean create(String storeName) throws IOException;

  /**
   * Create a new dataset URN in a store.
   */
  public boolean create(String storeName, String datasetUrn) throws IOException;

  /**
   * Whether a dataset URN exists in a store.
   */
  public boolean exists(String storeName, String datasetUrn) throws IOException;

  /**
   * Remove a given store.
   */
  public boolean remove(String storeName) throws IOException;

  /**
   * Remove all {@link CommitStep}s for the given dataset URN from the store.
   */
  public boolean remove(String storeName, String datasetUrn) throws IOException;

  /**
   * Put a {@link CommitStep} with the given dataset URN into the store.
   */
  public boolean put(String storeName, String datasetUrn, CommitStep step) throws IOException;

  /**
   * Get all {@link CommitStep}s associated with the given dataset URN in the store.
   */
  public Collection<byte[]> get(String storeName, String datasetUrn) throws IOException;
}
```
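Putting the two interfaces together, the following is a minimal sketch, under the assumptions above, of how a driver might persist and then execute the `CommitStep`s for one dataset at commit time. The class and method names are hypothetical; the write-ahead ordering (persist the steps before executing any of them, and remove them only after all succeed) follows the flow described on this page.

```java
import java.io.IOException;
import java.util.List;

public class DatasetCommitter {

  /** Persist all steps first (write-ahead), then execute them, then clean up. */
  public static void commit(CommitStepStore store, String storeName, String datasetUrn,
      List<CommitStep> steps) throws IOException {
    store.create(storeName, datasetUrn);        // register the dataset URN in the store
    for (CommitStep step : steps) {
      store.put(storeName, datasetUrn, step);   // write-ahead: persist every step before executing
    }
    for (CommitStep step : steps) {
      step.execute();                           // e.g. rename files, persist high watermarks
    }
    // Only after every step has succeeded is it safe to drop the persisted steps
    // and delete the task-output folder.
    store.remove(storeName, datasetUrn);
  }
}
```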