Skip to content

Commit

Permalink
fix oblogproxy-ce build script and add java test case (#19)
Browse files Browse the repository at this point in the history
* add java test case for oblogproxy-ce

* add boot success message

* fix java test case

* modify test action trigger rule
  • Loading branch information
whhe authored Jun 26, 2024
1 parent 2770210 commit 9b638d0
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 13 deletions.
125 changes: 125 additions & 0 deletions .github/workflows/test-oblogproxy-ce.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
name: test oblogproxy-ce

on:
pull_request:
paths:
- '!**/*.md'
- '.github/workflows/**-oblogproxy-ce.yml'
- '.github/workflows/**-oceanbase-ce.yml'
- 'oblogproxy-ce/**'
- 'oceanbase-ce/**'
- 'test/**'

concurrency:
group: test-oblogproxy-ce-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
build-oceanbase-ce:
uses: ./.github/workflows/build-oceanbase-ce.yml
with:
cache_key: test-oblogproxy-ce_oceanbase-ce
image_file: oceanbase-ce.tar
version: 4.2.1.6-106000012024042515

build-oblogproxy-ce:
runs-on: ubuntu-latest
steps:
- name: Print environment variables
run: printenv

- name: Check out repository code
uses: actions/checkout@v4

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Build oblogproxy-ce image
uses: docker/build-push-action@v6
with:
context: ./oblogproxy-ce
platforms: linux/amd64
file: ./oblogproxy-ce/Dockerfile
push: false
load: true
tags: oblogproxy-ce
build-args: |
VERSION=2.0.2-100000012024060321
- name: Export Docker image
run: docker save -o oblogproxy-ce.tar oblogproxy-ce

- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: test-oblogproxy-ce_oblogproxy-ce
path: oblogproxy-ce.tar

test-oblogproxy-ce:
runs-on: ubuntu-latest
needs: [ build-oceanbase-ce, build-oblogproxy-ce ]
steps:
- name: Download artifact
uses: actions/download-artifact@v4
with:
pattern: test-oblogproxy-ce_**
path: /tmp

- name: Load Docker image
run: |
docker load -i /tmp/test-oblogproxy-ce_oceanbase-ce/oceanbase-ce.tar
docker load -i /tmp/test-oblogproxy-ce_oblogproxy-ce/oblogproxy-ce.tar
sudo rm -rf /tmp/*
- name: Start oceanbase-ce container
uses: oceanbase/setup-oceanbase-ce@v1
with:
image_name: oceanbase-ce
container_name: oceanbase-ce
sql_port: 12881
rpc_port: 12882
mode: mini
sys_root_password: 123456
tenant_root_password: 654321

- name: Start oblogproxy-ce container
run: docker run --name oblogproxy-ce -e OB_SYS_USERNAME=root -e OB_SYS_PASSWORD=123456 -p 12983:2983 -d oblogproxy-ce

- name: Set container IP
id: set_container_ip
run: |
observer_ip=$(docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' oceanbase-ce)
oblogproxy_ip=$(docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' oblogproxy-ce)
echo "Container 'oceanbase-ce' IP is $observer_ip."
echo "Container 'oblogproxy-ce' IP is $oblogproxy_ip."
echo "observer_ip=$observer_ip" >> $GITHUB_OUTPUT
echo "oblogproxy_ip=$oblogproxy_ip" >> $GITHUB_OUTPUT
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Java
uses: actions/setup-java@v4
with:
java-version: '8'
distribution: 'zulu'

- name: Build test project
run: |
cd test
mvn install -DskipTests=true
- name: Run test methods
env:
observer_ip: ${{ steps.set_container_ip.outputs.observer_ip }}
oblogproxy_ip: ${{ steps.set_container_ip.outputs.oblogproxy_ip }}
oblogproxy_port: 12983
username: root@test
password: 654321
run: |
cd test
mvn verify -Dtest=LogProxyCETest -DfailIfNoTests=false
9 changes: 4 additions & 5 deletions .github/workflows/test-oceanbase-ce.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
name: test oceanbase-ce

on:
push:
branches: [ main ]
pull_request:
paths:
- '!**/*.md'
- '.github/workflows/**-oceanbase-ce.yml'
- 'oceanbase-ce/**'
- 'test/**'
Expand All @@ -17,15 +16,15 @@ jobs:
build:
uses: ./.github/workflows/build-oceanbase-ce.yml
with:
cache_key: oceanbase-ce
cache_key: test-oceanbase-ce
image_file: oceanbase-ce.tar
version: 4.2.3.1-101000032024061316

test-slim:
needs: build
uses: ./.github/workflows/java-test-oceanbase-ce.yml
with:
cache_key: oceanbase-ce
cache_key: test-oceanbase-ce
image_file: oceanbase-ce.tar
mode: slim
port: 1234
Expand All @@ -38,7 +37,7 @@ jobs:
needs: build
uses: ./.github/workflows/java-test-oceanbase-ce.yml
with:
cache_key: oceanbase-ce
cache_key: test-oceanbase-ce
image_file: oceanbase-ce.tar
cluster_name: github-action
mode: mini
Expand Down
5 changes: 2 additions & 3 deletions oblogproxy-ce/start.sh
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#!/bin/bash

if [[ -n ${OB_SYS_USERNAME} && -n ${OB_SYS_PASSWORD} ]]; then
echo "y" | /usr/local/oblogproxy/run.sh config_sys ${OB_SYS_USERNAME} ${OB_ROOT_PASSWORD}
echo "y" | /usr/local/oblogproxy/run.sh config_sys ${OB_SYS_USERNAME} ${OB_SYS_PASSWORD}
fi

rm -rf /usr/local/oblogproxy/run/*
/usr/local/oblogproxy/run.sh start
exec /sbin/init
echo "boot success!" && exec /sbin/init
11 changes: 11 additions & 0 deletions test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oblogclient-logproxy</artifactId>
<version>1.1.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
204 changes: 204 additions & 0 deletions test/src/test/java/com/oceanbase/test/LogProxyCETest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.test;

import com.oceanbase.clogproxy.client.LogProxyClient;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.oms.logmessage.DataMessage;
import com.oceanbase.oms.logmessage.LogMessage;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class LogProxyCETest {

private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(LogProxyCETest.class);

static Stream<Arguments> testLogProxyCEArgs() {
// non-null env vars
String observerIP = Utils.getNonEmptyEnv("observer_ip");
String logProxyIP = Utils.getNonEmptyEnv("oblogproxy_ip");
String logProxyPort = Utils.getNonEmptyEnv("oblogproxy_port");
String username = Utils.getNonEmptyEnv("username");
String password = Utils.getNonEmptyEnv("password");

return Stream.of(
Arguments.of(observerIP, logProxyIP, "2983", username, password),
Arguments.of(observerIP, "127.0.0.1", logProxyPort, username, password));
}

@ParameterizedTest
@MethodSource("testLogProxyCEArgs")
public void testLogProxyCE(
String observerIP,
String logProxyIP,
String logProxyPort,
String username,
String password)
throws InterruptedException {

LOG.info(
"Testing with args: [observerIP: {}, logProxyIP: {}, logProxyPort: {}, username: {}, password: {}]",
observerIP,
logProxyIP,
logProxyPort,
username,
password);

ObReaderConfig obReaderConfig = new ObReaderConfig();

Properties props = new Properties();
props.setProperty("user", username);
props.setProperty("password", password);

Driver driver = Utils.getDriver(false);
try (Connection conn = driver.connect("jdbc:mysql://" + observerIP + ":2881/test", props)) {
obReaderConfig.setRsList(Utils.getRSList(conn));
obReaderConfig.setTableWhiteList(Utils.getTenantName(conn) + ".*.*");
} catch (SQLException e) {
Assertions.fail(e);
}

obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(0L);
obReaderConfig.setWorkingMode("memory");

ClientConf clientConf =
ClientConf.builder()
.transferQueueSize(1000)
.connectTimeoutMs((int) Duration.ofSeconds(30).toMillis())
.maxReconnectTimes(0)
.ignoreUnknownRecordType(true)
.build();

LogProxyClient client =
new LogProxyClient(
logProxyIP, Integer.parseInt(logProxyPort), obReaderConfig, clientConf);

BlockingQueue<LogMessage> messageQueue = new LinkedBlockingQueue<>(2);
AtomicBoolean started = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);

client.addListener(
new RecordListener() {
@Override
public void notify(LogMessage message) {
switch (message.getOpt()) {
case HEARTBEAT:
LOG.info(
"Received heartbeat with checkpoint {}",
message.getCheckpoint());
if (started.compareAndSet(false, true)) {
latch.countDown();
}
break;
case BEGIN:
LOG.info("Received transaction begin: {}", message);
break;
case COMMIT:
LOG.info("Received transaction commit: {}", message);
break;
case INSERT:
case UPDATE:
case DELETE:
case DDL:
try {
messageQueue.put(message);
} catch (InterruptedException e) {
throw new RuntimeException("Failed to add message to queue", e);
}
break;
default:
throw new IllegalArgumentException(
"Unsupported log message type: " + message.getOpt());
}
}

@Override
public void onException(LogProxyClientException e) {
LOG.error(e.toString());
}
});

client.start();

if (!latch.await(30, TimeUnit.SECONDS)) {
Assertions.fail("Timeout to receive heartbeat message");
}

String ddl = "CREATE TABLE t_product (id INT(10) PRIMARY KEY, name VARCHAR(20))";
try (Connection conn = driver.connect("jdbc:mysql://" + observerIP + ":2881/test", props);
Statement statement = conn.createStatement()) {
statement.execute(ddl);
statement.execute("INSERT INTO t_product VALUES (1, 'meat')");
} catch (SQLException e) {
Assertions.fail(e);
}

while (messageQueue.size() < 2) {
Thread.sleep(1000);
}

LogMessage first = messageQueue.take();
Assertions.assertEquals(DataMessage.Record.Type.DDL, first.getOpt());
Assertions.assertEquals(ddl, first.getFieldList().get(0).getValue().toString());

LogMessage second = messageQueue.take();
Assertions.assertEquals(DataMessage.Record.Type.INSERT, second.getOpt());
Assertions.assertEquals("t_product", second.getTableName());

Map<String, String> fieldMap =
second.getFieldList().stream()
.filter(f -> !f.isPrev())
.collect(
Collectors.toMap(
DataMessage.Record.Field::getFieldname,
f -> f.getValue().toString()));
Assertions.assertEquals(2, fieldMap.size());
Assertions.assertEquals("1", fieldMap.get("id"));
Assertions.assertEquals("meat", fieldMap.get("name"));

try (Connection conn = driver.connect("jdbc:mysql://" + observerIP + ":2881/test", props);
Statement statement = conn.createStatement()) {
statement.execute("DROP TABLE t_product");
} catch (SQLException e) {
Assertions.fail(e);
}

client.stop();
}
}
Loading

0 comments on commit 9b638d0

Please sign in to comment.