Skip to content

Commit

Permalink
Add multi node setup test suites.
Browse files Browse the repository at this point in the history
This adds two new test suites, `threeNodes` and `threeNodesAlwaysSuspending`. The StopRuntime, KillRuntime and NonDeterminismErrors tests are disabled for now.
  • Loading branch information
slinkydeveloper committed Nov 6, 2024
1 parent 225fdd0 commit fe4ec6c
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 100 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ java -jar restate-sdk-test-suite.jar debug \

Please note, some tests requiring to kill/stop the service deployment won't work with the `debug` command.

## Building this tool

```shell
./gradlew shadowJar
```

## Releasing this tool

Just push a new git tag:
Expand Down
31 changes: 10 additions & 21 deletions src/main/kotlin/dev/restate/sdktesting/infra/ContainerLogger.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,36 @@ internal class ContainerLogger(
) : Consumer<OutputFrame> {

private var startCount = 0
private var stdoutStream: BufferedWriter? = null
private var stderrStream: BufferedWriter? = null
private var logStream: BufferedWriter? = null

override fun accept(frame: OutputFrame) {
when (frame.type) {
OutputFrame.OutputType.STDOUT -> {
resolveStdoutStream().write(frame.utf8String)
resolveStream().write(frame.utf8String)
}
OutputFrame.OutputType.STDERR -> {
resolveStderrStream().write(frame.utf8String)
resolveStream().write(frame.utf8String)
}
else -> {
stdoutStream?.close()
stderrStream?.close()
stdoutStream = null
stderrStream = null
logStream?.close()
logStream = null
startCount++
}
}
}

private fun resolveStdoutStream(): BufferedWriter {
if (stdoutStream == null) {
stdoutStream = newStream(testReportDirectory, loggerName, "stdout")
private fun resolveStream(): BufferedWriter {
if (logStream == null) {
logStream = newStream(testReportDirectory, loggerName)
}
return stdoutStream!!
}

private fun resolveStderrStream(): BufferedWriter {
if (stderrStream == null) {
stderrStream = newStream(testReportDirectory, loggerName, "stderr")
}
return stderrStream!!
return logStream!!
}

private fun newStream(
testReportDirectory: String,
loggerName: String,
type: String
): BufferedWriter {
val path = Path.of(testReportDirectory, "${loggerName}_${startCount}_${type}.log")
val path = Path.of(testReportDirectory, "${loggerName}_${startCount}.log")
val fileExists = Files.exists(path)

val writer =
Expand Down
164 changes: 120 additions & 44 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import com.fasterxml.jackson.dataformat.toml.TomlFactory
import com.github.dockerjava.api.command.InspectContainerResponse
import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema
import java.io.File
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import kotlin.use
import org.apache.logging.log4j.CloseableThreadContext
import org.apache.logging.log4j.LogManager
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder
import org.testcontainers.containers.BindMode
Expand All @@ -29,10 +32,12 @@ import org.testcontainers.utility.DockerImageName

class RestateContainer(
config: RestateDeployerConfig,
val hostname: String,
network: Network,
envs: Map<String, String>,
configSchema: RestateConfigSchema?,
copyToContainer: List<Pair<String, Transferable>>
copyToContainer: List<Pair<String, Transferable>>,
enableLocalPortForward: Boolean = true,
) : GenericContainer<RestateContainer>(DockerImageName.parse(config.restateContainerImage)) {
companion object {
private val LOG = LogManager.getLogger(RestateContainer::class.java)
Expand All @@ -45,78 +50,149 @@ class RestateContainer(
.forPort(RUNTIME_INGRESS_ENDPOINT_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(100, TimeUnit.MILLISECONDS)
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build())
.withStartupTimeout(20.seconds.toJavaDuration()))
.build()))
.withStrategy(
Wait.forHttp("/health")
.forPort(RUNTIME_ADMIN_ENDPOINT_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(100, TimeUnit.MILLISECONDS)
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build())
.withStartupTimeout(20.seconds.toJavaDuration()))
.build()))
.withStartupTimeout(120.seconds.toJavaDuration())

fun bootstrapRestateCluster(
config: RestateDeployerConfig,
network: Network,
envs: Map<String, String>,
configSchema: RestateConfigSchema?,
copyToContainer: List<Pair<String, Transferable>>,
nodes: Int
): List<RestateContainer> {
if (nodes == 1) {
return listOf(
RestateContainer(config, RESTATE_RUNTIME, network, envs, configSchema, copyToContainer))
} else {
val clusterId = UUID.randomUUID().toString()
val leaderEnvs =
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ALLOW_BOOTSTRAP" to "true",
"RESTATE_ROLES" to "[worker,log-server,admin,metadata-store]",
)
val followerEnvs =
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ROLES" to "[worker,admin,log-server]",
"RESTATE_METADATA_STORE_CLIENT__ADDRESS" to "http://$RESTATE_RUNTIME:5123")

return listOf(
RestateContainer(
config,
// Leader will just have the default hostname as usual, this makes sure containers
// port injection annotations still work.
RESTATE_RUNTIME,
network,
envs +
leaderEnvs +
mapOf(
"RESTATE_ADVERTISED_ADDRESS" to
"http://$RESTATE_RUNTIME:$RUNTIME_NODE_PORT"),
configSchema,
copyToContainer)) +
(1.rangeUntil(config.restateNodes)).map {
RestateContainer(
config,
"$RESTATE_RUNTIME-$it",
network,
envs +
followerEnvs +
mapOf(
"RESTATE_ADVERTISED_ADDRESS" to
"http://$RESTATE_RUNTIME-$it:$RUNTIME_NODE_PORT"),
configSchema,
copyToContainer,
// Only the leader gets the privilege of local port forwarding
enableLocalPortForward = false)
}
}
}
}

