Skip to content

Commit a377486

Browse files
Add support for native CloudEvents functions. (GoogleCloudPlatform#58)
* Add support for native CloudEvents functions. This means functions that receive a CloudEvent payload and dispatch it to a user function that expects a CloudEvent object. We do not yet handle converting a CloudEvent payload to the form expected by legacy GCF functions. * Add explanatory comments to the new code in BackgroundFunctionExecutor.
1 parent 1ca4d06 commit a377486

File tree

5 files changed

+214
-66
lines changed

5 files changed

+214
-66
lines changed

invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java

Lines changed: 124 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
package com.google.cloud.functions.invoker;
1616

1717
import static java.nio.charset.StandardCharsets.UTF_8;
18+
import static java.util.stream.Collectors.toList;
1819
import static java.util.stream.Collectors.toMap;
1920

2021
import com.google.cloud.functions.BackgroundFunction;
2122
import com.google.cloud.functions.Context;
23+
import com.google.cloud.functions.ExperimentalCloudEventsFunction;
2224
import com.google.cloud.functions.RawBackgroundFunction;
2325
import com.google.gson.Gson;
2426
import com.google.gson.GsonBuilder;
@@ -54,20 +56,63 @@ private BackgroundFunctionExecutor(FunctionExecutor<?> functionExecutor) {
5456
this.functionExecutor = functionExecutor;
5557
}
5658

59+
private enum FunctionKind {
60+
BACKGROUND(BackgroundFunction.class),
61+
RAW_BACKGROUND(RawBackgroundFunction.class),
62+
CLOUD_EVENTS(ExperimentalCloudEventsFunction.class);
63+
64+
static final List<FunctionKind> VALUES = Arrays.asList(values());
65+
66+
final Class<?> functionClass;
67+
68+
FunctionKind(Class<?> functionClass) {
69+
this.functionClass = functionClass;
70+
}
71+
72+
/** Returns the {@link FunctionKind} that the given class implements, if any. */
73+
static Optional<FunctionKind> forClass(Class<?> functionClass) {
74+
return VALUES.stream().filter(v -> v.functionClass.isAssignableFrom(functionClass)).findFirst();
75+
}
76+
}
77+
5778
/**
58-
* Makes a {@link HttpFunctionExecutor} for the given class.
79+
* Optionally makes a {@link BackgroundFunctionExecutor} for the given class, if it implements one
80+
* of {@link BackgroundFunction}, {@link RawBackgroundFunction}, or
81+
* {@link ExperimentalCloudEventsFunction}. Otherwise returns {@link Optional#empty()}.
82+
*
83+
* @param functionClass the class of a possible background function implementation.
84+
* @throws RuntimeException if the given class does implement one of the required interfaces, but we are
85+
* unable to construct an instance using its no-arg constructor.
86+
*/
87+
public static Optional<BackgroundFunctionExecutor> maybeForClass(Class<?> functionClass) {
88+
Optional<FunctionKind> maybeFunctionKind = FunctionKind.forClass(functionClass);
89+
if (!maybeFunctionKind.isPresent()) {
90+
return Optional.empty();
91+
}
92+
return Optional.of(forClass(functionClass, maybeFunctionKind.get()));
93+
}
94+
95+
/**
96+
* Makes a {@link BackgroundFunctionExecutor} for the given class.
5997
*
6098
* @throws RuntimeException if either the class does not implement one of
61-
* {@link BackgroundFunction} or {@link RawBackgroundFunction},
62-
* or we are unable to construct an instance using its no-arg constructor.
99+
* {@link BackgroundFunction}, {@link RawBackgroundFunction}, or
100+
* {@link ExperimentalCloudEventsFunction}; or we are unable to construct an instance using its no-arg
101+
* constructor.
63102
*/
64103
public static BackgroundFunctionExecutor forClass(Class<?> functionClass) {
65-
if (!BackgroundFunction.class.isAssignableFrom(functionClass)
66-
&& !RawBackgroundFunction.class.isAssignableFrom(functionClass)) {
104+
Optional<FunctionKind> maybeFunctionKind = FunctionKind.forClass(functionClass);
105+
if (!maybeFunctionKind.isPresent()) {
106+
List<String> classNames =
107+
FunctionKind.VALUES.stream().map(v -> v.functionClass.getName()).collect(toList());
67108
throw new RuntimeException(
68-
"Class " + functionClass.getName() + " implements neither " + BackgroundFunction.class
69-
.getName() + " nor " + RawBackgroundFunction.class.getName());
109+
"Class " + functionClass.getName() + " must implement one of these interfaces: "
110+
+ String.join(", ", classNames));
70111
}
112+
return forClass(functionClass, maybeFunctionKind.get());
113+
}
114+
115+
private static BackgroundFunctionExecutor forClass(Class<?> functionClass, FunctionKind functionKind) {
71116
Object instance;
72117
try {
73118
instance = functionClass.getConstructor().newInstance();
@@ -76,23 +121,31 @@ public static BackgroundFunctionExecutor forClass(Class<?> functionClass) {
76121
"Could not construct an instance of " + functionClass.getName() + ": " + e, e);
77122
}
78123
FunctionExecutor<?> executor;
79-
if (instance instanceof RawBackgroundFunction) {
80-
executor = new RawFunctionExecutor((RawBackgroundFunction) instance);
81-
} else {
82-
BackgroundFunction<?> backgroundFunction = (BackgroundFunction<?>) instance;
83-
@SuppressWarnings("unchecked")
84-
Class<? extends BackgroundFunction<?>> c =
85-
(Class<? extends BackgroundFunction<?>>) backgroundFunction.getClass();
86-
Optional<Type> maybeTargetType = backgroundFunctionTypeArgument(c);
87-
if (!maybeTargetType.isPresent()) {
88-
// This is probably because the user implemented just BackgroundFunction rather than
89-
// BackgroundFunction<T>.
90-
throw new RuntimeException(
91-
"Could not determine the payload type for BackgroundFunction of type "
92-
+ instance.getClass().getName()
93-
+ "; must implement BackgroundFunction<T> for some T");
94-
}
95-
executor = new TypedFunctionExecutor<>(maybeTargetType.get(), backgroundFunction);
124+
switch (functionKind) {
125+
case RAW_BACKGROUND:
126+
executor = new RawFunctionExecutor((RawBackgroundFunction) instance);
127+
break;
128+
case BACKGROUND:
129+
BackgroundFunction<?> backgroundFunction = (BackgroundFunction<?>) instance;
130+
@SuppressWarnings("unchecked")
131+
Class<? extends BackgroundFunction<?>> c =
132+
(Class<? extends BackgroundFunction<?>>) backgroundFunction.getClass();
133+
Optional<Type> maybeTargetType = backgroundFunctionTypeArgument(c);
134+
if (!maybeTargetType.isPresent()) {
135+
// This is probably because the user implemented just BackgroundFunction rather than
136+
// BackgroundFunction<T>.
137+
throw new RuntimeException(
138+
"Could not determine the payload type for BackgroundFunction of type "
139+
+ instance.getClass().getName()
140+
+ "; must implement BackgroundFunction<T> for some T");
141+
}
142+
executor = new TypedFunctionExecutor<>(maybeTargetType.get(), backgroundFunction);
143+
break;
144+
case CLOUD_EVENTS:
145+
executor = new CloudEventFunctionExecutor((ExperimentalCloudEventsFunction) instance);
146+
break;
147+
default: // can't happen, we've listed all the FunctionKind values already.
148+
throw new AssertionError(functionKind);
96149
}
97150
return new BackgroundFunctionExecutor(executor);
98151
}
@@ -177,12 +230,9 @@ final ClassLoader functionClassLoader() {
177230
return functionClass.getClassLoader();
178231
}
179232

180-
abstract void serviceLegacyEvent(HttpServletRequest req)
181-
throws Exception;
233+
abstract void serviceLegacyEvent(Event legacyEvent) throws Exception;
182234

183235
abstract void serviceCloudEvent(CloudEvent cloudEvent) throws Exception;
184-
185-
abstract Class<CloudEventDataT> cloudEventDataType();
186236
}
187237

188238
private static class RawFunctionExecutor extends FunctionExecutor<Map<?, ?>> {
@@ -194,9 +244,8 @@ private static class RawFunctionExecutor extends FunctionExecutor<Map<?, ?>> {
194244
}
195245

196246
@Override
197-
void serviceLegacyEvent(HttpServletRequest req) throws Exception {
198-
Event event = parseLegacyEvent(req);
199-
function.accept(new Gson().toJson(event.getData()), event.getContext());
247+
void serviceLegacyEvent(Event legacyEvent) throws Exception {
248+
function.accept(new Gson().toJson(legacyEvent.getData()), legacyEvent.getContext());
200249
}
201250

202251
@Override
@@ -205,15 +254,6 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
205254
String jsonData = cloudEvent.getData() == null ? "{}" : new String(cloudEvent.getData(), UTF_8);
206255
function.accept(jsonData, context);
207256
}
208-
209-
@Override
210-
Class<Map<?, ?>> cloudEventDataType() {
211-
// This messing about with casts and @SuppressWarnings allows us to limit the use of the raw
212-
// Map type to just here.
213-
@SuppressWarnings("unchecked")
214-
Class<Map<?, ?>> c = (Class<Map<?, ?>>) (Class<?>) Map.class;
215-
return c;
216-
}
217257
}
218258

219259
private static class TypedFunctionExecutor<T> extends FunctionExecutor<T> {
@@ -233,10 +273,9 @@ static <T> TypedFunctionExecutor<T> of(Type type, BackgroundFunction<?> instance
233273
}
234274

235275
@Override
236-
void serviceLegacyEvent(HttpServletRequest req) throws Exception {
237-
Event event = parseLegacyEvent(req);
238-
T payload = new Gson().fromJson(event.getData(), type);
239-
function.accept(payload, event.getContext());
276+
void serviceLegacyEvent(Event legacyEvent) throws Exception {
277+
T payload = new Gson().fromJson(legacyEvent.getData(), type);
278+
function.accept(payload, legacyEvent.getContext());
240279
}
241280

242281
@Override
@@ -250,27 +289,33 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
250289
throw new IllegalStateException("Event has no \"data\" component");
251290
}
252291
}
292+
}
293+
294+
private static class CloudEventFunctionExecutor extends FunctionExecutor<Void>{
295+
private final ExperimentalCloudEventsFunction function;
296+
297+
CloudEventFunctionExecutor(ExperimentalCloudEventsFunction function) {
298+
super(function.getClass());
299+
this.function = function;
300+
}
253301

254302
@Override
255-
Class<T> cloudEventDataType() {
256-
if (!(type instanceof Class<?>)) {
257-
throw new IllegalStateException(
258-
"CloudEvents SDK currently does not permit deserializing types other than classes:"
259-
+ " cannot deserialize " + type);
260-
}
261-
@SuppressWarnings("unchecked")
262-
Class<T> c = (Class<T>) type;
263-
return c;
303+
void serviceLegacyEvent(Event legacyEvent) throws Exception {
304+
throw new UnsupportedOperationException(
305+
"Conversion from legacy events to CloudEvents not yet implemented");
306+
}
307+
308+
@Override
309+
void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
310+
function.accept(cloudEvent);
264311
}
265312
}
266313

