diff --git a/udf/worker/README.md b/udf/worker/README.md index f13ce85d4dad1..fa27430b62b62 100644 --- a/udf/worker/README.md +++ b/udf/worker/README.md @@ -21,7 +21,7 @@ implementation based on the worker specification. ``` udf/worker/ ├── proto/ Protobuf definition of the worker specification -│ (UDFWorkerSpecification -- currently a placeholder). +│ (UDFWorkerSpecification). │ WorkerSpecification -- typed Scala wrapper around the protobuf spec. └── core/ Engine-side APIs (all @Experimental): WorkerDispatcher -- manages workers for one spec; creates sessions. diff --git a/udf/worker/proto/src/main/protobuf/common.proto b/udf/worker/proto/src/main/protobuf/common.proto new file mode 100644 index 0000000000000..9c50cdd7a7e4b --- /dev/null +++ b/udf/worker/proto/src/main/protobuf/common.proto @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package org.apache.spark.udf.worker; + +option java_package = "org.apache.spark.udf.worker"; + +option java_multiple_files = true; + +// The UDF input & output data format. +enum UDFWorkerDataFormat { + UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0; + + // The worker accepts and produces Apache Arrow batches. 
+ ARROW = 1; +} + +// The UDF execution type/shape. +enum UDFProtoCommunicationPattern { + UDF_PROTO_COMMUNICATION_PATTERN_UNSPECIFIED = 0; + + // Data exchanged as a bidirectional + // stream of bytes. + BIDIRECTIONAL_STREAMING = 1; +} diff --git a/udf/worker/proto/src/main/protobuf/worker_spec.proto b/udf/worker/proto/src/main/protobuf/worker_spec.proto index 92dfccd6ef108..f2eacf2b3ce35 100644 --- a/udf/worker/proto/src/main/protobuf/worker_spec.proto +++ b/udf/worker/proto/src/main/protobuf/worker_spec.proto @@ -17,12 +17,272 @@ syntax = "proto3"; +import "common.proto"; + package org.apache.spark.udf.worker; option java_package = "org.apache.spark.udf.worker"; option java_multiple_files = true; -// Placeholder -- full worker specification schema to be added. -// See design doc: SPIP SPARK-55278. +/// +/// Worker specification +/// message UDFWorkerSpecification { + // (Required) + WorkerEnvironment environment = 1; + // (Required) + WorkerCapabilities capabilities = 2; + + // How to create new workers. + // At the moment, only direct creation is supported. + // This can be extended with indirect/provisioned creation in the future. + // + // (Required) + oneof worker { + DirectWorker direct = 3; + } +} + +// Set of callables that can be used to setup, verify, +// and cleanup the worker environment before the worker +// callable is invoked. +// +// These callables should be specified if the worker +// setup cannot be assumed to be pre-installed on the +// target cluster. +// +// If defined, the below callables will be invoked once +// per Spark executor in the following order (top -> bottom): +// +// ┌──────────────┐ +// │ │ +// │ Verification │ +// │ │ +// └──┬────────┬──┘ +// │ │ +// Failed │ │ Succeeded +// │ │ +// ┌──────▼───────┐│ +// │ ││ +// │ Installation ││ +// │ ││ +// └──────┬───────┘│ +// │ │ +// ▼ ▼ +// ... +// UDF worker creation +// ... +// │ +// │ +// ┌────▼────┐ +// │ │ +// │ Cleanup │ +// │ │ +// └─────────┘ +// +// All scripts are optional. 
+// However, if a verification script is supplied, an installation +// script needs to be supplied as well. +// +message WorkerEnvironment { + // A callable that prepares cluster nodes by installing + // the required runtime, dependencies, and UDF worker binaries + // at the expected locations. This step ensures that + // the worker starting callable + // can subsequently locate and launch the worker process. + // + // (Required if environment_verification is given) + optional ProcessCallable installation = 1; + + // Callable, which is called to verify that an environment + // is suitable to start the UDF worker. This callable + // needs to verify that + // - The worker code itself is present + // - Any needed dependencies are present + // + // (Optional) + optional ProcessCallable environment_verification = 2; + + // Callable, which is invoked after all workers have been terminated. + // This can be used to cleanup dependencies or temporary resources. + // + // (Optional) + optional ProcessCallable environment_cleanup = 3; +} + +// Capabilities used for query planning +// and running the worker during query execution. +message WorkerCapabilities { + // The data formats that the worker supports for UDF data in- & output. + // Every worker MUST at least support ARROW. + // + // It is expected that for each UDF execution, the input format + // always matches the output format. + // + // If a worker supports multiple data formats, the engine will select + // the most suitable one for each UDF invocation. Which format was chosen + // is reported by the engine as part of the UDF protocol's init message. + // + // (Required) + repeated UDFWorkerDataFormat supported_data_formats = 1; + + // Which UDF protocol communication patterns the worker + // supports. This should list all supported patterns. + // The pattern used for a specific UDF will be communicated + // in the initial message of the UDF protocol. 
+ // + // If an execution for an unsupported pattern is requested + // the query will fail during query planning. + // + // (Required) + repeated UDFProtoCommunicationPattern supported_communication_patterns = 2; + + // Whether multiple, concurrent UDF + // connections are supported by this worker + // (for example via multi-threading). + // + // In the first implementation of the engine-side + // worker specification, this property will not be used. + // + // Usage of this property can be enabled in the future if the + // engine implements more advanced resource management (TBD). + // + // (Optional) + optional bool supports_concurrent_udfs = 3; + + // Whether compatible workers may be reused. + // If this is not supported, the worker is + // terminated after every single UDF invocation. + // + // (Optional) + optional bool supports_reuse = 4; + + // To be extended with UDF chaining, ... +} + +// The worker that will be created to process UDFs +message DirectWorker { + // Blocking callable that is terminated by SIGTERM/SIGKILL + // + // (Required) + ProcessCallable runner = 1; + + // (Required) + UDFWorkerProperties properties = 2; } + +message UDFWorkerProperties { + // Maximum amount of time to wait until the worker can accept connections. + // + // The engine will use this timeout, if it does not exceed an + // engine-configurable maximum time (e.g. 30 seconds). + // + // (Optional) + optional int32 initialization_timeout_ms = 1; + + // Used for graceful engine-initiated termination signaled via SIGTERM. + // After this time, the worker process should have terminated itself. + // Otherwise, the process will be forcefully killed using SIGKILL. + // + // The engine will use this timeout, if it does not exceed an + // engine-configurable maximum time (e.g. 30 seconds). + // + // (Optional) + optional int32 graceful_termination_timeout_ms = 2; + + // The connection this [[DirectWorker]] supports. 
Note that a single + // connection is sufficient to run multiple UDFs and (gRPC) services. + // + // On [[DirectWorker]] creation, connection information + // is passed to the callable as a string parameter. + // The string format depends on the [[WorkerConnection]]: + // + // For example, when using TCP, the callable argument will be: + // --connection PORT + // Here is a concrete example + // --connection 8080 + // + // For the format of each specific transport type, see the comments below. + // + // (Required) + WorkerConnection connection = 3; +} + +message WorkerConnection { + // (Required) + oneof transport { + UnixDomainSocket unix_domain_socket = 1; + LocalTcpConnection tcp = 2; + } +} + +// Communication between the engine and worker +// is done using a UNIX domain socket. +// +// On [[DirectWorker]] creation, a path to a socket +// to listen on is passed as an argument. +// Examples: +// /tmp/channel-uuid.sock +// /some/system/path/channel-1234.sock +message UnixDomainSocket {} + +// Communication between the engine and worker +// is done using a localhost TCP connection. +// +// On [[DirectWorker]] creation, a PORT +// is passed as a connection parameter. +// +// It is expected that the worker binds to this +// port on both IPv4 and IPv6 localhost interfaces. +// E.g. the worker server should be reachable via +// 127.0.0.1:PORT and [::1]:PORT. +// +// Examples: +// 8080 +// 1234 +message LocalTcpConnection {} + +message ProcessCallable { + // Executable to invoke. + // Examples: + // ["python3", "-m"] + // ["worker.bin"] + // ["java", "worker.java"] + // ["/bin/bash", "-c"] + // This executable should be blocking, until the task is finished. + // Termination is requested via a SIGTERM signal. + // + // Success/Failure can be indicated via exit codes: + // Exit code 0 -> Success + // Exit code != 0 -> Failure + // + // (Required) + repeated string command = 1; + + // Arguments passed directly to the executable. 
+ // Examples: + // ["udf_worker.py"] + // [""] + // ["--max_concurrency", "5"] + // ["\"echo 'Test'\""] + // + // Every executable will ALWAYS receive the following arguments, + // which CANNOT be part of the below list of arguments: + // + // --id + // The value of the id argument is a string with the engine-assigned + // id of this UDF Worker. This can be used in logs and other state information. + // + // --connection + // The value of the connection argument is a string with + // engine-assigned connection parameters. See [[UDFWorkerProperties]] + // for details. + // + // (Optional) + repeated string arguments = 2; + + // Environment variables for the invoked process. + // + // (Optional) + map<string, string> environment_variables = 3; +}