Skip to content

Commit 5832ff6

Browse files
committed
[SPARK-56284] Adding UDF worker specification protobuf definition
1 parent 7737a94 commit 5832ff6

File tree

3 files changed

+304
-3
lines changed

3 files changed

+304
-3
lines changed

udf/worker/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ implementation based on the worker specification.
2121
```
2222
udf/worker/
2323
├── proto/ Protobuf definition of the worker specification
24-
│ (UDFWorkerSpecification -- currently a placeholder).
24+
│ (UDFWorkerSpecification).
2525
│ WorkerSpecification -- typed Scala wrapper around the protobuf spec.
2626
└── core/ Engine-side APIs (all @Experimental):
2727
WorkerDispatcher -- manages workers for one spec; creates sessions.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
syntax = "proto3";
19+
20+
package org.apache.spark.udf.worker;
21+
22+
option java_package = "org.apache.spark.udf.worker";
23+
24+
option java_multiple_files = true;
25+
26+
// The UDF input & output data format.
enum UDFWorkerDataFormat {
  UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0;

  // The worker accepts and produces Apache Arrow batches.
  //
  // NOTE(review): consider renaming to UDF_WORKER_DATA_FORMAT_ARROW before
  // release; unprefixed enum values live in the enclosing scope in C++ and
  // can collide with values of sibling enums.
  ARROW = 1;
}
33+
34+
// The UDF execution type/shape.
enum UDFProtoCommunicationPattern {
  UDF_PROTO_COMMUNICATION_PATTERN_UNSPECIFIED = 0;

  // Data is exchanged as a bidirectional stream of bytes.
  //
  // NOTE(review): consider renaming to
  // UDF_PROTO_COMMUNICATION_PATTERN_BIDIRECTIONAL_STREAMING before release;
  // unprefixed enum values live in the enclosing scope in C++ and can
  // collide with values of sibling enums.
  BIDIRECTIONAL_STREAMING = 1;
}

udf/worker/proto/src/main/protobuf/worker_spec.proto

Lines changed: 262 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,272 @@
1717

1818
syntax = "proto3";
1919

20+
import "common.proto";
21+
2022
package org.apache.spark.udf.worker;
2123

2224
option java_package = "org.apache.spark.udf.worker";
2325
option java_multiple_files = true;
2426

25-
// Placeholder -- full worker specification schema to be added.
26-
// See design doc: SPIP SPARK-55278.
27+
// Top-level specification of a UDF worker: the environment it requires,
// the capabilities it offers, and how the engine creates new workers.
message UDFWorkerSpecification {
  // Callables to set up, verify, and clean up the worker environment.
  //
  // (Required)
  WorkerEnvironment environment = 1;

  // Capabilities used for query planning and execution.
  //
  // (Required)
  WorkerCapabilities capabilities = 2;

  // How new workers are created. At the moment only direct creation is
  // supported; this oneof can be extended with indirect/provisioned
  // creation in the future.
  //
  // (Required)
  oneof worker {
    DirectWorker direct = 3;
  }
}
45+
46+
// Set of callables that can be used to set up, verify, and clean up the
// worker environment before the worker callable is invoked.
//
// These callables should be specified if the worker setup cannot be
// assumed to be pre-installed on the target cluster.
//
// If defined, the callables below will be invoked once per Spark
// executor in the following order (top -> bottom):
//
//          ┌──────────────┐
//          │              │
//          │ Verification │
//          │              │
//          └──┬────────┬──┘
//             │        │
//      Failed │        │ Succeeded
//             │        │
//      ┌──────▼───────┐│
//      │              ││
//      │ Installation ││
//      │              ││
//      └──────┬───────┘│
//             │        │
//             ▼        ▼
//              ...
//          UDF execution
//              ...
//               │
//               │
//          ┌────▼────┐
//          │         │
//          │ Cleanup │
//          │         │
//          └─────────┘
//
// All callables are optional. However, if a verification callable is
// supplied, an installation callable needs to be supplied as well.
message WorkerEnvironment {
  // A callable that prepares cluster nodes by installing the required
  // runtime, dependencies, and UDF worker binaries at the expected
  // locations. This step ensures that the worker starting callable can
  // subsequently locate and launch the worker process.
  //
  // (Required if environment_verification is given)
  optional ProcessCallable installation = 1;

  // Callable, which is called to verify that an environment is suitable
  // to start the UDF worker. This callable needs to verify that:
  // - The worker code itself is present
  // - Any needed dependencies are present
  //
  // (Optional)
  optional ProcessCallable environment_verification = 2;

  // Callable, which is invoked after all workers have been terminated.
  // This can be used to clean up dependencies or temporary resources.
  //
  // (Optional)
  optional ProcessCallable environment_cleanup = 3;
}
112+
113+
// Capabilities used for query planning and for running the worker
// during query execution.
message WorkerCapabilities {
  // Data formats the worker supports for UDF data input & output.
  // Every worker MUST at least support ARROW.
  //
  // It is expected that for each UDF execution, the input format
  // always matches the output format.
  //
  // If a worker supports multiple data formats, the engine will select
  // the most suitable one for each UDF invocation; which format was
  // chosen is reported by the engine as part of the UDF protocol's
  // init message.
  //
  // (Required)
  repeated UDFWorkerDataFormat supported_data_formats = 1;

  // All UDF protocol communication patterns the worker supports.
  // The pattern used for a specific UDF will be communicated in the
  // initial message of the UDF protocol.
  //
  // If an execution for an unsupported pattern is requested, the query
  // will fail during query planning.
  //
  // (Required)
  repeated UDFProtoCommunicationPattern supported_communication_patterns = 2;

  // Whether this worker supports multiple concurrent UDF connections
  // (for example via multi-threading).
  //
  // The first implementation of the engine-side worker specification
  // will not use this property; it can be enabled in the future if the
  // engine implements more advanced resource management (TBD).
  //
  // (Optional)
  optional bool supports_concurrent_udfs = 3;

  // Whether compatible workers may be reused. If this is not supported,
  // the worker is terminated after every single UDF invocation.
  //
  // (Optional)
  optional bool supports_reuse = 4;

  // To be extended with UDF chaining, ...
}
162+
163+
// The worker that will be created to process UDFs.
message DirectWorker {
  // Blocking callable that is terminated by SIGTERM/SIGKILL.
  //
  // (Required)
  ProcessCallable runner = 1;

  // Runtime properties of the worker (timeouts, connection transport).
  //
  // (Required)
  UDFWorkerProperties properties = 2;
}
173+
174+
// Runtime properties applied to a [[DirectWorker]].
message UDFWorkerProperties {
  // Maximum amount of time to wait until the worker can accept
  // connections.
  //
  // The engine will use this timeout, if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
  //
  // (Optional)
  optional int32 initialization_timeout_ms = 1;

  // Used for graceful engine-initiated termination signaled via SIGTERM.
  // After this time, the worker process should have terminated itself;
  // otherwise, the process will be forcefully killed using SIGKILL.
  //
  // The engine will use this timeout, if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
  //
  // (Optional)
  optional int32 graceful_termination_timeout_ms = 2;

  // The connection this [[DirectWorker]] supports. Note that a single
  // connection is sufficient to run multiple UDFs and (gRPC) services.
  //
  // On [[DirectWorker]] creation, connection information is passed to
  // the callable as a string parameter. The string format depends on
  // the [[WorkerConnection]].
  //
  // For example, when using TCP, the callable argument will be:
  //   --connection PORT
  // A concrete example:
  //   --connection 8080
  //
  // For the format of each specific transport type, see the comments
  // on the transport messages below.
  //
  // (Required)
  WorkerConnection connection = 3;
}
210+
211+
// The transport over which the engine communicates with the worker.
message WorkerConnection {
  // (Required)
  oneof transport {
    UnixDomainSocket unix_domain_socket = 1;
    LocalTcpConnection tcp = 2;
  }
}
218+
219+
// Communication between the engine and worker is done using a UNIX
// domain socket.
//
// On [[DirectWorker]] creation, a path to a socket to listen on is
// passed as an argument. Examples:
//   /tmp/channel-uuid.sock
//   /some/system/path/channel-1234.sock
message UnixDomainSocket {}
228+
229+
// Communication between the engine and worker is done using a
// localhost TCP connection.
//
// On [[DirectWorker]] creation, a PORT is passed as a connection
// parameter.
//
// It is expected that the worker binds to this port on both the IPv4
// and IPv6 localhost interfaces, i.e. the worker server should be
// reachable via 127.0.0.1:PORT and [::1]:PORT.
//
// Examples:
//   8080
//   1234
message LocalTcpConnection {}
244+
245+
// An executable the engine launches as a separate OS process.
message ProcessCallable {
  // Executable to invoke.
  // Examples:
  //   ["python3", "-m"]
  //   ["worker.bin"]
  //   ["java", "worker.java"]
  //   ["/bin/bash", "-c"]
  // This executable should be blocking, until the task is finished.
  // Termination is requested via a SIGTERM signal.
  //
  // Success/Failure can be indicated via exit codes:
  //   Exit code 0    -> Success
  //   Exit code != 0 -> Failure
  //
  // (Required)
  repeated string command = 1;

  // Arguments passed directly to the executable.
  // Examples:
  //   ["udf_worker.py"]
  //   [""]
  //   ["--max_concurrency", "5"]
  //   ["\"echo 'Test'\""]
  //
  // Every executable will ALWAYS receive the following arguments,
  // which CANNOT be part of the below list of arguments:
  //
  // --id
  //   The value of the id argument is a string with the engine-assigned
  //   id of this UDF worker. This can be used in logs and other state
  //   information.
  //
  // --connection
  //   The value of the connection argument is a string with
  //   engine-assigned connection parameters. See [[UDFWorkerProperties]]
  //   for details.
  //
  // (Optional)
  repeated string arguments = 2;

  // Environment variables for the invoked process.
  //
  // (Optional)
  map<string, string> environment_variables = 3;
}

0 commit comments

Comments
 (0)