Skip to content

Commit

Permalink
[GOBBLIN-1787] Ability to delete multiple watermarks in a state store (
Browse files Browse the repository at this point in the history
…#3645)

* [GOBBLIN-1787] Ability to delete watermarks in a state store

* Remove magic number used in non-create calls
  • Loading branch information
homatthew authored Feb 18, 2023
1 parent 1b556d2 commit 0a49cdb
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ public void createAlias(String storeName, String original, String alias)
public void delete(String storeName, String tableName)
throws IOException;

/**
* Delete a list of tables from a store.
*
* @param storeName store name
* @param tableNames List of table names in the state store to delete
* @throws IOException
*/
default void delete(String storeName, List<String> tableNames)
throws IOException {
for (String tableName : tableNames) {
delete(storeName, tableName);
}
}

/**
* Delete a store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

Expand Down Expand Up @@ -55,10 +56,10 @@ public class StateStoreBasedWatermarkStorage implements WatermarkStorage {
* A watermark prefix that is compatible with different watermark storage implementations.
* As such, this prefix should not include any characters disallowed in a {@link java.net.URI}.
*/
private static final String WATERMARK_STORAGE_PREFIX="streamingWatermarks_";
protected static final String WATERMARK_STORAGE_PREFIX="streamingWatermarks_";

public final StateStore<CheckpointableWatermarkState> _stateStore;
private final String _storeName;
protected final String _storeName;

/**
* A private method that creates a state store config
Expand Down Expand Up @@ -142,4 +143,8 @@ public Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws
return _stateStore.getAll(_storeName);
}

public void deleteWatermarks(List<String> tableNames) throws IOException {
_stateStore.delete(_storeName, tableNames);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

Expand Down Expand Up @@ -65,6 +66,14 @@
* @param <T> state object type
**/
public class ZkStateStore<T extends State> implements StateStore<T> {
/**
* Corresponds to {@link AccessOption}, which defines behavior for accessing znodes (get, remove, exists). The value 0 means not to
* throws exceptions if the znode does not exist (i.e. do not enable {@link AccessOption#THROW_EXCEPTION_IFNOTEXIST}
*
* Note: This variable is not be used for create calls like {@link HelixPropertyStore#create(String, Object, int)}
* which require specifying if the znode is {@link AccessOption#PERSISTENT}, {@link AccessOption#EPHEMERAL}, etc.
**/
private static final int DEFAULT_OPTION = 0;

// Class of the state objects to be put into the store
private final Class<T> stateClass;
Expand Down Expand Up @@ -101,15 +110,15 @@ private String formPath(String storeName, String tableName) {
public boolean create(String storeName) throws IOException {
String path = formPath(storeName);

return propStore.exists(path, 0) || propStore.create(path, ArrayUtils.EMPTY_BYTE_ARRAY,
return propStore.exists(path, DEFAULT_OPTION) || propStore.create(path, ArrayUtils.EMPTY_BYTE_ARRAY,
AccessOption.PERSISTENT);
}

@Override
public boolean create(String storeName, String tableName) throws IOException {
String path = formPath(storeName, tableName);

if (propStore.exists(path, 0)) {
if (propStore.exists(path, DEFAULT_OPTION)) {
throw new IOException(String.format("State already exists for storeName %s tableName %s", storeName,
tableName));
}
Expand All @@ -121,7 +130,7 @@ public boolean create(String storeName, String tableName) throws IOException {
public boolean exists(String storeName, String tableName) throws IOException {
String path = formPath(storeName, tableName);

return propStore.exists(path, 0);
return propStore.exists(path, DEFAULT_OPTION);
}

/**
Expand All @@ -145,7 +154,7 @@ private void addStateToDataOutputStream(DataOutput dataOutput, T state) throws I
private void putData(String storeName, String tableName, byte[] data) throws IOException {
String path = formPath(storeName, tableName);

if (!propStore.exists(path, 0)) {
if (!propStore.exists(path, DEFAULT_OPTION)) {
// create with data
if (!propStore.create(path, data, AccessOption.PERSISTENT)) {
throw new IOException("Failed to create a state file for table " + tableName);
Expand Down Expand Up @@ -179,15 +188,15 @@ public void putAll(String storeName, String tableName, Collection<T> states) thr
@Override
public T get(String storeName, String tableName, String stateId) throws IOException {
String path = formPath(storeName, tableName);
byte[] data = propStore.get(path, null, 0);
byte[] data = propStore.get(path, null, DEFAULT_OPTION);
List<T> states = Lists.newArrayList();

deserialize(data, states, stateId);

if (states.isEmpty()) {
return null;
} else {
return states.get(0);
return states.get(DEFAULT_OPTION);
}
}

Expand All @@ -203,15 +212,15 @@ protected List<T> getAll(String storeName, Predicate<String> predicate) throws I
String path = formPath(storeName);
byte[] data;

List<String> children = propStore.getChildNames(path, 0);
List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);

if (children == null) {
return Collections.emptyList();
}

for (String c : children) {
if (predicate.apply(c)) {
data = propStore.get(path + "/" + c, null, 0);
data = propStore.get(path + "/" + c, null, DEFAULT_OPTION);
deserialize(data, states);
}
}
Expand All @@ -223,7 +232,7 @@ protected List<T> getAll(String storeName, Predicate<String> predicate) throws I
public List<T> getAll(String storeName, String tableName) throws IOException {
List<T> states = Lists.newArrayList();
String path = formPath(storeName, tableName);
byte[] data = propStore.get(path, null, 0);
byte[] data = propStore.get(path, null, DEFAULT_OPTION);

deserialize(data, states);

Expand All @@ -240,7 +249,7 @@ public List<String> getTableNames(String storeName, Predicate<String> predicate)
List<String> names = Lists.newArrayList();
String path = formPath(storeName);

List<String> children = propStore.getChildNames(path, 0);
List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);

if (children != null) {
for (String c : children) {
Expand All @@ -265,7 +274,7 @@ public List<String> getStoreNames(Predicate<String> predicate)
List<String> names = Lists.newArrayList();
String path = formPath("");

List<String> children = propStore.getChildNames(path, 0);
List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);

if (children != null) {
for (String c : children) {
Expand All @@ -283,23 +292,29 @@ public void createAlias(String storeName, String original, String alias) throws
String pathOriginal = formPath(storeName, original);
byte[] data;

if (!propStore.exists(pathOriginal, 0)) {
if (!propStore.exists(pathOriginal, DEFAULT_OPTION)) {
throw new IOException(String.format("State does not exist for table %s", original));
}

data = propStore.get(pathOriginal, null, 0);
data = propStore.get(pathOriginal, null, DEFAULT_OPTION);

putData(storeName, alias, data);
}

@Override
public void delete(String storeName, String tableName) throws IOException {
propStore.remove(formPath(storeName, tableName), 0);
propStore.remove(formPath(storeName, tableName), DEFAULT_OPTION);
}

@Override
public void delete(String storeName, List<String> tableNames) throws IOException {
List<String> paths = tableNames.stream().map(table -> formPath(storeName, table)).collect(Collectors.toList());
propStore.remove(paths, DEFAULT_OPTION);
}

@Override
public void delete(String storeName) throws IOException {
propStore.remove(formPath(storeName), 0);
propStore.remove(formPath(storeName), DEFAULT_OPTION);
}

/**
Expand Down Expand Up @@ -348,4 +363,4 @@ private void deserialize(byte[] data, List<T> states, String stateId) throws IOE
private void deserialize(byte[] data, List<T> states) throws IOException {
deserialize(data, states, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,14 @@ public void testDeleteJobState() throws IOException {

@Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")
public void testDeleteDatasetJobState() throws IOException {
JobState.DatasetState datasetState = zkDatasetStateStore.get(TEST_JOB_NAME,
TEST_DATASET_URN + "-" + zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, TEST_DATASET_URN);
String tableName = TEST_DATASET_URN + "-" + zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX;
JobState.DatasetState datasetState = zkDatasetStateStore.get(TEST_JOB_NAME, tableName, TEST_DATASET_URN);

Assert.assertNotNull(datasetState);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);

zkDatasetStateStore.delete(TEST_JOB_NAME);
zkDatasetStateStore.delete(TEST_JOB_NAME, Collections.singletonList(tableName));

datasetState = zkDatasetStateStore.get(TEST_JOB_NAME,
TEST_DATASET_URN + "-" + zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
Expand All @@ -288,4 +288,4 @@ public void tearDown() throws IOException {
testingServer.close();
}
}
}
}

0 comments on commit 0a49cdb

Please sign in to comment.