Skip to content

Commit 06ed48e

Browse files
David TuranskiGlenn Renfro
authored andcommitted
XD-1024 Added KryoMessageSerializer and configured in redis adapters
1 parent c2b5e60 commit 06ed48e

File tree

6 files changed

+222
-5
lines changed

6 files changed

+222
-5
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.x.redis;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.springframework.messaging.MessageHeaders;
23+
import org.springframework.messaging.support.MessageHeaderAccessor;
24+
25+
import com.esotericsoftware.kryo.Kryo;
26+
import com.esotericsoftware.kryo.Serializer;
27+
import com.esotericsoftware.kryo.io.Input;
28+
import com.esotericsoftware.kryo.io.Output;
29+
30+
31+
/**
32+
* Implementation of Kryo Serializer to handle {@link MessageHeaders} which is immutable.
33+
*
34+
* @author David Turanski
35+
* @since 1.0
36+
*/
37+
public class KryoMessageHeadersSerializer extends Serializer<MessageHeaders> {
38+
39+
@Override
40+
public void write(Kryo kryo, Output output, MessageHeaders headers) {
41+
MessageHeaderAccessor mha = new MessageHeaderAccessor();
42+
mha.copyHeaders(headers);
43+
kryo.writeObject(output, mha.toMap());
44+
}
45+
46+
@Override
47+
public MessageHeaders read(Kryo kryo, Input input, Class<MessageHeaders> type) {
48+
@SuppressWarnings("unchecked")
49+
Map<String, Object> map = kryo.readObject(input, HashMap.class);
50+
return new MessageHeaders(map);
51+
}
52+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.x.redis;
18+
19+
import org.springframework.data.redis.serializer.RedisSerializer;
20+
import org.springframework.data.redis.serializer.SerializationException;
21+
import org.springframework.messaging.Message;
22+
import org.springframework.messaging.MessageHeaders;
23+
import org.springframework.messaging.support.GenericMessage;
24+
25+
import com.esotericsoftware.kryo.Kryo;
26+
import com.esotericsoftware.kryo.io.Input;
27+
import com.esotericsoftware.kryo.io.Output;
28+
import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
29+
30+
31+
/**
32+
* A Redis Serializer that uses Kryo serialization
33+
*
34+
* @author David Turanski
35+
* @since 1.0
36+
*/
37+
public class KryoMessageSerializer implements RedisSerializer<Message<?>> {
38+
39+
@Override
40+
public byte[] serialize(Message<?> message) throws SerializationException {
41+
Kryo kryo = kryoInstance();
42+
Output output = new Output(2048, -1);
43+
kryo.writeObjectOrNull(output, message, GenericMessage.class);
44+
return output.getBuffer();
45+
}
46+
47+
@Override
48+
public Message<?> deserialize(byte[] bytes) throws SerializationException {
49+
if (bytes == null) {
50+
return null;
51+
}
52+
Kryo kryo = kryoInstance();
53+
Input input = new Input(bytes);
54+
return kryo.readObjectOrNull(input, GenericMessage.class);
55+
}
56+
57+
private Kryo kryoInstance() {
58+
Kryo kryo = new Kryo();
59+
kryo.register(MessageHeaders.class, new KryoMessageHeadersSerializer());
60+
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
61+
return kryo;
62+
}
63+
64+
}

spring-xd-dirt/src/main/resources/META-INF/spring-xd/transports/redis-admin.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
channel="deployChannel"
1414
connection-factory="redisConnectionFactory"
1515
queue="queue.deployer"
16-
extract-payload="false" />
16+
extract-payload="false"
17+
serializer="redisSerializer"
18+
/>
1719

1820
<int-redis:outbound-channel-adapter channel="undeployChannel" topic="topic.undeployer" connection-factory="redisConnectionFactory" />
1921

spring-xd-dirt/src/main/resources/META-INF/spring-xd/transports/redis-common.xml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
77
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
88

9-
<import resource="classpath:/META-INF/spring-xd/internal/xd-common.xml"/>
9+
<import resource="classpath:/META-INF/spring-xd/internal/xd-common.xml" />
1010

1111
<bean id="redisConnectionFactory"
1212
class="org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory">
@@ -18,10 +18,12 @@
1818
<bean id="humanFriendlyRedisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
1919
<property name="connectionFactory" ref="redisConnectionFactory" />
2020
</bean>
21-
21+
2222
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
2323
<property name="connectionFactory" ref="redisConnectionFactory" />
2424
</bean>
25-
25+
26+
<bean id="redisSerializer"
27+
class="org.springframework.integration.x.redis.KryoMessageSerializer" />
2628

2729
</beans>

spring-xd-dirt/src/main/resources/META-INF/spring-xd/transports/redis-container.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
channel="containerControlChannel"
2020
connection-factory="redisConnectionFactory"
2121
queue="queue.deployer"
22-
expect-message="true" />
22+
expect-message="true"
23+
serializer="redisSerializer"
24+
/>
2325

2426
<int-redis:inbound-channel-adapter topics="topic.undeployer" channel="containerControlChannel" />
2527

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.x.redis;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertNull;
22+
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import org.junit.Test;
27+
28+
import org.springframework.messaging.Message;
29+
import org.springframework.messaging.support.GenericMessage;
30+
31+
32+
/**
33+
*
34+
* @author David Turanski
35+
*/
36+
public class KryoMessageSerializerTests {
37+
38+
private KryoMessageSerializer serializer = new KryoMessageSerializer();
39+
40+
@Test
41+
public void testSerializeAndDeserialize() {
42+
Map<String, Object> headers = new HashMap<String, Object>();
43+
headers.put("header1", "foo");
44+
headers.put("header2", "bar");
45+
Foo foo = new Foo(123, "hello");
46+
headers.put("header3", foo);
47+
GenericMessage<String> message = new GenericMessage<String>("Hello", headers);
48+
byte[] rawMessage = serializer.serialize(message);
49+
@SuppressWarnings("unchecked")
50+
Message<String> convertedMsg = (Message<String>) serializer.deserialize(rawMessage);
51+
assertEquals("Hello", convertedMsg.getPayload());
52+
assertEquals("foo", convertedMsg.getHeaders().get("header1"));
53+
assertEquals("bar", convertedMsg.getHeaders().get("header2"));
54+
assertNotNull(convertedMsg.getHeaders().get("id"));
55+
assertNotNull(convertedMsg.getHeaders().get("timestamp"));
56+
assertEquals(foo, convertedMsg.getHeaders().get("header3"));
57+
}
58+
59+
@Test
60+
public void testDeserializeNull() {
61+
assertNull(serializer.deserialize(null));
62+
}
63+
64+
public static class Foo {
65+
66+
private final int i;
67+
68+
private final String s;
69+
70+
public Foo(int i, String s) {
71+
this.i = i;
72+
this.s = s;
73+
}
74+
75+
public int getI() {
76+
return i;
77+
}
78+
79+
public String getS() {
80+
return s;
81+
}
82+
83+
@Override
84+
public boolean equals(Object other) {
85+
if (!(other instanceof Foo)) {
86+
return false;
87+
}
88+
Foo foo = (Foo) other;
89+
90+
return i == foo.i && (s == null && foo.s == null || s.equals(foo.s));
91+
}
92+
93+
}
94+
95+
}

0 commit comments

Comments
 (0)