Skip to content

Commit

Permalink
feat(webflux): implement remote IP header appender for command reques…
Browse files Browse the repository at this point in the history
…ts (#1112)

* feat(webflux): implement remote IP header appender for command requests

- Add CommandRequestRemoteIpHeaderAppender to handle X-Forwarded-For header
- Implement logic to resolve remote IP from request headers
- Update WebFluxAutoConfiguration to include new appender
- Add unit tests for CommandRequestRemoteIpHeaderAppender

* test(wow-webflux): add tests for CommandRequestExtendHeaderAppender and DefaultCommandMessageParser

- Add unit tests for CommandRequestExtendHeaderAppender
- Add test cases for CommandRequestUserAgentHeaderAppender
- Remove unnecessary test file for DefaultCommandMessageParser

* refactor(wow-webflux): optimize remote IP resolution and logging

- Remove unnecessary logging for multiple X-Forwarded-For headers
- Simplify code by using firstHeader instead of header list
- Add test case for empty X-Forwarded-For header
- Update existing test cases to use new header resolution approach

* test(webflux): add test case for disabling command request appender

- Add test case to verify that CommandRequestUserAgentHeaderAppender and CommandRequestRemote
  • Loading branch information
Ahoo-Wang authored Jan 9, 2025
1 parent df670d2 commit 50f0c43
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 43 deletions.
1 change: 1 addition & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ import me.ahoo.wow.webflux.route.bi.GenerateBIScriptHandlerFunctionFactory
import me.ahoo.wow.webflux.route.command.CommandFacadeHandlerFunctionFactory
import me.ahoo.wow.webflux.route.command.CommandHandlerFunctionFactory
import me.ahoo.wow.webflux.route.command.CommandMessageParser
import me.ahoo.wow.webflux.route.command.CommandRequestExtendHeaderAppender
import me.ahoo.wow.webflux.route.command.CommandRequestHeaderAppender
import me.ahoo.wow.webflux.route.command.CommandRequestUserAgentHeaderAppender
import me.ahoo.wow.webflux.route.command.DEFAULT_TIME_OUT
import me.ahoo.wow.webflux.route.command.DefaultCommandMessageParser
import me.ahoo.wow.webflux.route.command.appender.CommandRequestExtendHeaderAppender
import me.ahoo.wow.webflux.route.command.appender.CommandRequestHeaderAppender
import me.ahoo.wow.webflux.route.command.appender.CommandRequestRemoteIpHeaderAppender
import me.ahoo.wow.webflux.route.command.appender.CommandRequestUserAgentHeaderAppender
import me.ahoo.wow.webflux.route.event.CountEventStreamHandlerFunctionFactory
import me.ahoo.wow.webflux.route.event.EventCompensateHandlerFunctionFactory
import me.ahoo.wow.webflux.route.event.ListQueryEventStreamHandlerFunctionFactory
Expand Down Expand Up @@ -153,14 +154,24 @@ class WebFluxAutoConfiguration {

@Bean
@ConditionalOnProperty(
value = ["${WebFluxProperties.PREFIX}.command.request.appender.user$ENABLED_SUFFIX_KEY"],
value = ["${WebFluxProperties.COMMAND_REQUEST_APPENDER_PREFIX}.agent$ENABLED_SUFFIX_KEY"],
matchIfMissing = true,
havingValue = "true"
)
fun commandRequestUserAgentHeaderAppender(): CommandRequestUserAgentHeaderAppender {
return CommandRequestUserAgentHeaderAppender
}

@Bean
@ConditionalOnProperty(
value = ["${WebFluxProperties.COMMAND_REQUEST_APPENDER_PREFIX}.ip$ENABLED_SUFFIX_KEY"],
matchIfMissing = true,
havingValue = "true"
)
fun commandRequestRemoteIpHeaderAppender(): CommandRequestRemoteIpHeaderAppender {
return CommandRequestRemoteIpHeaderAppender
}

@Bean
@ConditionalOnMissingBean
fun commandMessageParser(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class WebFluxProperties(
) : EnabledCapable {
companion object {
const val PREFIX = "${Wow.WOW_PREFIX}webflux"
const val COMMAND_REQUEST_APPENDER_PREFIX = "$PREFIX.command.request.appender"
const val GLOBAL_ERROR_ENABLED = "$PREFIX.global-error$ENABLED_SUFFIX_KEY"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory
import me.ahoo.wow.modeling.state.StateAggregateFactory
import me.ahoo.wow.query.event.filter.EventStreamQueryHandler
import me.ahoo.wow.query.snapshot.filter.SnapshotQueryHandler
import me.ahoo.wow.spring.boot.starter.ENABLED_SUFFIX_KEY
import me.ahoo.wow.spring.boot.starter.command.CommandAutoConfiguration
import me.ahoo.wow.spring.boot.starter.command.CommandGatewayAutoConfiguration
import me.ahoo.wow.spring.boot.starter.enableWow
Expand All @@ -42,6 +43,8 @@ import me.ahoo.wow.spring.boot.starter.webflux.WebFluxProperties.Companion.GLOBA
import me.ahoo.wow.test.SagaVerifier
import me.ahoo.wow.webflux.exception.GlobalExceptionHandler
import me.ahoo.wow.webflux.exception.RequestExceptionHandler
import me.ahoo.wow.webflux.route.command.appender.CommandRequestRemoteIpHeaderAppender
import me.ahoo.wow.webflux.route.command.appender.CommandRequestUserAgentHeaderAppender
import org.assertj.core.api.AssertionsForInterfaceTypes
import org.junit.jupiter.api.Test
import org.springframework.boot.test.context.assertj.AssertableApplicationContext
Expand Down Expand Up @@ -81,6 +84,40 @@ internal class WebFluxAutoConfigurationTest {
}
}

@Test
fun contextLoadsDisableAgentAppender() {
contextRunner
.enableWow()
.withPropertyValues(
"${WebFluxProperties.COMMAND_REQUEST_APPENDER_PREFIX}.agent$ENABLED_SUFFIX_KEY=false",
"${WebFluxProperties.COMMAND_REQUEST_APPENDER_PREFIX}.ip$ENABLED_SUFFIX_KEY=false"
)
.withBean(CommandWaitNotifier::class.java, { mockk() })
.withBean(CommandGateway::class.java, { SagaVerifier.defaultCommandGateway() })
.withBean(StateAggregateFactory::class.java, { ConstructorStateAggregateFactory })
.withBean(SnapshotRepository::class.java, { NoOpSnapshotRepository })
.withBean(EventStore::class.java, { InMemoryEventStore() })
.withBean(DomainEventBus::class.java, { InMemoryDomainEventBus() })
.withBean(StateEventCompensator::class.java, { mockk() })
.withBean(EventCompensateSupporter::class.java, { mockk() })
.withBean(SnapshotQueryHandler::class.java, { spyk<SnapshotQueryHandler>() })
.withBean(EventStreamQueryHandler::class.java, { spyk<EventStreamQueryHandler>() })
.withBean(HostAddressSupplier::class.java, { LocalHostAddressSupplier.INSTANCE })
.withUserConfiguration(
CommandAutoConfiguration::class.java,
CommandGatewayAutoConfiguration::class.java,
EventSourcingAutoConfiguration::class.java,
AggregateAutoConfiguration::class.java,
OpenAPIAutoConfiguration::class.java,
WebFluxAutoConfiguration::class.java,
)
.run { context: AssertableApplicationContext ->
AssertionsForInterfaceTypes.assertThat(context)
.doesNotHaveBean(CommandRequestUserAgentHeaderAppender::class.java)
.doesNotHaveBean(CommandRequestRemoteIpHeaderAppender::class.java)
}
}

@Test
fun contextLoadsWithKafkaProperties() {
contextRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import me.ahoo.wow.messaging.withLocalFirst
import me.ahoo.wow.modeling.matedata.AggregateMetadata
import me.ahoo.wow.openapi.command.CommandRequestHeaders.AGGREGATE_VERSION
import me.ahoo.wow.openapi.command.CommandRequestHeaders.REQUEST_ID
import me.ahoo.wow.webflux.route.command.appender.CommandRequestHeaderAppender
import org.springframework.web.reactive.function.server.ServerRequest
import reactor.core.publisher.Mono

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.webflux.route.command.appender

import me.ahoo.wow.api.messaging.Header
import me.ahoo.wow.openapi.command.CommandRequestHeaders
import org.springframework.web.reactive.function.server.ServerRequest

object CommandRequestExtendHeaderAppender : CommandRequestHeaderAppender {
override fun append(request: ServerRequest, header: Header) {
val extendedHeaders = request.headers().asHttpHeaders()
.filter { (key, _) -> key.startsWith(CommandRequestHeaders.COMMAND_HEADER_X_PREFIX) }
.map { (key, value) ->
key.substring(CommandRequestHeaders.COMMAND_HEADER_X_PREFIX.length) to value.firstOrNull<String>().orEmpty()
}.toMap<String, String>()
if (extendedHeaders.isEmpty()) {
return
}
header.with(extendedHeaders)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.webflux.route.command.appender

import me.ahoo.wow.api.messaging.Header
import org.springframework.web.reactive.function.server.ServerRequest

interface CommandRequestHeaderAppender {
fun append(request: ServerRequest, header: Header)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.webflux.route.command.appender

import me.ahoo.wow.api.messaging.Header
import me.ahoo.wow.messaging.propagation.CommandRequestHeaderPropagator.Companion.withRemoteIp
import org.springframework.web.reactive.function.server.ServerRequest
import kotlin.jvm.optionals.getOrNull

object CommandRequestRemoteIpHeaderAppender : CommandRequestHeaderAppender {
const val X_FORWARDED_FOR = "X-Forwarded-For"
const val DELIMITER = ','

override fun append(request: ServerRequest, header: Header) {
resolveRemoteIp(request)?.let {
header.withRemoteIp(it)
}
}

private fun getRemoteIp(request: ServerRequest): String? {
return request.remoteAddress().getOrNull()?.hostName
}

private fun resolveRemoteIp(request: ServerRequest): String? {
val xForwardedHeaderValue = request.headers().firstHeader(X_FORWARDED_FOR)
if (xForwardedHeaderValue.isNullOrBlank()) {
return getRemoteIp(request)
}

val xForwardedValues = xForwardedHeaderValue
.split(DELIMITER)
.filter { it.isNotBlank() }
.reversed()
if (xForwardedValues.isEmpty()) {
return getRemoteIp(request)
}
val index = xForwardedValues.size.coerceAtMost(Int.MAX_VALUE) - 1
return xForwardedValues[index]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,17 @@
* limitations under the License.
*/

package me.ahoo.wow.webflux.route.command
package me.ahoo.wow.webflux.route.command.appender

import me.ahoo.wow.api.messaging.Header
import me.ahoo.wow.messaging.propagation.CommandRequestHeaderPropagator.Companion.withRemoteIp
import me.ahoo.wow.messaging.propagation.CommandRequestHeaderPropagator.Companion.withUserAgent
import me.ahoo.wow.openapi.command.CommandRequestHeaders.COMMAND_HEADER_X_PREFIX
import org.springframework.http.HttpHeaders
import org.springframework.web.reactive.function.server.ServerRequest
import kotlin.jvm.optionals.getOrNull

interface CommandRequestHeaderAppender {
fun append(request: ServerRequest, header: Header)
}

object CommandRequestExtendHeaderAppender : CommandRequestHeaderAppender {
override fun append(request: ServerRequest, header: Header) {
val extendedHeaders = request.headers().asHttpHeaders()
.filter { (key, _) -> key.startsWith(COMMAND_HEADER_X_PREFIX) }
.map { (key, value) ->
key.substring(COMMAND_HEADER_X_PREFIX.length) to value.firstOrNull<String>().orEmpty()
}.toMap<String, String>()
if (extendedHeaders.isEmpty()) {
return
}
header.with(extendedHeaders)
}
}

object CommandRequestUserAgentHeaderAppender : CommandRequestHeaderAppender {
override fun append(request: ServerRequest, header: Header) {
request.headers().firstHeader(HttpHeaders.USER_AGENT)?.let {
header.withUserAgent(it)
}
request.remoteAddress().getOrNull()?.hostName?.let {
header.withRemoteIp(it)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.webflux.route.command

import io.mockk.every
Expand All @@ -11,8 +24,9 @@ import me.ahoo.wow.openapi.command.CommandRequestHeaders
import me.ahoo.wow.serialization.MessageRecords
import me.ahoo.wow.tck.mock.MOCK_AGGREGATE_METADATA
import me.ahoo.wow.tck.mock.MockCreateAggregate
import org.hamcrest.CoreMatchers.equalTo
import org.hamcrest.MatcherAssert.*
import me.ahoo.wow.webflux.route.command.appender.CommandRequestExtendHeaderAppender
import org.hamcrest.CoreMatchers
import org.hamcrest.MatcherAssert
import org.junit.jupiter.api.Test
import org.springframework.http.HttpHeaders
import org.springframework.util.MultiValueMap
Expand Down Expand Up @@ -90,7 +104,7 @@ class DefaultCommandMessageParserTest {
request
).test()
.consumeNextWith {
assertThat(it.header[headerKey], equalTo(value))
MatcherAssert.assertThat(it.header[headerKey], CoreMatchers.equalTo(value))
}
.verifyComplete()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.webflux.route.command.appender

import io.mockk.every
import io.mockk.mockk
import me.ahoo.wow.messaging.DefaultHeader
import me.ahoo.wow.openapi.command.CommandRequestHeaders
import org.hamcrest.CoreMatchers.equalTo
import org.hamcrest.MatcherAssert.*
import org.junit.jupiter.api.Test
import org.springframework.http.HttpHeaders
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.server.ServerRequest

class CommandRequestExtendHeaderAppenderTest {
@Test
fun append() {
val headerKey = "app"
val key = CommandRequestHeaders.COMMAND_HEADER_X_PREFIX + headerKey
val value = "oms"
val request = mockk<ServerRequest> {
every { headers().asHttpHeaders() } returns HttpHeaders(
MultiValueMap.fromSingleValue<String, String>(
mapOf(
key to value
)
)
)
}
val commandHeader = DefaultHeader.empty()
CommandRequestExtendHeaderAppender.append(request, commandHeader)
assertThat(commandHeader[headerKey], equalTo(value))
}

@Test
fun appendEmpty() {
val request = mockk<ServerRequest> {
every { headers().asHttpHeaders() } returns HttpHeaders()
}
val commandHeader = DefaultHeader.empty()
CommandRequestExtendHeaderAppender.append(request, commandHeader)
assertThat(commandHeader.isEmpty(), equalTo(true))
}
}
Loading

0 comments on commit 50f0c43

Please sign in to comment.