267314
/** Executes the user's background function. This can handle all HTTP methods. */
268315
@Override
269316
public void service(HttpServletRequest req, HttpServletResponse res) throws IOException {
270317
String contentType = req.getContentType();
271-
ClassLoader oldContextLoader = Thread.currentThread().getContextClassLoader();
272318
try {
273-
Thread.currentThread().setContextClassLoader(functionExecutor.functionClassLoader());
274319
if ((contentType != null && contentType.startsWith("application/cloudevents+json"))
275320
|| req.getHeader("ce-specversion") != null) {
276321
serviceCloudEvent(req);
@@ -281,8 +326,6 @@ public void service(HttpServletRequest req, HttpServletResponse res) throws IOEx
281326
} catch (Throwable t) {
282327
res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
283328
logger.log(Level.WARNING, "Failed to execute " + functionExecutor.functionName(), t);
284-
} finally {
285-
Thread.currentThread().setContextClassLoader(oldContextLoader);
286329
}
287330
}
288331

@@ -306,10 +349,32 @@ private <CloudEventT> void serviceCloudEvent(HttpServletRequest req) throws Exce
306349
() -> headers.getOrDefault("ce-specversion", listOfNull).get(0),
307350
unusedSpecVersion -> CloudEventsServletBinaryMessageReader.from(req, body),
308351
UnknownEncodingMessageReader::new);
309-
executor.serviceCloudEvent(reader.toEvent());
352+
// It's important not to set the context ClassLoader earlier, because MessageUtils will use
353+
// ServiceLoader.load(EventFormat.class) to find a handler to deserialize a binary CloudEvent
354+
// and if it finds something from the function ClassLoader then that something will implement
355+
// the EventFormat interface as defined by that ClassLoader rather than ours. Then ServiceLoader.load
356+
// will throw ServiceConfigurationError. At this point we're still running with the default
357+
// context ClassLoader, which is the system ClassLoader that has loaded the code here.
358+
runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent()));
310359
}
311360

