Skip to content

Commit 1c4817e

Browse files
Thomas Risberg authored and markfisher committed
XD-282 Adding an avro sink
(similar to the hdfs sink) - adding parser and integration tests - enabling the HdfsOutboundChannelAdapterIntegrationTests using filesystem for Hadoop FS - formatting xml for avro sink
1 parent 72baec9 commit 1c4817e

File tree

16 files changed

+524
-22
lines changed

16 files changed

+524
-22
lines changed

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,8 @@ project('spring-xd-extension-hdfs') {
561561
project('spring-xd-hadoop') {
562562
description = 'Spring XD Hadoop'
563563
dependencies {
564-
compile "org.springframework:spring-aop:$springVersion"
564+
compile project(':spring-xd-extension-hdfs')
565+
compile "org.springframework:spring-aop:$springVersion"
565566
compile "org.springframework:spring-context:$springVersion"
566567
compile "org.springframework:spring-context-support:$springVersion"
567568
compile "org.springframework:spring-jdbc:$springVersion"

modules/sink/avro.xml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--
	Avro sink module: batches incoming payloads and writes them to HDFS as Avro
	records via Spring for Apache Hadoop's DatasetTemplate.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-hadoop="http://www.springframework.org/schema/integration/hadoop"
	xmlns:task="http://www.springframework.org/schema/task"
	xmlns:hdp="http://www.springframework.org/schema/hadoop"
	xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/integration/hadoop http://www.springframework.org/schema/integration/hadoop/spring-integration-hadoop.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
		http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

	<!-- Module input channel; the stream deployer sends messages here. -->
	<int:channel id="input"/>

	<!-- Group messages by payload class so each write contains a single record type;
	     release a batch once ${batchSize} (default 1000) messages have accumulated. -->
	<int:aggregator
		input-channel="input"
		correlation-strategy-expression="payload.getClass().getName()"
		release-strategy-expression="size() == ${batchSize:1000}"
		send-partial-result-on-expiry="true"
		message-store="messageStore"
		output-channel="objects"/>

	<!-- Expires partial groups after ${timeout} seconds (value is seconds with "000"
	     appended to convert to milliseconds) so slow streams are still flushed. -->
	<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
		<property name="messageGroupStore" ref="messageStore"/>
		<property name="timeout" value="${timeout:10}000"/>
		<property name="expireOnDestroy" value="true"/>
	</bean>

	<!-- Run the reaper once a second to enforce the timeout above. -->
	<task:scheduled-tasks>
		<task:scheduled ref="reaper" method="run" fixed-rate="1000"/>
	</task:scheduled-tasks>

	<!-- In-memory store backing the aggregator; groups are lost on restart. -->
	<bean id="messageStore" class="org.springframework.integration.store.SimpleMessageStore"/>

	<!-- Terminal adapter: writes each released batch as Avro via datasetOperations. -->
	<int-hadoop:avro-outbound-channel-adapter id="objects" dataset-operations="datasetOperations"/>

	<bean id="datasetOperations" class="org.springframework.data.hadoop.store.dataset.DatasetTemplate">
		<property name="datasetRepositoryFactory" ref="datasetRepositoryFactory"/>
	</bean>

	<!-- Datasets are stored under ${basePath}, defaulting to /xd/<stream name>. -->
	<bean id="datasetRepositoryFactory" class="org.springframework.data.hadoop.store.dataset.DatasetRepositoryFactory">
		<property name="conf" ref="hadoopConfiguration"/>
		<property name="basePath" value="${basePath:/xd/${xd.stream.name}}"/>
	</bean>

	<hdp:configuration register-url-handler="false" properties-location="file:${XD_HOME}/config/hadoop.properties"/>

</beans>
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.xd.hadoop.fs;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
import org.springframework.data.hadoop.store.dataset.DatasetOperations;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.util.Assert;
24+
25+
import java.io.IOException;
26+
import java.util.Collection;
27+
import java.util.Collections;
28+
29+
/**
30+
*
31+
* @author Thomas Risberg
32+
*/
33+
public class AvroWriter implements HdfsWriter {
34+
35+
private final Log logger = LogFactory.getLog(this.getClass());
36+
37+
private DatasetOperations datasetOperations;
38+
39+
public AvroWriter(DatasetOperations datasetOperations) {
40+
Assert.notNull(datasetOperations, "DatasetTemplate must not be null.");
41+
logger.info("Configured with datasetOperations: " + datasetOperations);
42+
this.datasetOperations = datasetOperations;
43+
}
44+
45+
@Override
46+
public void write(Message<?> message) throws IOException {
47+
Object payload = message.getPayload();
48+
if (payload instanceof Collection<?>) {
49+
Collection<?> payloads = (Collection<?>) payload;
50+
if (logger.isDebugEnabled()) {
51+
logger.debug("Writing a collection of " + payloads.size() +
52+
" POJOs of type " + payloads.toArray()[0].getClass().getName());
53+
}
54+
datasetOperations.write((Collection<?>) message.getPayload());
55+
} else {
56+
logger.warn("Expected a collection of POJOs but received " + message.getPayload().getClass().getName());
57+
datasetOperations.write(Collections.singletonList(message.getPayload()));
58+
}
59+
}
60+
61+
@Override
62+
public void close() {
63+
// no-op
64+
}
65+
66+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.xd.hadoop.fs;
18+
19+
import org.springframework.data.hadoop.store.dataset.DatasetOperations;
20+
import org.springframework.util.Assert;
21+
22+
/**
23+
*
24+
* @author Thomas Risberg
25+
*/
26+
public class AvroWriterFactory implements HdfsWriterFactory {
27+
28+
private DatasetOperations datasetOperations;
29+
30+
public AvroWriterFactory(DatasetOperations datasetOperations) {
31+
Assert.notNull(datasetOperations, "DatasetTemplate must not be null.");
32+
this.datasetOperations = datasetOperations;
33+
}
34+
35+
@Override
36+
public HdfsWriter createWriter() {
37+
AvroWriter writer = new AvroWriter(datasetOperations);
38+
return writer;
39+
}
40+
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.xd.integration.hadoop.config;
18+
19+
import org.springframework.beans.factory.support.AbstractBeanDefinition;
20+
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
21+
import org.springframework.beans.factory.xml.ParserContext;
22+
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
23+
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
24+
import org.springframework.util.StringUtils;
25+
import org.w3c.dom.Element;
26+
27+
/**
28+
* Parser for the 'avro-outbound-channel-adapter' element.
29+
*
30+
* @author Thomas Risberg
31+
*/
32+
public class AvroOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
33+
34+
@Override
35+
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
36+
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(AvroWritingMessageHandlerFactoryBean.class);
37+
String datasetOperations = element.getAttribute("dataset-operations");
38+
if (!StringUtils.hasText(datasetOperations)) {
39+
parserContext.getReaderContext().error("dataset-operations is required", element);
40+
}
41+
builder.addConstructorArgReference(datasetOperations);
42+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup");
43+
return builder.getBeanDefinition();
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.xd.integration.hadoop.config;
18+
19+
import org.springframework.beans.factory.FactoryBean;
20+
import org.springframework.data.hadoop.store.dataset.DatasetOperations;
21+
import org.springframework.util.Assert;
22+
import org.springframework.xd.hadoop.fs.AvroWriterFactory;
23+
import org.springframework.xd.integration.hadoop.outbound.HdfsWritingMessageHandler;
24+
25+
/**
26+
* @author Thomas Risberg
27+
*/
28+
public class AvroWritingMessageHandlerFactoryBean implements FactoryBean<HdfsWritingMessageHandler> {
29+
30+
private final DatasetOperations datasetOperations;
31+
32+
private volatile Boolean autoStartup;
33+
34+
private volatile HdfsWritingMessageHandler handler;
35+
36+
public AvroWritingMessageHandlerFactoryBean(DatasetOperations datasetOperations) {
37+
Assert.notNull(datasetOperations, "datasetOperations must not be null");
38+
this.datasetOperations = datasetOperations;
39+
}
40+
41+
public void setAutoStartup(boolean autoStartup) {
42+
this.autoStartup = autoStartup;
43+
}
44+
45+
@Override
46+
public Class<?> getObjectType() {
47+
return HdfsWritingMessageHandler.class;
48+
}
49+
50+
@Override
51+
public boolean isSingleton() {
52+
return true;
53+
}
54+
55+
@Override
56+
public synchronized HdfsWritingMessageHandler getObject() throws Exception {
57+
if (handler == null) {
58+
AvroWriterFactory writerFactory = new AvroWriterFactory(this.datasetOperations);
59+
this.handler = new HdfsWritingMessageHandler(writerFactory);
60+
if (this.autoStartup != null) {
61+
this.handler.setAutoStartup(this.autoStartup);
62+
}
63+
}
64+
return this.handler;
65+
}
66+
67+
}

spring-xd-hadoop/src/main/java/org/springframework/xd/integration/hadoop/config/HadoopNamespaceHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
* Namespace handler for Spring Integration's 'hadoop' namespace.
2323
*
2424
* @author Mark Fisher
25+
* @author Thomas Risberg
2526
*/
2627
public class HadoopNamespaceHandler extends AbstractIntegrationNamespaceHandler {

	@Override
	public void init() {
		// Register one parser per element supported by the integration 'hadoop' namespace.
		registerBeanDefinitionParser("hdfs-outbound-channel-adapter", new HdfsOutboundChannelAdapterParser());
		registerBeanDefinitionParser("avro-outbound-channel-adapter", new AvroOutboundChannelAdapterParser());
	}

}

spring-xd-hadoop/src/main/resources/org/springframework/xd/integration/hadoop/config/spring-integration-hadoop.xsd

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,18 @@
3535
</xsd:complexType>
3636
</xsd:element>
3737

38+
<xsd:element name="avro-outbound-channel-adapter">
	<xsd:annotation>
		<xsd:documentation>
Defines an outbound HDFS Avro writing Channel Adapter.
		</xsd:documentation>
	</xsd:annotation>
	<xsd:complexType>
		<!-- 'dataset-operations' must reference a DatasetOperations bean;
		     'id' and 'channel' follow the usual Spring Integration adapter conventions. -->
		<xsd:attribute name="id" use="optional"/>
		<xsd:attribute name="channel" use="optional"/>
		<xsd:attribute name="dataset-operations" use="required"/>
		<xsd:attribute name="auto-startup" default="true"/>
	</xsd:complexType>
</xsd:element>
51+
3852
</xsd:schema>
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.xd.hadoop.fs;
18+
19+
import com.cloudera.cdk.data.PartitionStrategy;
20+
21+
import org.springframework.data.hadoop.store.dataset.DatasetOperations;
22+
import org.springframework.data.hadoop.store.dataset.DatasetRepositoryCallback;
23+
import org.springframework.data.hadoop.store.dataset.RecordCallback;
24+
25+
import java.util.Collection;
26+
27+
/**
28+
* @author Thomas Risberg
29+
*/
30+
public class StubDatasetOperations implements DatasetOperations {
31+
32+
@Override
33+
public <T> void read(Class<T> targetClass, RecordCallback<T> callback) {
34+
throw new UnsupportedOperationException("not implemented");
35+
}
36+
37+
@Override
38+
public <T> Collection<T> read(Class<T> targetClass) {
39+
throw new UnsupportedOperationException("not implemented");
40+
}
41+
42+
@Override
43+
public void write(Collection<?> records, PartitionStrategy partitionStrategy) {
44+
throw new UnsupportedOperationException("not implemented");
45+
}
46+
47+
@Override
48+
public void write(Collection<?> records) {
49+
throw new UnsupportedOperationException("not implemented");
50+
}
51+
52+
@Override
53+
public void execute(DatasetRepositoryCallback callback) {
54+
throw new UnsupportedOperationException("not implemented");
55+
}
56+
57+
@Override
58+
public <T> String getDatasetName(Class<T> clazz) {
59+
throw new UnsupportedOperationException("not implemented");
60+
}
61+
}

0 commit comments

Comments
 (0)