init {
LOG.debug("Using runtime image '{}'", config.restateContainerImage)
CloseableThreadContext.put("containerHostname", hostname).use {
withImagePullPolicy(config.imagePullPolicy.toTestContainersImagePullPolicy())

withImagePullPolicy(config.imagePullPolicy.toTestContainersImagePullPolicy())
withEnv(envs)
// These envs should not be overriden by envs
withEnv("RESTATE_ADMIN__BIND_ADDRESS", "0.0.0.0:$RUNTIME_ADMIN_ENDPOINT_PORT")
withEnv("RESTATE_INGRESS__BIND_ADDRESS", "0.0.0.0:$RUNTIME_INGRESS_ENDPOINT_PORT")

withEnv(envs)
// These envs should not be overriden by envs
withEnv("RESTATE_ADMIN__BIND_ADDRESS", "0.0.0.0:$RUNTIME_ADMIN_ENDPOINT_PORT")
withEnv("RESTATE_INGRESS__BIND_ADDRESS", "0.0.0.0:$RUNTIME_INGRESS_ENDPOINT_PORT")
this.network = network
this.networkAliases = arrayListOf(hostname)
withCreateContainerCmdModifier { it.withHostName(hostname) }

this.network = network
this.networkAliases = arrayListOf(RESTATE_RUNTIME)
withStartupAttempts(3) // For podman
waitingFor(WAIT_STARTUP_STRATEGY)
withStartupAttempts(3) // For podman
waitingFor(WAIT_STARTUP_STRATEGY)

if (config.stateDirectoryMount != null) {
val stateDir = File(config.stateDirectoryMount)
stateDir.mkdirs()
if (config.stateDirectoryMount != null) {
val stateDir = File(config.stateDirectoryMount)
stateDir.mkdirs()

LOG.debug("Mounting state directory to '{}'", stateDir.toPath())
addFileSystemBind(stateDir.toString(), "/state", BindMode.READ_WRITE, SelinuxContext.SINGLE)
}
withEnv("RESTATE_BASE_DIR", "/state")
LOG.debug("Mounting state directory to '{}'", stateDir.toPath())
addFileSystemBind(stateDir.toString(), "/state", BindMode.READ_WRITE, SelinuxContext.SINGLE)
}
withEnv("RESTATE_BASE_DIR", "/state")

if (configSchema != null) {
withCopyToContainer(
Transferable.of(TOML_MAPPER.writeValueAsBytes(configSchema)), "/config.toml")
withEnv("RESTATE_CONFIG", "/config.toml")
}
if (configSchema != null) {
withCopyToContainer(
Transferable.of(TOML_MAPPER.writeValueAsBytes(configSchema)), "/config.toml")
withEnv("RESTATE_CONFIG", "/config.toml")
}

for (file in copyToContainer) {
withCopyToContainer(file.second, file.first)
}
for (file in copyToContainer) {
withCopyToContainer(file.second, file.first)
}

if (config.localAdminPort != null) {
LOG.info("Going to expose Admin port on localhost:{}", config.localAdminPort)
super.addFixedExposedPort(config.localAdminPort, RUNTIME_ADMIN_ENDPOINT_PORT)
} else {
addExposedPort(RUNTIME_ADMIN_ENDPOINT_PORT)
}
if (config.localIngressPort != null) {
LOG.info("Going to expose Admin port on localhost:{}", config.localIngressPort)
super.addFixedExposedPort(config.localIngressPort, RUNTIME_INGRESS_ENDPOINT_PORT)
} else {
addExposedPort(RUNTIME_INGRESS_ENDPOINT_PORT)
if (enableLocalPortForward && config.localAdminPort != null) {
LOG.info("Going to expose Admin port on 'localhost:{}'", config.localAdminPort)
super.addFixedExposedPort(config.localAdminPort, RUNTIME_ADMIN_ENDPOINT_PORT)
} else {
addExposedPort(RUNTIME_ADMIN_ENDPOINT_PORT)
}
if (enableLocalPortForward && config.localIngressPort != null) {
LOG.info("Going to expose Admin port on 'localhost:{}'", config.localIngressPort)
super.addFixedExposedPort(config.localIngressPort, RUNTIME_INGRESS_ENDPOINT_PORT)
} else {
addExposedPort(RUNTIME_INGRESS_ENDPOINT_PORT)
}
if (enableLocalPortForward && config.localNodePort != null) {
LOG.info("Going to expose node port on 'localhost:{}'", config.localNodePort)
super.addFixedExposedPort(config.localNodePort, RUNTIME_NODE_PORT)
} else {
addExposedPort(RUNTIME_NODE_PORT)
}
}
}

fun configureLogger(testReportDir: String): RestateContainer {
this.withLogConsumer(ContainerLogger(testReportDir, "restate-runtime"))
this.withLogConsumer(ContainerLogger(testReportDir, hostname))
return this
}

fun waitStartup() {
WAIT_STARTUP_STRATEGY.waitUntilReady(this)
}

fun dumpConfiguration() {
check(isRunning) { "The container is not running, can't dump configuration" }
dockerClient.killContainerCmd(containerId).withSignal("SIGUSR1").exec()
}

override fun getContainerInfo(): InspectContainerResponse {
// We override container info to avoid getting outdated info when restarted
val containerId = this.containerId
Expand Down
Loading

0 comments on commit fe4ec6c

Please sign in to comment.