312361
private void serviceLegacyEvent(HttpServletRequest req) throws Exception {
313-
functionExecutor.serviceLegacyEvent(req);
362+
Event event = parseLegacyEvent(req);
363+
runWithContextClassLoader(() -> functionExecutor.serviceLegacyEvent(event));
364+
}
365+
366+
private void runWithContextClassLoader(ContextClassLoaderTask task) throws Exception {
367+
ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
368+
try {
369+
Thread.currentThread().setContextClassLoader(functionExecutor.functionClassLoader());
370+
task.run();
371+
} finally {
372+
Thread.currentThread().setContextClassLoader(oldLoader);
373+
}
374+
}
375+
376+
@FunctionalInterface
377+
private interface ContextClassLoaderTask {
378+
void run() throws Exception;
314379
}
315380
}

invoker/core/src/main/java/com/google/cloud/functions/invoker/CloudEventsServletBinaryMessageReader.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.cloudevents.SpecVersion;
44
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
5-
import java.io.IOException;
65
import java.util.ArrayList;
76
import java.util.Collections;
87
import java.util.List;

invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.beust.jcommander.Parameter;
2121
import com.beust.jcommander.ParameterException;
2222
import com.google.cloud.functions.BackgroundFunction;
23+
import com.google.cloud.functions.ExperimentalCloudEventsFunction;
2324
import com.google.cloud.functions.HttpFunction;
2425
import com.google.cloud.functions.RawBackgroundFunction;
2526
import com.google.cloud.functions.invoker.BackgroundFunctionExecutor;
@@ -192,11 +193,18 @@ private static ClassLoader makeClassLoader(Optional<String> functionClasspath) {
192193
ClassLoader runtimeLoader = Invoker.class.getClassLoader();
193194
if (functionClasspath.isPresent()) {
194195
ClassLoader parent = new OnlyApiClassLoader(runtimeLoader);
195-
return new URLClassLoader(classpathToUrls(functionClasspath.get()), parent);
196+
return new FunctionClassLoader(classpathToUrls(functionClasspath.get()), parent);
196197
}
197198
return runtimeLoader;
198199
}
199200

