Skip to content

Commit

Permalink
feature: hbase compatible 2.x (#242)
Browse files Browse the repository at this point in the history
* support hbase scan renewLease (#211)

* hbase tablename bug fix (#237)

* hbase empty family scan error (#239)

* hbase tablename bug fix

* hbase empty family scan error

---------

Co-authored-by: stuBirdFly <[email protected]>
  • Loading branch information
miyuan-ljr and stuBirdFly authored Nov 25, 2024
1 parent 14677e5 commit 3171e0d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
3 changes: 0 additions & 3 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3140,16 +3140,13 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
} else if (request instanceof ObTableQueryRequest) {
// TableGroup -> TableName
String tableName = request.getTableName();
tableName = getPhyTableNameFromTableGroup(((ObTableQueryRequest) request), tableName);
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
((ObTableQueryRequest) request).getTableQuery(), this);
tableQuery.setEntityType(request.getEntityType());
return new ObClusterTableQuery(tableQuery).executeInternal();
} else if (request instanceof ObTableQueryAsyncRequest) {
// TableGroup -> TableName
String tableName = request.getTableName();
tableName = getPhyTableNameFromTableGroup(
((ObTableQueryAsyncRequest) request).getObTableQueryRequest(), tableName);
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
tableQuery.setEntityType(request.getEntityType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
return result;
}

/*
* RenewLease.
*/
public void renewLease() throws Exception {
throw new IllegalStateException("renew only support stream query");
}

/*
* Next.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Map;

public enum ObQueryOperationType {
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2);
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3);

private int value;
private static Map<Integer, ObQueryOperationType> map = new HashMap<Integer, ObQueryOperationType>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,32 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
return buildPartitions(client, tableQuery, tableName);
}

// This function is designed for HBase-type requests.
// It is used to extend the session duration of a scan
@Override
public void renewLease() throws Exception {
if (!isEnd() && !expectant.isEmpty()) {
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
.iterator();
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
ObPair<Long, ObTableParam> partIdWithObTable = lastEntry.getValue();
// try access new partition, async will not remove useless expectant
ObTableParam obTableParam = partIdWithObTable.getRight();
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();

// refresh request info
queryRequest.setPartitionId(obTableParam.getPartitionId());
queryRequest.setTableId(obTableParam.getTableId());

// refresh async query request
asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW);
asyncRequest.setQuerySessionId(sessionId);
executeAsync(partIdWithObTable, asyncRequest);
} else {
throw new ObTableException("query end or expectant is null");
}
}

@Override
public boolean next() throws Exception {
checkStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
Expand Down Expand Up @@ -192,14 +193,6 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback<Abstract
}
}

// set correct table group name for hbase
if (tableQuery.isHbaseQuery()
&& obTableClient.getTableGroupInverted().containsKey(tableName)
&& tableName.equalsIgnoreCase(obTableClient.getTableGroupCache().get(
obTableClient.getTableGroupInverted().get(tableName)))) {
tableName = obTableClient.getTableGroupInverted().get(tableName);
}

// init query stream result
AbstractQueryStreamResult streamResult = callable.execute();

Expand Down Expand Up @@ -266,7 +259,9 @@ public Map<Long, ObPair<Long, ObTableParam>> initPartitions(ObTableQuery tableQu
end[i] = endKey.getObj(i).isMinObj() || endKey.getObj(i).isMaxObj() ?
endKey.getObj(i) : endKey.getObj(i).getValue();
}

if (this.entityType == ObTableEntityType.HKV && obTableClient.isTableGroupName(tableName)) {
indexTableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, false);
}
ObBorderFlag borderFlag = range.getBorderFlag();
List<ObPair<Long, ObTableParam>> pairs = this.obTableClient.getTables(indexTableName,
tableQuery, start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
Expand Down

0 comments on commit 3171e0d

Please sign in to comment.