Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>0.9.5</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar</artifactId>
<version>3.11.4</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.grpc.protobufjs.GrpcDynamicService;
import org.apache.dolphinscheduler.plugin.task.grpc.protobufjs.JSONDescriptorHelper;
import org.apache.dolphinscheduler.plugin.task.grpc.protofactory.ProtoFactory;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -79,8 +79,10 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
} else {
channel = GrpcDynamicService.ChannelFactory.createChannel(grpcParameters.getUrl());
}
// Descriptors.FileDescriptor fileDesc =
// JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
Descriptors.FileDescriptor fileDesc =
JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON());
ProtoFactory.createFileDescriptorFromProtoContent(grpcParameters.getGrpcServiceDefinition());
GrpcDynamicService stubService = new GrpcDynamicService(channel, fileDesc);
DynamicMessage message = stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(),
grpcParameters.getConnectTimeoutMs());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.grpc.protofactory;

public class DynamicService {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.grpc.protofactory;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

import com.github.os72.protocjar.Protoc;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;

public class ProtoFactory {

public static Descriptors.FileDescriptor createFileDescriptorFromProtoContent(String protoContent) throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
File tmpDir = getTemplateDir();
File tmpProtoFile = getTemplateFile("template.proto");
File descFile = new File(tmpDir.getAbsolutePath() + "/output.desc");
File protoFile = saveProtoFile(protoContent, tmpProtoFile);
generateDescFile(protoFile, descFile);
DescriptorProtos.FileDescriptorProto fileDescriptorProto = loadDescFile(descFile);
return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, new Descriptors.FileDescriptor[]{});
}

public static File getTemplateFile(String name) throws IOException {
File tmpDir = getTemplateDir();
File tmpFile = new File(tmpDir.getAbsolutePath() + "/" + name);
if (tmpFile.exists()) {
if (!tmpFile.delete()) {
throw new IOException("Could not delete existing template file " + tmpFile.getAbsolutePath());
} ;
}
if (!tmpFile.createNewFile()) {
throw new IOException("Could not create template file " + tmpFile.getAbsolutePath());
}
return tmpFile;
}

public static File getTemplateDir() throws IOException {
Path path = Paths.get(FileUtils.getTempDirectory().getAbsolutePath(), UUID.randomUUID().toString());
return Files.createDirectories(path).toFile();
}

public static void runProtoc(String[] args) throws IOException, InterruptedException {

// String[] args = {"-v2.4.1", "--help"};
Protoc.runProtoc(args);
}

public static File saveProtoFile(String protoContent, File outputFile) throws IOException {
FileUtils.writeStringToFile(outputFile, protoContent, "UTF-8");
return outputFile;
}

public static void generateDescFile(File protoFile, File outputFile) throws IOException, InterruptedException {

String[] args = new String[]{
// "-I=" + protoFile.getParentFile().getAbsolutePath(),
"--descriptor_set_out=" + outputFile.getAbsolutePath(),
// "--include_imports",
protoFile.getAbsolutePath()
};
runProtoc(args);
}

public static DescriptorProtos.FileDescriptorProto loadDescFile(File descriptorFile) throws IOException {
final FileInputStream fileInputStream = new FileInputStream(descriptorFile);
final DescriptorProtos.FileDescriptorProto descriptorProto =
DescriptorProtos.FileDescriptorProto.parseFrom(fileInputStream);
return descriptorProto;
}
}
Loading