201+
// This is a subclass just so we can identify it from its toString().
202+
private static class FunctionClassLoader extends URLClassLoader {
203+
FunctionClassLoader(URL[] urls, ClassLoader parent) {
204+
super(urls, parent);
205+
}
206+
}
207+
200208
private final Integer port;
201209
private final String functionTarget;
202210
private final String functionSignatureType;
@@ -286,9 +294,10 @@ private HttpServlet servletForDeducedSignatureType(Class<?> functionClass) {
286294
if (HttpFunction.class.isAssignableFrom(functionClass)) {
287295
return HttpFunctionExecutor.forClass(functionClass);
288296
}
289-
if (BackgroundFunction.class.isAssignableFrom(functionClass)
290-
|| RawBackgroundFunction.class.isAssignableFrom(functionClass)) {
291-
return BackgroundFunctionExecutor.forClass(functionClass);
297+
Optional<BackgroundFunctionExecutor> maybeExecutor =
298+
BackgroundFunctionExecutor.maybeForClass(functionClass);
299+
if (maybeExecutor.isPresent()) {
300+
return maybeExecutor.get();
292301
}
293302
String error = String.format(
294303
"Could not determine function signature type from target %s. Either this should be"
@@ -405,12 +414,21 @@ private static class OnlyApiClassLoader extends ClassLoader {
405414
protected Class<?> findClass(String name) throws ClassNotFoundException {
406415
String prefix = "com.google.cloud.functions.";
407416
if ((name.startsWith(prefix) && Character.isUpperCase(name.charAt(prefix.length())))
408-
|| name.startsWith("javax.servlet.")) {
417+
|| name.startsWith("javax.servlet.")
418+
|| isCloudEventsApiClass(name)) {
409419
return runtimeClassLoader.loadClass(name);
410420
}
411421
return super.findClass(name); // should throw ClassNotFoundException
412422
}
413423

424+
private static final String CLOUD_EVENTS_API_PREFIX = "io.cloudevents.";
425+
private static final int CLOUD_EVENTS_API_PREFIX_LENGTH = CLOUD_EVENTS_API_PREFIX.length();
426+
427+
private static boolean isCloudEventsApiClass(String name) {
428+
return name.startsWith(CLOUD_EVENTS_API_PREFIX)
429+
&& Character.isUpperCase(name.charAt(CLOUD_EVENTS_API_PREFIX_LENGTH));
430+
}
431+
414432
private static ClassLoader getSystemOrBootstrapClassLoader() {
415433
try {
416434
// We're still building against the Java 8 API, so we have to use reflection for now.

0 commit comments

Comments
 (0)