diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..0b9c73ac --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +text=auto eol=lf \ No newline at end of file diff --git a/.gitignore b/.gitignore index 910aecda..2c7d0b0b 100644 --- a/.gitignore +++ b/.gitignore @@ -253,3 +253,7 @@ generated/ .settings # End of https://www.gitignore.io/api/node,java,macos,linux,cmake,gradle,intellij+all +.flattened-pom.xml + + +.factorypath diff --git a/README.md b/README.md index 880cae8f..ad28fadc 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,87 @@ +# Updated RSocket IPC Metadata Handling + +This is intended to be used as an add on to the IPC module of rsocket/rsocket-rpc-java found here: https://github.com/rsocket/rsocket-rpc-java + +The rsocket/rsocket-rpc-java project uses an outdated version of RSocket and doesn't work well with CompositeMetadata. It uses custom parsing and encoding of metadata content to route messages. + +This project provides drop in (assuming RSocket RC1.7 and up) replacement MetadataDecoder and MetadataEncoder classes. + +The two classes at work are MetadataDecoderLFP and MetadataEncoderLFP. They use MetadataWriter and MetadataReader classes to allow for custom serialization of metadata content. + +Out of the box they support the service/method/trace requirements of rsocket/rsocket-rpc-java but also allow for custom interceptors. + +For example, we can use the following code to require a password on all requests: + +```java +MetadataDecoderLFP decoder = new MetadataDecoderLFP(); +RequestHandlingRSocket requestHandler = new RequestHandlingRSocket(decoder); +{// start server + SocketAcceptor socketAcceptor = (setup, client) -> Mono.just(requestHandler); + RSocketServer.create(socketAcceptor).interceptors(ir -> { + }).errorConsumer(t -> { + java.util.logging.Logger.getLogger("[server]").log(Level.SEVERE, "uncaught error", t); + }).bind(TcpServerTransport.create("localhost", 7000)).block(); +} +decoder.addInterceptor(reader -> { + boolean match = reader.containsString(MimeTypes.create("password"), "thisIsACoolPassWord!"); + if (!match) + throw new IllegalArgumentException("not authorized"); +}); +``` +If we try to access the server, we will receive the following: + +``` +SEVERE: uncaught error +java.lang.IllegalArgumentException: not authorized + at com.lfp.rsocket.ipc.metadata.IntegrationTest.lambda$13(IntegrationTest.java:116) +``` + +We can then modify the client to add the password, and everything works fine: + +```java +MetadataEncoderLFP encoder = new MetadataEncoderLFP(); +RSocket rsocket; +{// start client + rsocket = RSocketConnector.create().connect(TcpClientTransport.create("localhost", 7000)).block(); +} +encoder.addInterceptor( + writer -> writer.writeString(MimeTypes.create("password"), "thisIsACoolPassWord!")); +``` +As a bonus, the writers and readers can handle Multimap values, by encoding the content as a url query. (EX: "key=val1&key=val2&neat=wow") + +To illustrate this we can look at how tracing is handled, which requires a multimap of key value pairs to be stored in metadata. + +Here's how it's encoded: + +```java +private void appendTracing(MetadataWriter metadataWriter, SpanContext spanContext) { + if (spanContext == null) + return; + Iterable> items = spanContext.baggageItems(); + if (items == null) + return; + Map> paramMap = new LinkedHashMap<>(); + for (Entry ent : items) + paramMap.computeIfAbsent(ent.getKey(), nil -> new LinkedHashSet<>()).add(ent.getValue()); + metadataWriter.writeEntries(MimeTypes.MIME_TYPE_TRACER, paramMap); +} +``` +Here's how it's decoded: + +```java +private SpanContext getTracingSpanContext(MetadataReader metadataReader) { + if (tracer == null) + return null; + Map tracerMetadata = new LinkedHashMap<>(); + metadataReader.streamEntriesNonEmpty(MimeTypes.MIME_TYPE_TRACER) + .forEach(ent -> tracerMetadata.computeIfAbsent(ent.getKey(), nil -> ent.getValue())); + if (tracerMetadata.isEmpty()) + return null; + return Tracing.deserializeTracingMetadata(tracer, tracerMetadata); +} +``` + + # RSocket RPC - Java [![Build Status](https://travis-ci.org/rsocket/rsocket-rpc-java.svg?branch=master)](https://travis-ci.org/rsocket/rsocket-rpc-java) @@ -16,7 +100,7 @@ The standard [RSocket](http://rsocket.io) RPC Java implementation. 2. Run the following Gradle command to build the project: $ ./gradlew clean build - + ## What Next? * [Motivation](./docs/motivation.md) diff --git a/ci/travis.sh b/ci/travis.sh old mode 100755 new mode 100644 diff --git a/gradlew.bat b/gradlew.bat index a9f778a7..5093609d 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,104 +1,104 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem - -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto init - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:init -@rem Get command-line arguments, handling Windows variants - -if not "%OS%" == "Windows_NT" goto win9xME_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/jitpack.yml b/jitpack.yml new file mode 100644 index 00000000..ed646932 --- /dev/null +++ b/jitpack.yml @@ -0,0 +1,2 @@ +install: + - ./gradlew clean build publishToMavenLocal -x test -x verifyGoogleJavaFormat --settings-file settings.jitpack.gradle \ No newline at end of file diff --git a/jitpack.yml.bak b/jitpack.yml.bak new file mode 100644 index 00000000..0929ebf1 --- /dev/null +++ b/jitpack.yml.bak @@ -0,0 +1,2 @@ +install: + - .clean build publishToMavenLocal -x test -x verifyGoogleJavaFormat \ No newline at end of file diff --git a/publishToMavenLocal.ps1 b/publishToMavenLocal.ps1 new file mode 100644 index 00000000..708abdcf --- /dev/null +++ b/publishToMavenLocal.ps1 @@ -0,0 +1,4 @@ +$NOW=[Math]::Round((Get-Date).ToFileTime() / 10000000 - 11644473600) +echo "versionSuffix: -$NOW" +echo "args: $args" +./gradlew build publishToMavenLocal -x test -x javadoc -x verifyGoogleJavaFormat -c settings.jitpack.gradle -PversionSuffix="-$NOW" "$args" diff --git a/publishToMavenLocal.sh b/publishToMavenLocal.sh new file mode 100644 index 00000000..acd4d90a --- /dev/null +++ b/publishToMavenLocal.sh @@ -0,0 +1,3 @@ +NOW=$(date +%s) +echo "versionSuffix: -$NOW" +./gradlew build publishToMavenLocal -x test -x javadoc -x verifyGoogleJavaFormat -c settings.jitpack.gradle -PversionSuffix="-$NOW" "$@" diff --git a/rsocket-ipc-core/bin/.gitignore b/rsocket-ipc-core/bin/.gitignore new file mode 100644 index 00000000..4d76ae8a --- /dev/null +++ b/rsocket-ipc-core/bin/.gitignore @@ -0,0 +1,6 @@ +/main/ +/test/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-core/src/main/java/io/rsocket/ipc/decoders/MetadataDecoderLFP.java b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/decoders/MetadataDecoderLFP.java new file mode 100644 index 00000000..d85b9872 --- /dev/null +++ b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/decoders/MetadataDecoderLFP.java @@ -0,0 +1,93 @@ +package io.rsocket.ipc.decoders; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.netty.buffer.ByteBuf; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.rsocket.ipc.MetadataDecoder; +import io.rsocket.ipc.encoders.MetadataReader; +import io.rsocket.ipc.mimetype.MimeTypes; +import io.rsocket.ipc.tracing.Tracing; +import io.rsocket.ipc.util.IPCUtils.DisposableAddList; +import reactor.core.Disposable; + +public class MetadataDecoderLFP implements MetadataDecoder { + + public static interface Interceptor extends Consumer { + }; + + private final Tracer tracer; + private final DisposableAddList interceptors = DisposableAddList.create(); + + public MetadataDecoderLFP(MetadataDecoderLFP.Interceptor... interceptors) { + this((Tracer) null, interceptors); + } + + public MetadataDecoderLFP(Tracer tracer, MetadataDecoderLFP.Interceptor... interceptors) { + this.tracer = tracer; + if (interceptors != null) + Arrays.asList(interceptors).stream().filter(Objects::nonNull).forEach(v -> this.addInterceptor(v)); + } + + public Disposable addInterceptor(MetadataDecoderLFP.Interceptor interceptor) { + Objects.requireNonNull(interceptor); + return interceptors.disposableAdd(interceptor); + } + + @Override + public Metadata decode(ByteBuf metadataByteBuf) throws Exception { + MetadataReader metadataReader = new MetadataReader(metadataByteBuf, false); + interceptors.forEach(v -> v.accept(metadataReader)); + String route = getRoute(metadataReader); + SpanContext spanContext = readTracingSpanContext(metadataReader); + return new Metadata() { + + @Override + public ByteBuf metadata() { + return metadataByteBuf; + } + + @Override + public String route() { + return route; + } + + @Override + public SpanContext spanContext() { + return spanContext; + } + + @Override + public boolean isComposite() { + return true; + } + }; + } + + private String getRoute(MetadataReader metadataReader) { + Stream stream = Stream.empty(); + stream = Stream.concat(stream, metadataReader.streamStrings(MimeTypes.MIME_TYPE_SERVICE)); + stream = Stream.concat(stream, metadataReader.streamStrings(MimeTypes.MIME_TYPE_METHOD)); + String route = stream.collect(Collectors.joining(".")); + return route; + } + + private SpanContext readTracingSpanContext(MetadataReader metadataReader) { + if (tracer == null) + return null; + Map tracerMetadata = new LinkedHashMap<>(); + metadataReader.streamEntriesNonEmpty(MimeTypes.MIME_TYPE_TRACER) + .forEach(ent -> tracerMetadata.computeIfAbsent(ent.getKey(), nil -> ent.getValue())); + if (tracerMetadata.isEmpty()) + return null; + return Tracing.deserializeTracingMetadata(tracer, tracerMetadata); + } + +} diff --git a/rsocket-ipc-core/src/main/java/io/rsocket/ipc/decoders/MetadataWriter.java b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/decoders/MetadataWriter.java new file mode 100644 index 00000000..bb8e05da --- /dev/null +++ b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/decoders/MetadataWriter.java @@ -0,0 +1,132 @@ +package io.rsocket.ipc.decoders; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.PrimitiveIterator.OfInt; +import java.util.function.Function; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.rsocket.ipc.mimetype.MimeType; +import io.rsocket.ipc.util.IPCUtils; +import io.rsocket.metadata.CompositeMetadataCodec; +import io.rsocket.metadata.WellKnownMimeType; + +public class MetadataWriter { + + private final ByteBufAllocator allocator; + private CompositeByteBuf _compositeByteBuf; + + public MetadataWriter() { + this(null, null); + } + + public MetadataWriter(ByteBufAllocator allocator, ByteBuf source) { + this.allocator = allocator != null ? allocator : ByteBufAllocator.DEFAULT; + if (source instanceof CompositeByteBuf) + this._compositeByteBuf = (CompositeByteBuf) source; + else if (source != null && source.readableBytes() != 0) { + throw new IllegalArgumentException("MetadataWriter requires a CompositeByteBuf or an empty source ByteBuf"); + } + } + + public CompositeByteBuf getCompositeByteBuf() { + if (_compositeByteBuf != null) + return _compositeByteBuf; + synchronized (this) { + if (_compositeByteBuf == null) + _compositeByteBuf = this.allocator.compositeBuffer(); + } + return _compositeByteBuf; + } + + public void writeString(MimeType mimeType, String... values) { + write(mimeType, values == null ? Stream.empty() : Arrays.asList(values).stream(), + s -> s.map(IPCUtils::byteBufFromString)); + } + + public void writeEntries(MimeType mimeType, String... keyValueEntries) { + Map> parameterMap; + if (keyValueEntries == null || keyValueEntries.length == 0) + parameterMap = Collections.emptyMap(); + else { + parameterMap = new LinkedHashMap<>(); + OfInt iter = IntStream.range(0, keyValueEntries.length).iterator(); + String key = null; + while (iter.hasNext()) { + int index = iter.nextInt(); + boolean even = index % 2 == 0; + boolean end = !iter.hasNext(); + String str = keyValueEntries[index]; + if (!even || end) { + String value = !even ? str : null; + parameterMap.computeIfAbsent(key, nil -> new ArrayList<>()).add(value); + } else + key = str; + + } + } + writeEntries(mimeType, parameterMap); + } + + public void writeEntries(MimeType mimeType, Map> parameterMap) { + if (parameterMap == null || parameterMap.isEmpty()) + return; + Stream>> streams = parameterMap.entrySet().stream().map(ent -> { + Iterable value = ent.getValue(); + if (value == null) + return Stream.empty(); + Stream> stream = IPCUtils.stream(value.iterator()) + .map(v -> new SimpleImmutableEntry<>(ent.getKey(), v)); + return stream; + }); + writeEntries(mimeType, IPCUtils.flatMap(streams)); + } + + public void writeEntries(MimeType mimeType, Stream> stream) { + if (stream == null) + return; + stream = stream.filter(Objects::nonNull); + stream = stream.filter(e -> IPCUtils.nonEmpty(e.getKey())); + stream = stream.map(e -> { + if (e.getValue() != null) + return e; + return new SimpleImmutableEntry<>(e.getKey(), ""); + }); + write(mimeType, stream, s -> { + String query = IPCUtils.encodeEntries(s); + return Stream.of(IPCUtils.byteBufFromString(query)); + }); + } + + public void write(MimeType mimeType, Stream valueStream, Function, Stream> encoder) { + Objects.requireNonNull(mimeType); + Objects.requireNonNull(valueStream); + Objects.requireNonNull(encoder); + Stream stream = encoder.apply(valueStream); + if (stream == null) + return; + Optional wellKnownMimeTypeOp = mimeType.getWellKnownMimeType(); + stream.forEach(bb -> { + if (wellKnownMimeTypeOp.isPresent()) + CompositeMetadataCodec.encodeAndAddMetadata(getCompositeByteBuf(), allocator, + wellKnownMimeTypeOp.get(), bb); + else + CompositeMetadataCodec.encodeAndAddMetadata(getCompositeByteBuf(), allocator, mimeType.getString(), + bb); + }); + + } + +} diff --git a/rsocket-ipc-core/src/main/java/io/rsocket/ipc/encoders/MetadataEncoderLFP.java b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/encoders/MetadataEncoderLFP.java new file mode 100644 index 00000000..64122b00 --- /dev/null +++ b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/encoders/MetadataEncoderLFP.java @@ -0,0 +1,80 @@ +package io.rsocket.ipc.encoders; + +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.opentracing.SpanContext; +import io.rsocket.ipc.MetadataEncoder; +import io.rsocket.ipc.decoders.MetadataWriter; +import io.rsocket.ipc.mimetype.MimeTypes; +import io.rsocket.ipc.util.IPCUtils; +import io.rsocket.ipc.util.IPCUtils.DisposableAddList; +import reactor.core.Disposable; + +public class MetadataEncoderLFP implements MetadataEncoder { + + public static interface Interceptor extends Consumer { + }; + + private final ByteBufAllocator allocator; + private final DisposableAddList interceptors = DisposableAddList.create(); + + public MetadataEncoderLFP(MetadataEncoderLFP.Interceptor... interceptors) { + this(ByteBufAllocator.DEFAULT, interceptors); + } + + public MetadataEncoderLFP(ByteBufAllocator allocator, MetadataEncoderLFP.Interceptor... interceptors) { + this.allocator = Objects.requireNonNull(allocator); + if (interceptors != null) + Arrays.asList(interceptors).stream().filter(Objects::nonNull).forEach(v -> this.addInterceptor(v)); + } + + public Disposable addInterceptor(MetadataEncoderLFP.Interceptor interceptor) { + Objects.requireNonNull(interceptor); + return interceptors.disposableAdd(interceptor); + } + + @Override + public final ByteBuf encode(ByteBuf metadata, SpanContext spanContext, String service, String... parts) { + MetadataWriter metadataWriter = new MetadataWriter(this.allocator, metadata); + this.writeMetadata(metadataWriter, spanContext, service, parts); + return metadataWriter.getCompositeByteBuf(); + } + + protected void writeMetadata(MetadataWriter metadataWriter, SpanContext spanContext, String service, + String... parts) { + interceptors.forEach(interceptor -> interceptor.accept(metadataWriter)); + writeRoutingInfo(metadataWriter, service, parts); + writeTracingSpanContext(metadataWriter, spanContext); + } + + private void writeRoutingInfo(MetadataWriter metadataWriter, String service, String... parts) { + metadataWriter.writeString(MimeTypes.MIME_TYPE_SERVICE, service); + Stream methodsStream = parts == null ? Stream.empty() + : Arrays.asList(parts).stream().filter(IPCUtils::nonEmpty); + methodsStream.forEach(v -> { + metadataWriter.writeString(MimeTypes.MIME_TYPE_METHOD, v); + }); + } + + private void writeTracingSpanContext(MetadataWriter metadataWriter, SpanContext spanContext) { + if (spanContext == null) + return; + Iterable> items = spanContext.baggageItems(); + if (items == null) + return; + Map> paramMap = new LinkedHashMap<>(); + for (Entry ent : items) + paramMap.computeIfAbsent(ent.getKey(), nil -> new LinkedHashSet<>()).add(ent.getValue()); + metadataWriter.writeEntries(MimeTypes.MIME_TYPE_TRACER, paramMap); + } +} diff --git a/rsocket-ipc-core/src/main/java/io/rsocket/ipc/encoders/MetadataReader.java b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/encoders/MetadataReader.java new file mode 100644 index 00000000..0ef54539 --- /dev/null +++ b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/encoders/MetadataReader.java @@ -0,0 +1,118 @@ +package io.rsocket.ipc.encoders; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import io.netty.buffer.ByteBuf; +import io.rsocket.ipc.mimetype.MimeType; +import io.rsocket.ipc.mimetype.MimeTypes; +import io.rsocket.ipc.util.IPCUtils; +import io.rsocket.metadata.CompositeMetadata; + +public class MetadataReader { + + private final CompositeMetadata compositeMetadata; + + public MetadataReader(ByteBuf source) { + this(source, false); + } + + public MetadataReader(ByteBuf source, boolean retainSlices) { + Objects.requireNonNull(source); + this.compositeMetadata = new CompositeMetadata(source, retainSlices); + } + + public boolean containsString(MimeType mimeType, String value) { + return containsString(mimeType, value, false, -1); + } + + public boolean containsStringSecure(MimeType mimeType, String value) { + Objects.requireNonNull(mimeType); + IPCUtils.requireNonEmpty(value); + return containsString(mimeType, value, false, 1); + } + + public boolean containsString(MimeType mimeType, String value, boolean ignoreCase, int maxCandidates) { + Stream stream = streamStrings(mimeType); + if (maxCandidates != -1)// ex: limit password candidates + stream = stream.limit(maxCandidates); + stream = stream.filter(v -> IPCUtils.equals(v, value, ignoreCase)); + return stream.findFirst().isPresent(); + } + + public boolean containsEntry(MimeType mimeType, String key, String value) { + return containsEntry(mimeType, key, value, false, -1); + } + + public boolean containsEntry(MimeType mimeType, String key, String value, boolean ignoreCase, int maxCandidates) { + Stream>> stream = streamEntries(mimeType) + .filter(e -> IPCUtils.equals(e.getKey(), key, ignoreCase)); + if (maxCandidates != -1)// ex: limit password candidates + stream = stream.limit(maxCandidates); + stream = stream.filter(e -> IPCUtils.equals(e.getValue().orElse(null), value, ignoreCase)); + return stream.findFirst().isPresent(); + } + + public Stream streamStrings(MimeType mimeType) { + return stream(toTest -> Objects.equals(toTest, mimeType), e -> { + return Stream.of(IPCUtils.byteBufToString(e.getContent())); + }); + } + + public Stream streamStringsNonEmpty(MimeType mimeType) { + return streamStrings(mimeType).filter(v -> !IPCUtils.isNullOrEmpty(v)); + } + + public Stream>> streamEntries(MimeType mimeType) { + return stream(toTest -> Objects.equals(toTest, mimeType), e -> { + return IPCUtils.decodeEntries(IPCUtils.byteBufToString(e.getContent())); + }); + } + + public Map>> getEntries(MimeType mimeType) { + Map>> map = new LinkedHashMap<>(); + streamEntries(mimeType) + .forEach(e -> map.computeIfAbsent(e.getKey(), nil -> new ArrayList<>()).add(e.getValue())); + return map; + } + + public Stream> streamEntriesNonEmpty(MimeType mimeType) { + return streamEntries(mimeType).filter(e -> e.getValue().isPresent()) + .map(e -> new SimpleImmutableEntry<>(e.getKey(), e.getValue().get())) + .filter(e -> !IPCUtils.isNullOrEmpty(e.getValue())).map(v -> v); + } + + public Map> getEntriesNonEmpty(MimeType mimeType) { + Map> map = new LinkedHashMap<>(); + streamEntriesNonEmpty(mimeType) + .forEach(e -> map.computeIfAbsent(e.getKey(), nil -> new ArrayList<>()).add(e.getValue())); + return map; + } + + public Stream stream(Predicate mimeTypePredicate, + Function> decoder) { + Objects.requireNonNull(mimeTypePredicate); + Objects.requireNonNull(decoder); + Stream> streams = this.getCompositeMetadata().stream().filter(e -> { + if (IPCUtils.isNullOrEmpty(e.getMimeType())) + return false; + MimeType mimteType = MimeTypes.create(e.getMimeType()); + return mimeTypePredicate.test(mimteType); + }).map(decoder); + return IPCUtils.flatMap(streams); + } + + public CompositeMetadata getCompositeMetadata() { + return compositeMetadata; + } + +} diff --git a/rsocket-ipc-core/src/main/java/io/rsocket/ipc/mimetype/MimeType.java b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/mimetype/MimeType.java new file mode 100644 index 00000000..b3ae2070 --- /dev/null +++ b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/mimetype/MimeType.java @@ -0,0 +1,94 @@ +package io.rsocket.ipc.mimetype; + +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; + +import io.rsocket.ipc.util.IPCUtils; +import io.rsocket.metadata.WellKnownMimeType; + +public interface MimeType { + + String getString(); + + Optional getWellKnownMimeType(); + + static class Impl implements MimeType { + + private final String mimeTypeFallback; + private final Supplier wellKnownMimeTypeSupplier; + + public Impl(WellKnownMimeType wellKnownMimeType) { + Objects.requireNonNull(wellKnownMimeType); + this.wellKnownMimeTypeSupplier = () -> wellKnownMimeType; + this.mimeTypeFallback = null; + } + + public Impl(String mimeType) { + IPCUtils.requireNonEmpty(mimeType); + this.wellKnownMimeTypeSupplier = new Supplier() { + + private Optional parsed; + + @Override + public WellKnownMimeType get() { + if (parsed == null) + synchronized (this) { + if (parsed == null) + parsed = IPCUtils.parseWellKnownMimeType(mimeType); + } + return parsed.orElse(null); + } + }; + this.mimeTypeFallback = mimeType; + } + + @Override + public String getString() { + Optional wkmtOp = getWellKnownMimeType(); + if (wkmtOp.isPresent()) + return wkmtOp.get().getString(); + // shouldn't happen, but maybe some weird overriding + return IPCUtils.requireNonEmpty(mimeTypeFallback); + } + + @Override + public Optional getWellKnownMimeType() { + return Optional.ofNullable(wellKnownMimeTypeSupplier.get()); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + String strValue = getString().toLowerCase(); + result = prime * result + ((strValue == null) ? 0 : strValue.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof MimeType)) + return false; + String strValue = this.getString().toLowerCase(); + String strValueOther = ((MimeType) obj).getString(); + if (strValueOther != null) + strValueOther = strValueOther.toLowerCase(); + return Objects.equals(strValue, strValueOther); + } + + @Override + public String toString() { + WellKnownMimeType wellKnownMimeType = getWellKnownMimeType().orElse(null); + String result = "Impl [wellKnownMimeType=" + wellKnownMimeType + ", mimeTypeFallback=" + mimeTypeFallback + + "]"; + return result; + } + + } + +} diff --git a/rsocket-ipc-core/src/main/java/io/rsocket/ipc/mimetype/MimeTypes.java b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/mimetype/MimeTypes.java new file mode 100644 index 00000000..4d526932 --- /dev/null +++ b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/mimetype/MimeTypes.java @@ -0,0 +1,19 @@ +package io.rsocket.ipc.mimetype; + +import io.rsocket.metadata.WellKnownMimeType; + +public class MimeTypes { + + public static final MimeType MIME_TYPE_SERVICE = MimeTypes.create(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING); + public static final MimeType MIME_TYPE_METHOD = MimeTypes.create(MIME_TYPE_SERVICE.getString() + "/method"); + public static final MimeType MIME_TYPE_TRACER = MimeTypes.create("message/x.rsocket.ipc.tracer.v0"); + + public static MimeType create(String mimeType) { + return new MimeType.Impl(mimeType); + } + + public static MimeType create(WellKnownMimeType wellKnownMimeType) { + return new MimeType.Impl(wellKnownMimeType); + } + +} diff --git a/rsocket-ipc-core/src/main/java/io/rsocket/ipc/util/IPCUtils.java b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/util/IPCUtils.java new file mode 100644 index 00000000..54841c2a --- /dev/null +++ b/rsocket-ipc-core/src/main/java/io/rsocket/ipc/util/IPCUtils.java @@ -0,0 +1,226 @@ +package io.rsocket.ipc.util; + +import com.lfp.joe.core.lot.AbstractLot; import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.Spliterators.AbstractLot; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import io.rsocket.metadata.WellKnownMimeType; +import reactor.core.Disposable; +import reactor.core.Disposables; + +public class IPCUtils { + public static final Charset CHARSET = StandardCharsets.UTF_8; + + public static X onError(Supplier supplier, Runnable callback) { + Objects.requireNonNull(supplier); + try { + return supplier.get(); + } catch (Throwable t) { + callback.run(); + throw java.lang.RuntimeException.class.isAssignableFrom(t.getClass()) + ? java.lang.RuntimeException.class.cast(t) + : new java.lang.RuntimeException(t); + } + } + + public static Stream stream(Iterator iterator) { + if (iterator == null) + return Stream.empty(); + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false); + } + + public static Stream flatMap(Stream> streams) { + if (streams == null) + return Stream.empty(); + Iterator> iter = streams.iterator(); + Spliterator spliterator = new AbstractLot(Long.MAX_VALUE, Spliterator.ORDERED) { + + private Iterator currentIterator; + + @Override + public boolean tryAdvance(Consumer action) { + synchronized (iter) { + while (true) { + if (currentIterator != null && currentIterator.hasNext()) { + action.accept(currentIterator.next()); + return true; + } + if (iter.hasNext()) { + Stream nextStream = iter.next(); + currentIterator = nextStream == null ? null : nextStream.iterator(); + } else + break; + } + } + return false; + } + + }; + return StreamSupport.stream(spliterator, false); + } + + public static String mimeTypeToString(WellKnownMimeType wellKnownMimeType) { + return wellKnownMimeType == null ? null : wellKnownMimeType.getString(); + } + + public static String byteBufToString(ByteBuf byteBuf) { + return Objects.requireNonNull(byteBuf).toString(CHARSET); + } + + public static ByteBuf byteBufFromString(String str) { + return Unpooled.wrappedBuffer(Objects.requireNonNull(str).getBytes(CHARSET)); + } + + public static boolean equals(String value1, String value2, boolean ignoreCase) { + if (Objects.equals(value1, value2)) + return true; + if (!ignoreCase || value1 == null || value2 == null) + return false; + return value1.equalsIgnoreCase(value2); + } + + public static String urlDecode(String str) { + try { + return URLDecoder.decode(str, IPCUtils.CHARSET.name()); + } catch (UnsupportedEncodingException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + } + + public static String urlEncode(String str) { + try { + return URLEncoder.encode(str, IPCUtils.CHARSET.name()); + } catch (UnsupportedEncodingException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + } + + public static boolean isNullOrEmpty(String str) { + return str == null || str.isEmpty(); + } + + public static boolean nonEmpty(String str) { + return !isNullOrEmpty(str); + } + + public static X requireNonEmpty(X charSequence) { + Objects.requireNonNull(charSequence); + if (charSequence.length() <= 0) + throw new IllegalArgumentException(); + return charSequence; + } + + private static final AtomicReference> WellKnownMimeType_FROM_STRING_CACHE_REF = new AtomicReference<>(); + + public static Optional parseWellKnownMimeType(String mimeType) { + if (IPCUtils.isNullOrEmpty(mimeType)) + return Optional.empty(); + if (WellKnownMimeType_FROM_STRING_CACHE_REF.get() == null) + synchronized (WellKnownMimeType_FROM_STRING_CACHE_REF) { + if (WellKnownMimeType_FROM_STRING_CACHE_REF.get() == null) { + Map map = new HashMap<>(); + for (WellKnownMimeType wkmt : WellKnownMimeType.values()) { + for (String str : Arrays.asList(wkmt.getString(), wkmt.name())) { + map.put(str, wkmt); + map.put(str.toUpperCase(), wkmt); + map.put(str.toLowerCase(), wkmt); + } + } + WellKnownMimeType_FROM_STRING_CACHE_REF.set(map); + } + } + WellKnownMimeType result = null; + if (result == null) + result = WellKnownMimeType_FROM_STRING_CACHE_REF.get().get(mimeType); + if (result == null) + result = WellKnownMimeType_FROM_STRING_CACHE_REF.get().get(mimeType.toUpperCase()); + if (result == null) + result = WellKnownMimeType_FROM_STRING_CACHE_REF.get().get(mimeType.toLowerCase()); + return Optional.ofNullable(result); + } + + public static String encodeEntries(Stream> entryStream) { + if (entryStream == null) + return ""; + entryStream = entryStream.filter(Objects::nonNull); + entryStream = entryStream.filter(e -> !IPCUtils.isNullOrEmpty(e.getKey())); + entryStream = entryStream.map(e -> { + String value = e.getValue(); + if (value != null) + return e; + return new SimpleImmutableEntry<>(e.getKey(), ""); + }); + StringBuilder sb = new StringBuilder(); + entryStream.forEach(ent -> { + if (sb.length() > 0) + sb.append("&"); + sb.append(String.format("%s=%s", IPCUtils.urlEncode(ent.getKey()), IPCUtils.urlEncode(ent.getValue()))); + }); + return sb.toString(); + } + + public static Stream>> decodeEntries(String value) { + if (IPCUtils.isNullOrEmpty(value)) + return Stream.empty(); + while (value.startsWith("?")) + value = value.substring(1); + if (IPCUtils.isNullOrEmpty(value)) + return Stream.empty(); + return Arrays.stream(value.split("&")).map(parameter -> { + List keyValue = Arrays.stream(parameter.split("=")).map(IPCUtils::urlDecode).limit(2) + .collect(Collectors.toList()); + Entry> result; + if (keyValue.size() == 2) + result = new SimpleImmutableEntry<>(keyValue.get(0), Optional.of(keyValue.get(1))); + else + result = new SimpleImmutableEntry<>(keyValue.get(0), Optional.empty()); + return result; + }); + } + + public static class DisposableAddList extends CopyOnWriteArrayList { + + private static final long serialVersionUID = 1L; + + public static DisposableAddList create() { + return new DisposableAddList(); + } + + public Disposable disposableAdd(X value) { + Disposable disposable = Disposables.composite(() -> { + this.removeIf(v -> v == value); + }); + this.add(value); + return disposable; + } + } + +} diff --git a/rsocket-ipc-core/src/test/java/io/rsocket/ipc/MultiServiceTest.java b/rsocket-ipc-core/src/test/java/io/rsocket/ipc/MultiServiceTest.java new file mode 100644 index 00000000..a09a09d7 --- /dev/null +++ b/rsocket-ipc-core/src/test/java/io/rsocket/ipc/MultiServiceTest.java @@ -0,0 +1,169 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.ipc; + +import io.netty.buffer.ByteBufAllocator; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.ipc.decoders.CompositeMetadataDecoder; +import io.rsocket.ipc.decoders.MetadataDecoderLFP; +import io.rsocket.ipc.encoders.DefaultMetadataEncoder; +import io.rsocket.ipc.encoders.MetadataEncoderLFP; +import io.rsocket.ipc.encoders.MetadataReader; +import io.rsocket.ipc.marshallers.Primitives; +import io.rsocket.ipc.marshallers.Strings; +import io.rsocket.ipc.mimetype.MimeTypes; +import io.rsocket.transport.local.LocalClientTransport; +import io.rsocket.transport.local.LocalServerTransport; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.stream.Stream; + +import org.junit.Assert; +import org.junit.Test; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +public class MultiServiceTest { + @Test + public void test() { + MetadataDecoderLFP decoder = new MetadataDecoderLFP(); + MetadataEncoderLFP encoder = new MetadataEncoderLFP(); + RequestHandlingRSocket requestHandler = new RequestHandlingRSocket(decoder); + {// start server + SocketAcceptor socketAcceptor = (setup, client) -> Mono.just(requestHandler); + RSocketServer.create(socketAcceptor).interceptors(ir -> { + }).bind(TcpServerTransport.create("localhost", 7000)).block(); + } + RSocket rsocket; + {// start client + rsocket = RSocketConnector.create().connect(TcpClientTransport.create("localhost", 7000)).block(); + } + AtomicBoolean ff = new AtomicBoolean(); + + IPCRSocket service = Server.service("HelloService").noMeterRegistry().noTracer().marshall(Strings.marshaller()) + .unmarshall(Strings.unmarshaller()).requestResponse("hello", (s, byteBuf) -> { + MetadataReader reader = new MetadataReader(byteBuf, false); + Stream vals = reader.stream(v -> true, e -> Arrays.asList( + String.format("%s - %s", e.getMimeType(), e.getContent().toString(StandardCharsets.UTF_8))) + .stream()); + vals.forEach(v -> { + System.out.println(v); + }); + return Mono.just("Hello -> " + s); + }).requestResponse("goodbye", (s, byteBuf) -> Mono.just("Goodbye -> " + s)) + .requestResponse("count", Primitives.intMarshaller(), + (charSequence, byteBuf) -> Mono.just(charSequence.length())) + .requestResponse("increment", Primitives.intUnmarshaller(), Primitives.intMarshaller(), + (integer, byteBuf) -> Mono.just(integer + 1)) + .requestStream("helloStream", (s, byteBuf) -> Flux.range(1, 10).map(i -> i + " - Hello -> " + s)) + .requestStream("toString", Primitives.longUnmarshaller(), + (aLong, byteBuf) -> Flux.just(String.valueOf(aLong))) + .fireAndForget("ff", (s, byteBuf) -> { + ff.set(true); + return Mono.empty(); + }).requestChannel("helloChannel", (s, publisher, byteBuf) -> Flux.just("Hello -> " + s)).toIPCRSocket(); + + requestHandler.withEndpoint(service); + + Client helloService = Client.service("HelloService").rsocket(rsocket) + .customMetadataEncoder(encoder).noMeterRegistry().noTracer().marshall(Strings.marshaller()) + .unmarshall(Strings.unmarshaller()); + + String r1 = helloService.requestResponse("hello").apply("Alice").block(); + Assert.assertEquals("Hello -> Alice", r1); + + String r2 = helloService.requestResponse("goodbye").apply("Bob").block(); + Assert.assertEquals("Goodbye -> Bob", r2); + + StepVerifier.create(helloService.requestStream("helloStream").apply("Carol")).expectNextCount(10) + .expectComplete().verify(); + + helloService.fireAndForget("ff").apply("boom").block(); + int maxSeconds = 10; + for (int i = 0; i < maxSeconds && !ff.get(); i++) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + } + Assert.assertTrue(ff.get()); + + String r3 = helloService.requestChannel("helloChannel").apply(Mono.just("Eve")).blockLast(); + Assert.assertEquals("Hello -> Eve", r3); + + int count = helloService.requestResponse("count", Primitives.intUnmarshaller()).apply("hello").block(); + Assert.assertEquals(5, count); + + long l = System.currentTimeMillis(); + String toString = helloService.requestStream("toString", Primitives.longMarshaller()).apply(l).blockLast(); + Assert.assertEquals(String.valueOf(l), toString); + + Integer increment = helloService + .requestResponse("increment", Primitives.intMarshaller(), Primitives.intUnmarshaller()).apply(1) + .block(); + Assert.assertEquals(2, increment.intValue()); + Disposable encoderPasswordDisposable = null; + for (int i = 0; i < 3; i++) { + if (i == 1) { + encoderPasswordDisposable = encoder.addInterceptor(writer -> { + // writer.writeString(MimeTypes.create("password"), "AAAAAAAAAAAA"); + writer.writeString(MimeTypes.create("password"), "thisIsACoolPassWord!"); + }); + decoder.addInterceptor(reader -> { + boolean match = reader.containsStringSecure(MimeTypes.create("password"), "thisIsACoolPassWord!"); + if (!match) + throw new IllegalArgumentException("not authorized"); + }); + } + boolean shouldFail; + if (i == 2) { + shouldFail = true; + encoderPasswordDisposable.dispose(); + } else + shouldFail = false; + boolean failed; + try { + Integer incrementMetadata = helloService + .requestResponse("increment", Primitives.intMarshaller(), Primitives.intUnmarshaller()).apply(1) + .block(); + Assert.assertEquals(2, incrementMetadata.intValue()); + failed = false; + } catch (Exception e) { + failed = true; + } + Assert.assertEquals(failed, shouldFail); + } + } + + public static void main(String[] args) { + new MultiServiceTest().test(); + } +} diff --git a/rsocket-ipc-graphql/bin/.gitignore b/rsocket-ipc-graphql/bin/.gitignore new file mode 100644 index 00000000..4d76ae8a --- /dev/null +++ b/rsocket-ipc-graphql/bin/.gitignore @@ -0,0 +1,6 @@ +/main/ +/test/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-graphql/bin/test/schema.graphqls b/rsocket-ipc-graphql/bin/test/schema.graphqls new file mode 100644 index 00000000..8875bafe --- /dev/null +++ b/rsocket-ipc-graphql/bin/test/schema.graphqls @@ -0,0 +1,16 @@ +type Query { + bookById(id: ID): Book +} + +type Book { + id: ID + name: String + pageCount: Int + author: Author +} + +type Author { + id: ID + firstName: String + lastName: String +} \ No newline at end of file diff --git a/rsocket-ipc-gson/bin/.gitignore b/rsocket-ipc-gson/bin/.gitignore new file mode 100644 index 00000000..5f6586cc --- /dev/null +++ b/rsocket-ipc-gson/bin/.gitignore @@ -0,0 +1,5 @@ +/main/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-gson/build.gradle b/rsocket-ipc-gson/build.gradle new file mode 100644 index 00000000..c20600db --- /dev/null +++ b/rsocket-ipc-gson/build.gradle @@ -0,0 +1,24 @@ +description = 'RSocket IPC Gson Support' + +dependencies { + api project (':rsocket-ipc-core') + + compile 'com.google.code.gson:gson:2.8.6' + + testImplementation 'io.opentracing.brave:brave-opentracing' + testImplementation 'junit:junit' + testImplementation 'org.junit.jupiter:junit-jupiter-engine' + testImplementation 'org.junit.vintage:junit-vintage-engine' + + testImplementation 'javax.inject:javax.inject' + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'com.google.protobuf:protobuf-java' + testImplementation 'org.hdrhistogram:HdrHistogram' + testImplementation 'org.apache.logging.log4j:log4j-api' + testImplementation 'org.apache.logging.log4j:log4j-core' + testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl' + testImplementation 'io.rsocket:rsocket-transport-netty' + testImplementation 'io.rsocket:rsocket-transport-local' + testImplementation 'org.mockito:mockito-core' + testImplementation 'io.zipkin.reporter2:zipkin-sender-okhttp3' +} \ No newline at end of file diff --git a/rsocket-ipc-gson/src/main/java/io/rsocket/ipc/marshallers/GsonMarshaller.java b/rsocket-ipc-gson/src/main/java/io/rsocket/ipc/marshallers/GsonMarshaller.java new file mode 100644 index 00000000..c995b311 --- /dev/null +++ b/rsocket-ipc-gson/src/main/java/io/rsocket/ipc/marshallers/GsonMarshaller.java @@ -0,0 +1,51 @@ +package io.rsocket.ipc.marshallers; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Objects; + +import com.google.gson.Gson; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; +import io.rsocket.ipc.Marshaller; + +public class GsonMarshaller implements Marshaller { + + public static GsonMarshaller create(Gson gson) { + return create(gson, ByteBufAllocator.DEFAULT); + } + + public static GsonMarshaller create(Gson gson, ByteBufAllocator allocator) { + return new GsonMarshaller<>(gson, allocator); + } + + private Gson gson; + private ByteBufAllocator allocator; + + protected GsonMarshaller(Gson gson, ByteBufAllocator allocator) { + this.gson = Objects.requireNonNull(gson); + this.allocator = Objects.requireNonNull(allocator); + } + + @Override + public ByteBuf apply(X object) { + ByteBuf buffer = allocator.buffer(); + try (ByteBufOutputStream os = new ByteBufOutputStream(buffer); + OutputStreamWriter writer = new OutputStreamWriter(os);) { + gson.toJson(object, writer); + writer.flush(); + } catch (IOException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + return buffer; + } + + public Gson getGson() { + return gson; + } + +} diff --git a/rsocket-ipc-gson/src/main/java/io/rsocket/ipc/marshallers/GsonUnmarshaller.java b/rsocket-ipc-gson/src/main/java/io/rsocket/ipc/marshallers/GsonUnmarshaller.java new file mode 100644 index 00000000..903c77d8 --- /dev/null +++ b/rsocket-ipc-gson/src/main/java/io/rsocket/ipc/marshallers/GsonUnmarshaller.java @@ -0,0 +1,77 @@ +package io.rsocket.ipc.marshallers; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.util.Objects; +import java.util.function.Function; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.reflect.TypeToken; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.rsocket.ipc.Unmarshaller; + +public class GsonUnmarshaller implements Unmarshaller { + + public static GsonUnmarshaller create(Gson gson, TypeToken typeToken) { + return create(gson, typeToken == null ? null : typeToken.getType()); + } + + public static GsonUnmarshaller create(Gson gson, Class classType) { + return create(gson, (Type) classType); + } + + public static GsonUnmarshaller create(Gson gson, Type type) { + Objects.requireNonNull(gson); + Objects.requireNonNull(type); + return new GsonUnmarshaller<>(isSupplier -> parseUnchecked(gson, type, isSupplier)); + } + + public static GsonUnmarshaller create(Gson gson, Type[] types) { + Objects.requireNonNull(gson); + Objects.requireNonNull(types); + for (Type type : types) + Objects.requireNonNull(type); + Function parser = bb -> { + JsonArray jarr = parseUnchecked(gson, JsonArray.class, bb); + Object[] result = new Object[jarr.size()]; + for (int i = 0; i < jarr.size(); i++) { + Type type = types == null || types.length <= i ? null : types[i]; + if (type == null) + type = Object.class; + result[i] = gson.fromJson(jarr.get(i), type); + } + return result; + }; + return new GsonUnmarshaller<>(parser); + } + + private final Function parser; + + protected GsonUnmarshaller(Function parser) { + this.parser = Objects.requireNonNull(parser); + } + + @Override + public X apply(ByteBuf byteBuf) { + return parser.apply(byteBuf); + } + + private static Y parseUnchecked(Gson gson, Type type, ByteBuf byteBuf) { + Objects.requireNonNull(gson); + Objects.requireNonNull(byteBuf); + type = type != null ? type : Object.class; + try (InputStream is = new ByteBufInputStream(byteBuf); InputStreamReader reader = new InputStreamReader(is);) { + return gson.fromJson(reader, type); + } catch (IOException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + } + +} diff --git a/rsocket-ipc-jackson/bin/.gitignore b/rsocket-ipc-jackson/bin/.gitignore new file mode 100644 index 00000000..5f6586cc --- /dev/null +++ b/rsocket-ipc-jackson/bin/.gitignore @@ -0,0 +1,5 @@ +/main/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-protobuf/bin/.gitignore b/rsocket-ipc-protobuf/bin/.gitignore new file mode 100644 index 00000000..5f6586cc --- /dev/null +++ b/rsocket-ipc-protobuf/bin/.gitignore @@ -0,0 +1,5 @@ +/main/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-reflection-client/bin/.gitignore b/rsocket-ipc-reflection-client/bin/.gitignore new file mode 100644 index 00000000..36f4c3cc --- /dev/null +++ b/rsocket-ipc-reflection-client/bin/.gitignore @@ -0,0 +1,6 @@ +/main/ +/default/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-reflection-client/build.gradle b/rsocket-ipc-reflection-client/build.gradle new file mode 100644 index 00000000..fa39da66 --- /dev/null +++ b/rsocket-ipc-reflection-client/build.gradle @@ -0,0 +1,24 @@ +description = 'RSocket IPC Reflection Mapping Client Support' + +dependencies { + api project (':rsocket-ipc-core') + implementation project (':rsocket-ipc-reflection-core') + implementation 'org.javassist:javassist:3.27.0-GA' + + testImplementation 'io.opentracing.brave:brave-opentracing' + testImplementation 'junit:junit' + testImplementation 'org.junit.jupiter:junit-jupiter-engine' + testImplementation 'org.junit.vintage:junit-vintage-engine' + + testImplementation 'javax.inject:javax.inject' + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'com.google.protobuf:prorsocket-ipc-reflection-clienttobuf-java' + testImplementation 'org.hdrhistogram:HdrHistogram' + testImplementation 'org.apache.logging.log4j:log4j-api' + testImplementation 'org.apache.logging.log4j:log4j-core' + testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl' + testImplementation 'io.rsocket:rsocket-transport-netty' + testImplementation 'io.rsocket:rsocket-transport-local' + testImplementation 'org.mockito:mockito-core' + testImplementation 'io.zipkin.reporter2:zipkin-sender-okhttp3' +} \ No newline at end of file diff --git a/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/IPCInvoker.java b/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/IPCInvoker.java new file mode 100644 index 00000000..82848bfe --- /dev/null +++ b/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/IPCInvoker.java @@ -0,0 +1,6 @@ +package io.rsocket.ipc.reflection.client; + +public interface IPCInvoker { + + public Object invoke(Object[] args) throws Throwable; +} diff --git a/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/LazyRSocket.java b/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/LazyRSocket.java new file mode 100644 index 00000000..182fb229 --- /dev/null +++ b/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/LazyRSocket.java @@ -0,0 +1,73 @@ +package io.rsocket.ipc.reflection.client; + +import java.util.Objects; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public abstract class LazyRSocket implements RSocket { + + public static LazyRSocket create(Supplier rsocketSupplier) { + Objects.requireNonNull(rsocketSupplier); + return new LazyRSocket() { + + @Override + protected RSocket getRSocket() { + return rsocketSupplier.get(); + } + }; + } + + protected abstract RSocket getRSocket(); + + @Override + public Mono fireAndForget(Payload payload) { + return getRSocket().fireAndForget(payload); + } + + @Override + public Mono requestResponse(Payload payload) { + return getRSocket().requestResponse(payload); + } + + @Override + public Flux requestStream(Payload payload) { + return getRSocket().requestStream(payload); + } + + @Override + public Flux requestChannel(Publisher payloads) { + return getRSocket().requestChannel(payloads); + } + + @Override + public Mono metadataPush(Payload payload) { + return getRSocket().metadataPush(payload); + } + + @Override + public double availability() { + return getRSocket().availability(); + } + + @Override + public Mono onClose() { + return getRSocket().onClose(); + } + + @Override + public void dispose() { + getRSocket().dispose(); + } + + @Override + public boolean isDisposed() { + return getRSocket().isDisposed(); + } + +} diff --git a/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/RSocketIPCClients.java b/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/RSocketIPCClients.java new file mode 100644 index 00000000..1edddb26 --- /dev/null +++ b/rsocket-ipc-reflection-client/src/main/java/io/rsocket/ipc/reflection/client/RSocketIPCClients.java @@ -0,0 +1,222 @@ +package io.rsocket.ipc.reflection.client; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import java.util.logging.Logger; +import java.util.stream.Stream; + +import org.reactivestreams.Publisher; +import org.reflections8.ReflectionUtils; + +import io.netty.buffer.ByteBuf; +import io.rsocket.RSocket; +import io.rsocket.ipc.Client; +import io.rsocket.ipc.Client.U; +import io.rsocket.ipc.Marshaller; +import io.rsocket.ipc.MetadataEncoder; +import io.rsocket.ipc.Unmarshaller; +import io.rsocket.ipc.marshallers.Bytes; +import io.rsocket.ipc.reflection.core.MethodMapUtils; +import io.rsocket.ipc.reflection.core.PublisherConverter; +import io.rsocket.ipc.reflection.core.PublisherConverters; +import javassist.util.proxy.MethodHandler; +import javassist.util.proxy.ProxyFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class RSocketIPCClients { + private static final Class THIS_CLASS = new Object() { + }.getClass().getEnclosingClass(); + private static final Logger logger = java.util.logging.Logger.getLogger(THIS_CLASS.getName()); + protected static final Duration RSOCKET_SUPPLIER_WARN_DURATION = Duration.ofSeconds(10); + + public static X create(Supplier rSocketSupplier, Class serviceType, MetadataEncoder metadataEncoder, + Marshaller argumentMarshaller, BiFunction returnDeserializer) { + try { + return createInternal(rSocketSupplier, serviceType, metadataEncoder, argumentMarshaller, + returnDeserializer); + } catch (NoSuchMethodException | IllegalArgumentException | InstantiationException | IllegalAccessException + | InvocationTargetException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + protected static X createInternal(Supplier rSocketSupplier, Class serviceType, + MetadataEncoder metadataEncoder, Marshaller argumentMarshaller, + BiFunction returnDeserializer) throws NoSuchMethodException, + IllegalArgumentException, InstantiationException, IllegalAccessException, InvocationTargetException { + Objects.requireNonNull(rSocketSupplier); + Objects.requireNonNull(serviceType); + Objects.requireNonNull(argumentMarshaller); + Objects.requireNonNull(returnDeserializer); + Map mappedMethods = MethodMapUtils.getMappedMethods(serviceType, false); + Map> ipcInvokerCache = new ConcurrentHashMap<>(); + return (X) proxyFactory(serviceType).create(new Class[] {}, new Object[] {}, new MethodHandler() { + + @Override + public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable { + if (thisMethod.isDefault() && proceed != null) + return proceed.invoke(self, args); + Optional ipcInvokerOp = ipcInvokerCache.computeIfAbsent(thisMethod, nil -> { + Entry entry = mappedMethods.entrySet().stream() + .filter(ent -> MethodMapUtils.compatibleMethods(thisMethod, ent.getValue())).findFirst() + .orElse(null); + if (entry == null) + return Optional.empty(); + U clientBuilder = Client.service(serviceType.getName()) + .rsocket(LazyRSocket.create(rSocketSupplier)).customMetadataEncoder(metadataEncoder) + .noMeterRegistry().noTracer().marshall(argumentMarshaller); + String route = entry.getKey(); + return Optional.of( + createIPCInvoker(metadataEncoder, returnDeserializer, thisMethod, route, clientBuilder)); + }); + if (!ipcInvokerOp.isPresent()) + throw new NoSuchMethodException(String.format( + "could not map method in service. serviceType:%s method:%s", serviceType, thisMethod)); + return ipcInvokerOp.get().invoke(args); + } + + }); + } + + @SuppressWarnings("unchecked") + private static IPCInvoker createIPCInvoker(MetadataEncoder metadataEncoder, + BiFunction returnDeserializer, Method method, String route, + U clientBuilder) { + Optional> returnPublisherConverterOp = PublisherConverters.lookup(method.getReturnType()); + Type unmarshallerType; + if (returnPublisherConverterOp.isPresent()) + unmarshallerType = returnPublisherConverterOp.get().getPublisherTypeArgument(method.getGenericReturnType()) + .orElse(Object.class); + else + unmarshallerType = method.getGenericReturnType(); + if (MethodMapUtils.getRequestChannelParameterType(method).isPresent()) { + return (args) -> { + Publisher objPublisher = (Publisher) args[0]; + Flux argArrayPublisher = Flux.from(objPublisher).map(v -> new Object[] { v }); + Flux responsePublisher = clientBuilder + .unmarshall(createUnmarshaller(unmarshallerType, returnDeserializer)).requestChannel(route) + .apply(argArrayPublisher); + return returnMap(method, returnPublisherConverterOp, responsePublisher); + }; + } + if (MethodMapUtils.isFireAndForget(method)) + return (args) -> { + Mono result = clientBuilder.unmarshall(Bytes.byteBufUnmarshaller()).fireAndForget(route) + .apply(args); + return returnMap(method, returnPublisherConverterOp, result); + }; + if (returnPublisherConverterOp.isPresent() && !Mono.class.isAssignableFrom(method.getReturnType())) + return (args) -> { + Flux responsePublisher = clientBuilder + .unmarshall(createUnmarshaller(unmarshallerType, returnDeserializer)).requestStream(route) + .apply(args); + return returnMap(method, returnPublisherConverterOp, responsePublisher); + }; + return (args) -> { + Mono responsePublisher = clientBuilder + .unmarshall(createUnmarshaller(unmarshallerType, returnDeserializer)).requestResponse(route) + .apply(args); + return returnMap(method, returnPublisherConverterOp, responsePublisher); + }; + } + + private static Object returnMap(Method method, Optional> returnPublisherConverterOp, + Publisher responsePublisher) { + if (!returnPublisherConverterOp.isPresent()) + return returnMapPublisher(method, responsePublisher); + Object result = returnPublisherConverterOp.get().fromPublisher(responsePublisher); + if (result instanceof Publisher) + return returnMapPublisher(method, (Publisher) result); + return result; + } + + private static Object returnMapPublisher(Method method, Publisher responsePublisher) { + if (Flux.class.isAssignableFrom(method.getReturnType())) + return Flux.from(responsePublisher); + if (Mono.class.isAssignableFrom(method.getReturnType())) + return Mono.from(responsePublisher); + if (responsePublisher instanceof Mono) + return ((Mono) responsePublisher).block(); + return responsePublisher; + } + + private static Object returnFromResponsePublisher(Method method, + BiFunction returnDeserializer, PublisherConverter returnPublisherConverter, + Publisher responsePublisher) { + Type typeArgument = returnPublisherConverter.getPublisherTypeArgument(method.getGenericReturnType()) + .orElse(Object.class); + Publisher resultPublisher; + if (Mono.class.isAssignableFrom(method.getReturnType())) + resultPublisher = Mono.from(responsePublisher).map(bb -> { + Object deserialized = returnDeserializer.apply(typeArgument, bb); + return deserialized; + }); + else + resultPublisher = Flux.from(responsePublisher).map(bb -> { + Object deserialized = returnDeserializer.apply(typeArgument, bb); + return deserialized; + }); + Object result = returnPublisherConverter.fromPublisher(resultPublisher); + return result; + } + + private static Unmarshaller createUnmarshaller(Type unmarshallerType, + BiFunction returnDeserializer) { + return bb -> returnDeserializer.apply(unmarshallerType, bb); + } + + @SuppressWarnings("unchecked") + private static ProxyFactory proxyFactory(Class classType, Class... additionalInterfaces) { + ProxyFactory factory = new ProxyFactory(); + List> classTypes = new ArrayList<>(); + if (classType.isInterface()) + classTypes.add(classType); + else + factory.setSuperclass(classType); + classTypes.addAll(ReflectionUtils.getAllSuperTypes(classType)); + ensureInterfaces(factory, classTypes, additionalInterfaces); + return factory; + } + + private static void ensureInterfaces(ProxyFactory factory, Iterable> classTypes, + Class... additionalInterfaces) { + if (factory == null) + return; + List> setInterfaces = new ArrayList<>(2); + {// current + Class[] arr = factory.getInterfaces(); + if (arr != null) + Arrays.asList(arr).forEach(v -> setInterfaces.add(v)); + } + {// iterable + if (classTypes != null) + classTypes.forEach(v -> setInterfaces.add(v)); + } + {// array + if (additionalInterfaces != null) + Arrays.asList(additionalInterfaces).forEach(v -> setInterfaces.add(v)); + } + Stream> ifaceStream = setInterfaces.stream(); + ifaceStream = ifaceStream.distinct().filter(Objects::nonNull).filter(Class::isInterface).filter(v -> { + return !javassist.util.proxy.ProxyObject.class.isAssignableFrom(v); + }); + factory.setInterfaces(ifaceStream.toArray(Class[]::new)); + } + +} diff --git a/rsocket-ipc-reflection-core/bin/.gitignore b/rsocket-ipc-reflection-core/bin/.gitignore new file mode 100644 index 00000000..36f4c3cc --- /dev/null +++ b/rsocket-ipc-reflection-core/bin/.gitignore @@ -0,0 +1,6 @@ +/main/ +/default/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-reflection-core/build.gradle b/rsocket-ipc-reflection-core/build.gradle new file mode 100644 index 00000000..55b9bf84 --- /dev/null +++ b/rsocket-ipc-reflection-core/build.gradle @@ -0,0 +1,24 @@ +description = 'RSocket IPC Reflection Mapping Support' + + +dependencies { + api project (':rsocket-ipc-core') + compile 'net.oneandone.reflections8:reflections8:0.11.7' + + testImplementation 'io.opentracing.brave:brave-opentracing' + testImplementation 'junit:junit' + testImplementation 'org.junit.jupiter:junit-jupiter-engine' + testImplementation 'org.junit.vintage:junit-vintage-engine' + + testImplementation 'javax.inject:javax.inject' + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'com.google.protobuf:protobuf-java' + testImplementation 'org.hdrhistogram:HdrHistogram' + testImplementation 'org.apache.logging.log4j:log4j-api' + testImplementation 'org.apache.logging.log4j:log4j-core' + testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl' + testImplementation 'io.rsocket:rsocket-transport-netty' + testImplementation 'io.rsocket:rsocket-transport-local' + testImplementation 'org.mockito:mockito-core' + testImplementation 'io.zipkin.reporter2:zipkin-sender-okhttp3' +} \ No newline at end of file diff --git a/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/MethodMapUtils.java b/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/MethodMapUtils.java new file mode 100644 index 00000000..a9219414 --- /dev/null +++ b/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/MethodMapUtils.java @@ -0,0 +1,217 @@ +package io.rsocket.ipc.reflection.core; + +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.logging.Logger; +import java.util.stream.Stream; + +import org.reflections8.ReflectionUtils; + +import io.rsocket.ipc.util.IPCUtils; +import javassist.Modifier; +import reactor.core.publisher.Mono; + +public class MethodMapUtils { + private static final Class THIS_CLASS = new Object() { + }.getClass().getEnclosingClass(); + private static final Logger logger = java.util.logging.Logger.getLogger(THIS_CLASS.getName()); + + private static final Map, Class> PRIMITIVES_TO_WRAPPERS = getPrimitivesToWrappers(); + + private static Map, Class> getPrimitivesToWrappers() { + Map, Class> map = new HashMap<>(); + map.put(boolean.class, Boolean.class); + map.put(byte.class, Byte.class); + map.put(char.class, Character.class); + map.put(double.class, Double.class); + map.put(float.class, Float.class); + map.put(int.class, Integer.class); + map.put(long.class, Long.class); + map.put(short.class, Short.class); + map.put(void.class, Void.class); + return Collections.unmodifiableMap(map); + } + + private static final Map, Class> WRAPPERS_TO_PRIMITIVES = getWrappersToPrimitives(); + + private static Map, Class> getWrappersToPrimitives() { + Map, Class> map = new HashMap<>(); + PRIMITIVES_TO_WRAPPERS.entrySet().forEach(e -> map.put(e.getValue(), e.getKey())); + return Collections.unmodifiableMap(map); + } + + @SuppressWarnings("unchecked") + public static Map getMappedMethods(Class classType, boolean includeVariations) { + Objects.requireNonNull(classType); + Map> lookup = new LinkedHashMap<>(); + { + Set methods = ReflectionUtils.getAllMethods(classType, m -> Modifier.isPublic(m.getModifiers()), + m -> !Modifier.isStatic(m.getModifiers())); + methods.stream().flatMap( + m -> getMethodSignatures(m, includeVariations).stream().map(v -> new SimpleImmutableEntry<>(v, m))) + .forEach(ent -> { + Method candidateMethod = ent.getValue(); + List methodList = lookup.computeIfAbsent(ent.getKey(), nil -> new ArrayList<>()); + for (int i = 0; i < methodList.size(); i++) { + Method currentMethod = methodList.get(i); + if (isOverridingMethod(currentMethod, candidateMethod)) { + methodList.set(i, candidateMethod); + return; + } + if (isOverridingMethod(candidateMethod, currentMethod)) + return; + } + methodList.add(candidateMethod); + }); + } + Map result = new LinkedHashMap<>(); + for (Entry> ent : lookup.entrySet()) { + String key = ent.getKey(); + List methods = ent.getValue(); + if (methods.size() > 1) { + logger.warning(String.format("multiple methods match signature, skipping:%s", key)); + continue; + } + result.put(key, methods.get(0)); + } + return result; + } + + public static boolean compatibleMethods(Method method1, Method method2) { + return isOverridingMethod(method1, method2) || isOverridingMethod(method2, method1); + } + + private static boolean isOverridingMethod(Method parentCandidate, Method childCandidate) { + if (Objects.equals(parentCandidate, childCandidate)) + return true; + if (!parentCandidate.getName().equals(childCandidate.getName())) + return false; + if (parentCandidate.getParameterCount() != childCandidate.getParameterCount()) + return false; + if (!parentCandidate.getDeclaringClass().isAssignableFrom(childCandidate.getDeclaringClass())) + return false; + Function restrictionLevel = m -> { + int modifiers = m.getModifiers(); + if (Modifier.isPrivate(modifiers)) + return 0; + if (Modifier.isProtected(modifiers)) + return 2; + if (Modifier.isPublic(modifiers)) + return 3; + return 1;// package level + }; + if (restrictionLevel.apply(childCandidate) < restrictionLevel.apply(parentCandidate)) + return false; + Class[] parentPts = parentCandidate.getParameterTypes(); + Class[] childPts = childCandidate.getParameterTypes(); + for (int i = 0; i < parentPts.length; i++) { + Class parentPt = parentPts[i]; + Class childPt = childPts[i]; + if (!parentPt.isAssignableFrom(childPt)) + return false; + } + return true; + } + + private static Set getMethodSignatures(Method method, boolean includeVariations) { + Set methodSignatures = new LinkedHashSet<>(); + if (includeVariations) + methodSignatures.add(method.getName()); + Class[] parameterTypes = method.getParameterTypes(); + List options = includeVariations ? Arrays.asList(false, true) : Arrays.asList(false); + for (boolean lowercase : options) { + for (boolean simpleName : options) { + for (boolean disableParameterTypes : options) { + for (boolean unwrapPrimitives : options) { + Stream> stream = Stream.empty(); + stream = Stream.concat(stream, Stream.of(new SimpleImmutableEntry<>("n", method.getName()))); + if (!disableParameterTypes) { + for (Class pt : parameterTypes) { + if (unwrapPrimitives) { + Class primType = WRAPPERS_TO_PRIMITIVES.get(pt); + pt = primType != null ? primType : pt; + } + stream = Stream.concat(stream, Stream.of(new SimpleImmutableEntry<>("at", + simpleName ? pt.getSimpleName() : pt.getName()))); + } + } + if (lowercase) + stream = stream.map(e -> new SimpleImmutableEntry<>(e.getKey().toLowerCase(), + e.getValue().toLowerCase())); + methodSignatures.add(IPCUtils.encodeEntries(stream)); + } + } + } + } + return methodSignatures; + } + + public static Optional getRequestChannelParameterType(Method method) { + if (method == null) + return Optional.empty(); + if (method.getParameterCount() != 1) + return Optional.empty(); + Optional> payloadConverter = PublisherConverters.lookup(method.getParameterTypes()[0]); + if (!payloadConverter.isPresent()) + return Optional.empty(); + if (!PublisherConverters.lookup(method.getReturnType()).isPresent()) + return Optional.empty(); + Type type = method.getGenericParameterTypes()[0]; + Optional typeArgument = payloadConverter.get().getPublisherTypeArgument(type); + if (!typeArgument.isPresent()) + return Optional.empty(); + return Optional.of(typeArgument.get()); + } + + public static boolean isFireAndForget(Method method) { + if (method == null) + return false; + if (method.getReturnType().equals(Void.TYPE)) + return true; + if (!Mono.class.isAssignableFrom(method.getReturnType())) + return false; + Type typeArgument = getPublisherTypeArgument(Mono.class, method.getGenericReturnType()).orElse(null); + if (typeArgument == null) + return false; + if (typeArgument.equals(Void.TYPE)) + return true; + if (typeArgument.equals(Void.class)) + return true; + if (typeArgument.equals(void.class)) + return true; + return false; + } + + public static Optional getPublisherTypeArgument(Class superClassLimit, Type type) { + if (type == null) + return Optional.empty(); + if (type instanceof ParameterizedType) { + ParameterizedType pt = (ParameterizedType) type; + Type[] actualTypeArguments = pt.getActualTypeArguments(); + if (actualTypeArguments != null && actualTypeArguments.length == 1) + return Optional.of(actualTypeArguments[0]); + } + if (!(type instanceof Class)) + return Optional.empty(); + Class superClass = ((Class) type).getSuperclass(); + if (superClass != null && superClassLimit != null && !superClassLimit.isAssignableFrom(superClass)) + return Optional.empty(); + return getPublisherTypeArgument(superClassLimit, superClass); + } + +} diff --git a/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/PublisherConverter.java b/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/PublisherConverter.java new file mode 100644 index 00000000..7de60c08 --- /dev/null +++ b/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/PublisherConverter.java @@ -0,0 +1,153 @@ +package io.rsocket.ipc.reflection.core; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.Iterator; +import java.util.Optional; +import java.util.stream.Stream; + +import org.reactivestreams.Publisher; + +import io.rsocket.ipc.util.IPCUtils; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public interface PublisherConverter { + + static PublisherConverter> direct() { + return new Abs>() { + + @Override + protected Publisher toPublisherInternal(Publisher input) { + return input; + } + + @Override + public Publisher fromPublisher(Publisher publisher) { + return publisher != null ? publisher : Flux.empty(); + } + + @Override + public Class> getConvertType() { + return (Class) Publisher.class; + } + + public int priority() { + return 0; + } + }; + } + + static PublisherConverter> stream() { + return new Abs>() { + + @Override + protected Publisher toPublisherInternal(Stream input) { + return Flux.fromStream(input).subscribeOn(Schedulers.elastic()); + } + + @Override + public Stream fromPublisher(Publisher publisher) { + if (publisher == null) + return Stream.empty(); + return Flux.from(publisher).toStream(); + } + + @Override + public Class> getConvertType() { + return (Class) Stream.class; + } + + public int priority() { + return 1; + } + }; + } + + static PublisherConverter> iterator() { + return new Abs>() { + + @Override + protected Publisher toPublisherInternal(Iterator input) { + return Flux.fromStream(IPCUtils.stream(input)).subscribeOn(Schedulers.elastic()); + } + + @Override + public Iterator fromPublisher(Publisher publisher) { + if (publisher == null) + return Collections.emptyIterator(); + return Flux.from(publisher).toStream().iterator(); + } + + @Override + public Class> getConvertType() { + return (Class) Iterator.class; + } + + public int priority() { + return 2; + } + }; + } + + static PublisherConverter> iterable() { + return new Abs>() { + + @Override + protected Publisher toPublisherInternal(Iterable input) { + return Flux.fromStream(IPCUtils.stream(input.iterator())).subscribeOn(Schedulers.elastic()); + } + + @Override + public Iterable fromPublisher(Publisher publisher) { + if (publisher == null) + return Collections.emptyList(); + Flux cachedFlux = Flux.from(publisher).cache(); + Iterable ible = () -> { + Iterator iter = (Iterator) cachedFlux.toStream().iterator(); + return iter; + }; + return ible; + } + + @Override + public Class> getConvertType() { + return (Class) Iterable.class; + } + + }; + } + + Publisher toPublisher(X input); + + X fromPublisher(Publisher publisher); + + Class getConvertType(); + + default int priority() { + return Integer.MAX_VALUE; + } + + default boolean appliesTo(Class classType) { + return getConvertType().isAssignableFrom(classType); + } + + default Optional getPublisherTypeArgument(Type type) { + return MethodMapUtils.getPublisherTypeArgument(this.getConvertType(), type); + } + + static abstract class Abs implements PublisherConverter { + + @Override + public Publisher toPublisher(X input) { + if (input == null) + return Flux.empty(); + return toPublisherInternal(input); + } + + protected abstract Publisher toPublisherInternal(X input); + + } +} diff --git a/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/PublisherConverters.java b/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/PublisherConverters.java new file mode 100644 index 00000000..7798d43d --- /dev/null +++ b/rsocket-ipc-reflection-core/src/main/java/io/rsocket/ipc/reflection/core/PublisherConverters.java @@ -0,0 +1,52 @@ +package io.rsocket.ipc.reflection.core; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.reflections8.ReflectionUtils; + +public class PublisherConverters { + + private static final Map, List>> DEFAULT_CONVERTER_METHODS = new ConcurrentHashMap<>( + 1); + + public static Optional> lookup(Class classType) { + if (classType == null) + return Optional.empty(); + Stream> publisherConverterStream = DEFAULT_CONVERTER_METHODS + .computeIfAbsent(Optional.empty(), nil -> loadDefaultConverters()).stream(); + Optional> op = publisherConverterStream.filter(v -> v.appliesTo(classType)).findFirst() + .map(v -> v); + return op; + } + + @SuppressWarnings("unchecked") + private static List> loadDefaultConverters() { + Set methods = ReflectionUtils.getAllMethods(PublisherConverter.class, + m -> Modifier.isStatic(m.getModifiers()), m -> Modifier.isPublic(m.getModifiers()), + m -> m.getParameterCount() == 0, m -> PublisherConverter.class.isAssignableFrom(m.getReturnType())); + Stream> stream = methods.stream().map(v -> { + try { + return v.invoke(null); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + }).map(v -> (PublisherConverter) v); + stream = stream.sorted(Comparator.comparing(v -> v.priority())); + return Collections.unmodifiableList(stream.collect(Collectors.toList())); + + } + +} diff --git a/rsocket-ipc-reflection-server/bin/.gitignore b/rsocket-ipc-reflection-server/bin/.gitignore new file mode 100644 index 00000000..252a2016 --- /dev/null +++ b/rsocket-ipc-reflection-server/bin/.gitignore @@ -0,0 +1,7 @@ +/main/ +/default/ +/test/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-reflection-server/build.gradle b/rsocket-ipc-reflection-server/build.gradle new file mode 100644 index 00000000..c3238981 --- /dev/null +++ b/rsocket-ipc-reflection-server/build.gradle @@ -0,0 +1,23 @@ +description = 'RSocket IPC Reflection Mapping Client Support' + +dependencies { + api project (':rsocket-ipc-core') + implementation project (":rsocket-ipc-reflection-core") + + testImplementation 'io.opentracing.brave:brave-opentracing' + testImplementation 'junit:junit' + testImplementation 'org.junit.jupiter:junit-jupiter-engine' + testImplementation 'org.junit.vintage:junit-vintage-engine' + + testImplementation 'javax.inject:javax.inject' + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'com.google.protobuf:protobuf-java' + testImplementation 'org.hdrhistogram:HdrHistogram' + testImplementation 'org.apache.logging.log4j:log4j-api' + testImplementation 'org.apache.logging.log4j:log4j-core' + testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl' + testImplementation 'io.rsocket:rsocket-transport-netty' + testImplementation 'io.rsocket:rsocket-transport-local' + testImplementation 'org.mockito:mockito-core' + testImplementation 'io.zipkin.reporter2:zipkin-sender-okhttp3' +} \ No newline at end of file diff --git a/rsocket-ipc-reflection-server/src/main/java/io/rsocket/ipc/reflection/server/RequestHandlingRSocketReflection.java b/rsocket-ipc-reflection-server/src/main/java/io/rsocket/ipc/reflection/server/RequestHandlingRSocketReflection.java new file mode 100644 index 00000000..904cb7a6 --- /dev/null +++ b/rsocket-ipc-reflection-server/src/main/java/io/rsocket/ipc/reflection/server/RequestHandlingRSocketReflection.java @@ -0,0 +1,250 @@ +package io.rsocket.ipc.reflection.server; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.reactivestreams.Publisher; + +import io.netty.buffer.ByteBuf; +import io.opentracing.Tracer; +import io.rsocket.ipc.IPCRSocket; +import io.rsocket.ipc.Marshaller; +import io.rsocket.ipc.MetadataDecoder; +import io.rsocket.ipc.RequestHandlingRSocket; +import io.rsocket.ipc.Server; +import io.rsocket.ipc.Server.U; +import io.rsocket.ipc.Unmarshaller; +import io.rsocket.ipc.reflection.core.MethodMapUtils; +import io.rsocket.ipc.reflection.core.PublisherConverter; +import io.rsocket.ipc.reflection.core.PublisherConverters; +import io.rsocket.ipc.util.IPCUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +public class RequestHandlingRSocketReflection extends RequestHandlingRSocket { + private static final Class THIS_CLASS = new Object() { + }.getClass().getEnclosingClass(); + private static final Logger logger = java.util.logging.Logger.getLogger(THIS_CLASS.getName()); + private Scheduler subscribeOnScheduler; + + public RequestHandlingRSocketReflection(Scheduler subscribeOnScheduler) { + super(); + this.subscribeOnScheduler = subscribeOnScheduler; + } + + public RequestHandlingRSocketReflection(Scheduler subscribeOnScheduler, MetadataDecoder decoder) { + super(decoder); + this.subscribeOnScheduler = subscribeOnScheduler; + } + + public RequestHandlingRSocketReflection(Scheduler subscribeOnScheduler, Tracer tracer) { + super(tracer); + this.subscribeOnScheduler = subscribeOnScheduler; + } + + public void register(Class serviceType, S service, Marshaller resultMarshaller, + Unmarshaller unmarshaller) { + register(serviceType, service, resultMarshaller, (paramTypes, bb) -> unmarshaller.apply(bb)); + } + + public void register(Class serviceType, S service, Marshaller resultMarshaller, + BiFunction argumentUnmarshaller) { + Objects.requireNonNull(serviceType); + Objects.requireNonNull(service); + Objects.requireNonNull(resultMarshaller); + Objects.requireNonNull(argumentUnmarshaller); + Map methodMapping = MethodMapUtils.getMappedMethods(serviceType, true); + Set serviceNameTracker = new HashSet<>(); + for (boolean lowercase : Arrays.asList(false, true)) { + for (boolean simpleName : Arrays.asList(false, true)) { + String serviceName = simpleName ? serviceType.getSimpleName() : serviceType.getName(); + serviceName = lowercase ? serviceName.toLowerCase() : serviceName; + if (!serviceNameTracker.add(serviceName)) + continue; + for (Entry ent : methodMapping.entrySet()) { + String route = ent.getKey(); + Method method = ent.getValue(); + logger.log(Level.INFO, + String.format("registering request handler. route:%s.%s", serviceName, route)); + register(service, argumentUnmarshaller, createServiceBuilder(serviceName, resultMarshaller), route, + method); + } + } + } + } + + private void register(S service, BiFunction argumentUnmarshaller, + U serviceBuilder, String route, Method method) { + if (registerRequestChannel(service, argumentUnmarshaller, serviceBuilder, route, method)) + return; + if (registerFireAndForget(service, argumentUnmarshaller, serviceBuilder, route, method)) + return; + Optional> returnPublisherConverter = PublisherConverters.lookup(method.getReturnType()); + if (registerRequestStream(service, argumentUnmarshaller, serviceBuilder, route, method, + returnPublisherConverter)) + return; + if (registerRequestResponse(service, argumentUnmarshaller, serviceBuilder, route, method, + returnPublisherConverter)) + return; + String errorMessage = String.format("unable to map method. serviceInstanceType:%s methodRoute:%s method:%s", + service.getClass().getName(), route, method); + throw new IllegalArgumentException(errorMessage); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private boolean registerRequestChannel(S service, BiFunction argumentUnmarshaller, + U serviceBuilder, String route, Method method) { + Optional requestChannelParameterType = MethodMapUtils.getRequestChannelParameterType(method); + if (!requestChannelParameterType.isPresent()) + return false; + PublisherConverter returnPublisherConverter = PublisherConverters.lookup(method.getReturnType()).get(); + Type[] typeArguments = new Type[] { requestChannelParameterType.get() }; + IPCRSocket ipcrSocket = serviceBuilder.unmarshall(createUnmarshaller(typeArguments, argumentUnmarshaller)) + .requestChannel(route, (first, publisher, md) -> { + Flux argFlux = Flux.from(publisher).map(args -> args[0]); + md.retain(); + Runnable mdRelease = () -> { + if (md.refCnt() > 0) + md.release(); + }; + return IPCUtils.onError(() -> { + return asFlux(() -> { + Object result = invoke(service, method, new Object[] { argFlux }); + return returnPublisherConverter.toPublisher(result); + }, this.subscribeOnScheduler).doOnTerminate(mdRelease); + }, mdRelease); + }).toIPCRSocket(); + this.withEndpoint(ipcrSocket); + return true; + } + + private boolean registerFireAndForget(S service, BiFunction argumentUnmarshaller, + U serviceBuilder, String route, Method method) { + if (!MethodMapUtils.isFireAndForget(method)) + return false; + IPCRSocket ipcrSocket = serviceBuilder + .unmarshall(createUnmarshaller(method.getGenericParameterTypes(), argumentUnmarshaller)) + .fireAndForget(route, (data, md) -> { + Mono> wrappedMono = asMono(() -> { + invoke(service, method, data); + return Mono.empty(); + }, this.subscribeOnScheduler); + return wrappedMono.flatMap(v -> v); + }).toIPCRSocket(); + this.withEndpoint(ipcrSocket); + return true; + + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private boolean registerRequestStream(S service, BiFunction argumentUnmarshaller, + U serviceBuilder, String route, Method method, + Optional> returnPublisherConverter) { + if (!returnPublisherConverter.isPresent()) + return false; + if (Mono.class.isAssignableFrom(method.getReturnType())) + return false; + PublisherConverter publisherConverter = returnPublisherConverter.get(); + IPCRSocket ipcrSocket = serviceBuilder + .unmarshall(createUnmarshaller(method.getGenericParameterTypes(), argumentUnmarshaller)) + .requestStream(route, (data, md) -> { + return asFlux(() -> { + Object result = invoke(service, method, data); + return publisherConverter.toPublisher(result); + }, this.subscribeOnScheduler); + }).toIPCRSocket(); + this.withEndpoint(ipcrSocket); + return true; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private boolean registerRequestResponse(S service, BiFunction argumentUnmarshaller, + U serviceBuilder, String route, Method method, + Optional> returnPublisherConverter) { + PublisherConverter publisherConverter = returnPublisherConverter.orElse(null); + IPCRSocket ipcrSocket = serviceBuilder + .unmarshall(createUnmarshaller(method.getGenericParameterTypes(), argumentUnmarshaller)) + .requestResponse(route, (data, md) -> { + Mono> wrappedMono = asMono(() -> { + Object result = invoke(service, method, data); + if (publisherConverter != null) + return Mono.from(publisherConverter.toPublisher(result)); + return Mono.just(result); + }, this.subscribeOnScheduler); + return wrappedMono.flatMap(v -> v); + }).toIPCRSocket(); + this.withEndpoint(ipcrSocket); + return true; + } + + public Scheduler getSubscribeOnScheduler() { + return subscribeOnScheduler; + } + + public void setSubscribeOnScheduler(Scheduler subscribeOnScheduler) { + this.subscribeOnScheduler = subscribeOnScheduler; + } + + private static U createServiceBuilder(String serviceName, Marshaller resultMarshaller) { + return Server.service(serviceName).noMeterRegistry().noTracer().marshall(resultMarshaller); + } + + private static Unmarshaller createUnmarshaller(Type[] typeArguments, + BiFunction argumentDeserializer) { + return bb -> argumentDeserializer.apply(typeArguments, bb); + } + + private static Flux asFlux(Supplier> supplier, Scheduler scheduler) { + Supplier> memoizedSupplier = memoized(supplier); + Flux result = Flux.defer(memoizedSupplier); + if (scheduler != null) + result = result.subscribeOn(scheduler); + return result; + } + + private static Mono asMono(Supplier supplier, Scheduler scheduler) { + Supplier memoizedSupplier = memoized(supplier); + Mono result = Mono.fromSupplier(memoizedSupplier); + if (scheduler != null) + result = result.subscribeOn(scheduler); + return result; + } + + private static Object invoke(S serivce, Method method, Object[] arguments) { + try { + return method.invoke(serivce, arguments); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + throw java.lang.RuntimeException.class.isAssignableFrom(e.getClass()) + ? java.lang.RuntimeException.class.cast(e) + : new java.lang.RuntimeException(e); + } + } + + private static Supplier memoized(Supplier supplier) { + Objects.requireNonNull(supplier); + AtomicReference> ref = new AtomicReference<>(); + return () -> { + if (ref.get() == null) + synchronized (ref) { + if (ref.get() == null) + ref.set(Optional.ofNullable(supplier.get())); + } + return ref.get().orElse(null); + }; + } + +} diff --git a/rsocket-ipc-reflection-test/bin/.gitignore b/rsocket-ipc-reflection-test/bin/.gitignore new file mode 100644 index 00000000..252a2016 --- /dev/null +++ b/rsocket-ipc-reflection-test/bin/.gitignore @@ -0,0 +1,7 @@ +/main/ +/default/ +/test/ +.flattened-pom.xml + + +.factorypath diff --git a/rsocket-ipc-reflection-test/build.gradle b/rsocket-ipc-reflection-test/build.gradle new file mode 100644 index 00000000..e7c1db37 --- /dev/null +++ b/rsocket-ipc-reflection-test/build.gradle @@ -0,0 +1,26 @@ +description = 'RSocket IPC Reflection Mapping Client Support' + +dependencies { + api project (':rsocket-ipc-core') + implementation project (':rsocket-ipc-reflection-client') + implementation project (':rsocket-ipc-reflection-server') + implementation project (':rsocket-ipc-gson') + implementation 'io.rsocket:rsocket-transport-netty' + + testImplementation 'io.opentracing.brave:brave-opentracing' + testImplementation 'junit:junit' + testImplementation 'org.junit.jupiter:junit-jupiter-engine' + testImplementation 'org.junit.vintage:junit-vintage-engine' + + testImplementation 'javax.inject:javax.inject' + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'com.google.protobuf:prorsocket-ipc-reflection-clienttobuf-java' + testImplementation 'org.hdrhistogram:HdrHistogram' + testImplementation 'org.apache.logging.log4j:log4j-api' + testImplementation 'org.apache.logging.log4j:log4j-core' + testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl' + testImplementation 'io.rsocket:rsocket-transport-netty' + testImplementation 'io.rsocket:rsocket-transport-local' + testImplementation 'org.mockito:mockito-core' + testImplementation 'io.zipkin.reporter2:zipkin-sender-okhttp3' +} \ No newline at end of file diff --git a/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/ClientTest.java b/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/ClientTest.java new file mode 100644 index 00000000..1dedd35a --- /dev/null +++ b/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/ClientTest.java @@ -0,0 +1,54 @@ +package io.rsocket.ipc.reflection; + +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Date; +import java.util.function.BiFunction; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.google.gson.Gson; + +import io.netty.buffer.ByteBuf; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketConnector; +import io.rsocket.ipc.encoders.MetadataEncoderLFP; +import io.rsocket.ipc.marshallers.GsonMarshaller; +import io.rsocket.ipc.marshallers.GsonUnmarshaller; +import io.rsocket.ipc.reflection.client.RSocketIPCClients; +import io.rsocket.transport.netty.client.TcpClientTransport; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class ClientTest { + + public static void main(String[] args) { + com.google.gson.Gson gson = new Gson(); + RSocket rsocket = RSocketConnector.create().connect(TcpClientTransport.create("localhost", 7000)).block(); + BiFunction deserializer = (t, bb) -> { + return GsonUnmarshaller.create(gson, t).apply(bb); + }; + TestServiceChannel client = RSocketIPCClients.create(() -> rsocket, TestServiceChannel.class, + new MetadataEncoderLFP(), v -> { + ByteBuf bb = GsonMarshaller.create(gson).apply(v); + System.out.println(bb.toString(StandardCharsets.UTF_8)); + return bb; + }, deserializer); + System.out.println(client.addMono(69, 420).block()); + System.out.println(client.add(69, 420)); + client.intFlux(IntStream.range(0, 20).toArray()).toStream().forEach(v -> { + System.out.println(v); + }); + Stream res = client.cool(Arrays.asList(new Date(), new Date(0l)), "sup"); + res.forEach(v -> { + System.out.println(v); + }); + + Flux resp = client.channel(Flux.range(0, 10).map(i -> new Date()).delayElements(Duration.ofSeconds(1))); + resp.toStream().forEach(v -> { + System.out.println(v); + }); + } +} diff --git a/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/ServerTest.java b/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/ServerTest.java new file mode 100644 index 00000000..96477656 --- /dev/null +++ b/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/ServerTest.java @@ -0,0 +1,44 @@ +package io.rsocket.ipc.reflection; + +import java.util.logging.Level; + +import com.google.gson.Gson; + +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; +import io.rsocket.ipc.decoders.MetadataDecoderLFP; +import io.rsocket.ipc.marshallers.GsonMarshaller; +import io.rsocket.ipc.marshallers.GsonUnmarshaller; +import io.rsocket.ipc.reflection.server.RequestHandlingRSocketReflection; +import io.rsocket.transport.netty.server.TcpServerTransport; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class ServerTest { + private static final Class THIS_CLASS = new Object() { + }.getClass().getEnclosingClass(); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(THIS_CLASS); + + public static void main(String[] args) throws InterruptedException { + Class classType = TestServiceChannel.class; + TestServiceChannel service = new TestServiceChannel.Impl(); + MetadataDecoderLFP decoder = new MetadataDecoderLFP(); + RequestHandlingRSocketReflection requestHandler; + { + requestHandler = new RequestHandlingRSocketReflection(Schedulers.elastic(), new MetadataDecoderLFP()); + SocketAcceptor socketAcceptor = (setup, client) -> Mono.just(requestHandler); + RSocketServer.create(socketAcceptor).interceptors(ir -> { + }).bind(TcpServerTransport.create("localhost", 7000)).block(); + } + boolean releaseOnParse = true; + Gson gson = new Gson(); + requestHandler.register(classType, service, GsonMarshaller.create(gson), (types, bb) -> { + return GsonUnmarshaller.create(gson, types).apply(bb); + }); + System.out.println("started"); + while (true) { + Thread.sleep(1000); + } + } + +} diff --git a/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/TestServiceChannel.java b/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/TestServiceChannel.java new file mode 100644 index 00000000..7e82e29e --- /dev/null +++ b/rsocket-ipc-reflection-test/src/main/java/io/rsocket/ipc/reflection/TestServiceChannel.java @@ -0,0 +1,68 @@ +package io.rsocket.ipc.reflection; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface TestServiceChannel { + + Flux channel(Flux flux); + + void msg(String msg); + + Stream stream(String msg, Date date); + + int add(int... args); + + Flux intFlux(int... args); + + Mono addMono(int... args); + + Stream cool(List vals, String msg); + + public static class Impl implements TestServiceChannel { + + public Flux channel(Flux flux) { + return flux.map(d -> "the date is:" + d); + } + + public void msg(String msg) { + System.out.println("msg:" + msg); + } + + public Stream stream(String msg, Date date) { + return IntStream.range(0, 10).mapToObj(v -> v).map(v -> v + "- " + msg + " - " + date); + } + + public int add(int... args) { + int res = 0; + for (int arg : args) + res += arg; + return res; + } + + public Flux intFlux(int... args) { + List argList = new ArrayList<>(); + for (int val : args) + argList.add(val); + return Flux.fromIterable(argList); + } + + public Mono addMono(int... args) { + int res = 0; + for (int arg : args) + res += arg; + return Mono.just(res); + } + + public Stream cool(List vals, String msg) { + return vals.stream().map(v -> msg + " " + v); + } + + } +} diff --git a/rsocket-rpc-core/bin/.gitignore b/rsocket-rpc-core/bin/.gitignore new file mode 100644 index 00000000..4d76ae8a --- /dev/null +++ b/rsocket-rpc-core/bin/.gitignore @@ -0,0 +1,6 @@ +/main/ +/test/ +.flattened-pom.xml + + +.factorypath diff --git a/settings.gradle b/settings.gradle index 7300d08f..7f607dfd 100644 --- a/settings.gradle +++ b/settings.gradle @@ -12,8 +12,11 @@ include 'rsocket-rpc-core' include 'rsocket-rpc-metrics-idl' include 'rsocket-rpc-protobuf' include 'rsocket-rpc-protobuf-idl' - - +include 'rsocket-ipc-gson' +include 'rsocket-ipc-reflection-core' +include 'rsocket-ipc-reflection-client' +include 'rsocket-ipc-reflection-server' +include 'rsocket-ipc-reflection-test' gradleEnterprise { buildScan { diff --git a/settings.jitpack.gradle b/settings.jitpack.gradle new file mode 100644 index 00000000..ae708431 --- /dev/null +++ b/settings.jitpack.gradle @@ -0,0 +1,28 @@ +plugins { + id 'com.gradle.enterprise' version '3.1' +} + +rootProject.name = 'rsocket-rpc-java' + +include 'rsocket-ipc-core' +include 'rsocket-ipc-graphql' +include 'rsocket-ipc-jackson' +/* +include 'rsocket-ipc-protobuf' +include 'rsocket-rpc-core' +include 'rsocket-rpc-metrics-idl' +include 'rsocket-rpc-protobuf' +include 'rsocket-rpc-protobuf-idl' +*/ +include 'rsocket-ipc-gson' +include 'rsocket-ipc-reflection-core' +include 'rsocket-ipc-reflection-client' +include 'rsocket-ipc-reflection-server' +include 'rsocket-ipc-reflection-test' + +gradleEnterprise { + buildScan { + termsOfServiceUrl = 'https://gradle.com/terms-of-service' + termsOfServiceAgree = 'yes' + } +} \ No newline at end of file