Skip to content

Commit

Permalink
HIVE-28399: Improve the fetch size in HiveConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
dengzhhu653 committed Jul 25, 2024
1 parent 78f577d commit e8ed23e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 39 deletions.
30 changes: 13 additions & 17 deletions jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ public class HiveConnection implements java.sql.Connection {
private final List<TProtocolVersion> supportedProtocols = new LinkedList<TProtocolVersion>();
private int loginTimeout = 0;
private TProtocolVersion protocol;
private final int initFetchSize;
private int defaultFetchSize;
int fetchSize;
private String initFile = null;
private String wmPool = null, wmApp = null;
private Properties clientInfo;
Expand Down Expand Up @@ -278,7 +277,7 @@ private static String makeDirectJDBCUrlFromConnectionParams(JdbcConnectionParams
public HiveConnection() {
sessConfMap = null;
isEmbeddedMode = true;
initFetchSize = 0;
fetchSize = 50;
}

public HiveConnection(String uri, Properties info) throws SQLException {
Expand Down Expand Up @@ -320,8 +319,6 @@ protected HiveConnection(String uri, Properties info,
port = connParams.getPort();
isEmbeddedMode = connParams.isEmbeddedMode();

initFetchSize = Integer.parseInt(sessConfMap.getOrDefault(JdbcConnectionParams.FETCH_SIZE, "0"));

if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) {
initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE);
}
Expand Down Expand Up @@ -1250,17 +1247,17 @@ private void openSession(TOpenSessionReq openReq) throws TException, SQLExceptio
protocol = openResp.getServerProtocolVersion();
sessHandle = openResp.getSessionHandle();

final String serverFetchSizeString =
openResp.getConfiguration().get(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname);
if (serverFetchSizeString == null) {
throw new IllegalStateException("Server returned a null default fetch size. Check that "
+ ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname + " is configured correctly.");
}

this.defaultFetchSize = Integer.parseInt(serverFetchSizeString);
if (this.defaultFetchSize <= 0) {
ConfVars confVars = ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE;
int serverFetchSize = Optional.ofNullable(openResp.getConfiguration().get(confVars.varname))
.map(size -> Integer.parseInt(size))
.orElse(confVars.defaultIntVal);
if (serverFetchSize <= 0) {
throw new IllegalStateException("Default fetch size must be greater than 0");
}
this.fetchSize = Optional.ofNullable(sessConfMap.get(JdbcConnectionParams.FETCH_SIZE))
.map(size -> Integer.parseInt(size))
.filter(v -> v > 0)
.orElse(serverFetchSize);
}

/**
Expand Down Expand Up @@ -1577,7 +1574,7 @@ public Statement createStatement() throws SQLException {
if (isClosed) {
throw new SQLException("Can't create Statement, connection is closed");
}
return new HiveStatement(this, client, sessHandle, false, initFetchSize, defaultFetchSize);
return new HiveStatement(this, client, sessHandle, false, fetchSize);
}

/*
Expand All @@ -1600,8 +1597,7 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency)
if (isClosed) {
throw new SQLException("Connection is closed");
}
return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE,
initFetchSize, defaultFetchSize);
return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE, fetchSize);
}

/*
Expand Down
16 changes: 5 additions & 11 deletions jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
Expand Down Expand Up @@ -70,16 +69,13 @@ public class HiveStatement implements java.sql.Statement {
private static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class);

public static final String QUERY_CANCELLED_MESSAGE = "Query was cancelled.";
private static final int DEFAULT_FETCH_SIZE =
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;

private final HiveConnection connection;
private TCLIService.Iface client;
private Optional<TOperationHandle> stmtHandle;
private final TSessionHandle sessHandle;
Map<String, String> sessConf = new HashMap<>();
private int fetchSize;
private final int defaultFetchSize;
int fetchSize;
private final boolean isScrollableResultset;
private boolean isOperationComplete = false;
private boolean closeOnResultSetCompletion = false;
Expand Down Expand Up @@ -130,22 +126,20 @@ public class HiveStatement implements java.sql.Statement {

public HiveStatement(HiveConnection connection, TCLIService.Iface client,
TSessionHandle sessHandle) {
this(connection, client, sessHandle, false, 0, DEFAULT_FETCH_SIZE);
this(connection, client, sessHandle, false, connection.fetchSize);
}

public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle,
boolean isScrollableResultset, int initFetchSize, int defaultFetchSize) {
boolean isScrollableResultset, int fetchSize) {
this.connection = Objects.requireNonNull(connection);
this.client = Objects.requireNonNull(client);
this.sessHandle = Objects.requireNonNull(sessHandle);

if (initFetchSize < 0 || defaultFetchSize <= 0) {
if ((this.fetchSize = fetchSize) <= 0) {
throw new IllegalArgumentException();
}

this.isScrollableResultset = isScrollableResultset;
this.defaultFetchSize = defaultFetchSize;
this.fetchSize = (initFetchSize == 0) ? defaultFetchSize : initFetchSize;
this.inPlaceUpdateStream = Optional.empty();
this.stmtHandle = Optional.empty();
}
Expand Down Expand Up @@ -676,7 +670,7 @@ public void setFetchSize(int rows) throws SQLException {
if (rows > 0) {
this.fetchSize = rows;
} else if (rows == 0) {
this.fetchSize = this.defaultFetchSize;
this.fetchSize = connection.fetchSize;
} else {
throw new SQLException("Fetch size must be greater or equal to 0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.withSettings;

import java.sql.SQLException;

Expand All @@ -47,7 +49,6 @@

public class TestHivePreparedStatement {

@Mock
private HiveConnection connection;
@Mock
private Iface client;
Expand All @@ -65,6 +66,7 @@ public class TestHivePreparedStatement {

@Before
public void before() throws Exception {
connection = mock(HiveConnection.class, withSettings().useConstructor());
MockitoAnnotations.initMocks(this);
when(tExecStatementResp.getStatus()).thenReturn(tStatusSuccess);
when(tExecStatementResp.getOperationHandle()).thenReturn(tOperationHandle);
Expand Down
19 changes: 9 additions & 10 deletions jdbc/src/test/org/apache/hive/jdbc/TestHiveStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;

import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
Expand All @@ -39,7 +40,7 @@ public class TestHiveStatement {
*/
@Test
public void testSetFetchSize() throws SQLException {
final HiveConnection connection = mock(HiveConnection.class);
final HiveConnection connection = mock(HiveConnection.class, withSettings().useConstructor());
final Iface iface = mock(Iface.class);
final TSessionHandle handle = mock(TSessionHandle.class);

Expand All @@ -63,10 +64,11 @@ public void testSetFetchSizeZero() throws SQLException {

// No hint specified and no default value passed in through the constructor,
// so it falls-back to the configuration default value
int fetchSize = HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal;
connection.fetchSize = fetchSize;
try (HiveStatement stmt = new HiveStatement(connection, iface, handle)) {
stmt.setFetchSize(0);
assertEquals(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal,
stmt.getFetchSize());
assertEquals(fetchSize, stmt.getFetchSize());
}
}

Expand All @@ -83,10 +85,7 @@ public void testSetFetchSizeZeroWithDefault() throws SQLException {
final Iface iface = mock(Iface.class);
final TSessionHandle handle = mock(TSessionHandle.class);

// No hint specified and no default value passed in through the constructor,
// so it falls-back to a value 1000
try (HiveStatement stmt = new HiveStatement(connection, iface, handle, false, 0, 10)) {
stmt.setFetchSize(0);
try (HiveStatement stmt = new HiveStatement(connection, iface, handle, false, 10)) {
assertEquals(10, stmt.getFetchSize());
}
}
Expand All @@ -106,7 +105,7 @@ public void testSetFetchSizeJdbcProperty() throws SQLException {
final Iface iface = mock(Iface.class);
final TSessionHandle handle = mock(TSessionHandle.class);

try (HiveStatement stmt = new HiveStatement(connection, iface, handle, false, 4, 1000)) {
try (HiveStatement stmt = new HiveStatement(connection, iface, handle, false, 4)) {
assertEquals(4, stmt.getFetchSize());
}
}
Expand All @@ -121,7 +120,7 @@ public void testSetFetchSizeJdbcProperty() throws SQLException {
*/
@Test(expected = SQLException.class)
public void testSetFetchSizeNegativeValue() throws SQLException {
final HiveConnection connection = mock(HiveConnection.class);
final HiveConnection connection = mock(HiveConnection.class, withSettings().useConstructor());
final Iface iface = mock(Iface.class);
final TSessionHandle handle = mock(TSessionHandle.class);

Expand All @@ -132,7 +131,7 @@ public void testSetFetchSizeNegativeValue() throws SQLException {

@Test(expected = SQLFeatureNotSupportedException.class)
public void testaddBatch() throws SQLException {
final HiveConnection connection = mock(HiveConnection.class);
final HiveConnection connection = mock(HiveConnection.class, withSettings().useConstructor());
final Iface iface = mock(Iface.class);
final TSessionHandle handle = mock(TSessionHandle.class);

Expand Down

0 comments on commit e8ed23e

Please sign in to comment.