[SPARK-56284] Adding UDF worker specification protobuf definition #55165
base: master
@@ -0,0 +1,51 @@ (new file)

```proto
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

syntax = "proto3";

package org.apache.spark.udf.worker;

option java_package = "org.apache.spark.udf.worker";
option java_multiple_files = true;

// The UDF input & output data format.
enum UDFWorkerDataFormat {
  UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0;

  // The worker accepts and produces Apache Arrow batches.
  ARROW = 1;
}

// The UDF execution type/shape.
message UDFShape {
  oneof shape {
    SparkUDFShape spark = 1;
  }
}

enum SparkUDFShape {
  SPARK_UDF_SHAPE_UNSPECIFIED = 0;

  // UDF receives a row with 0+ columns as input
  // and produces a single, scalar value as output.
  EXPRESSION = 1;
```
> **Reviewer (Contributor):** Name it
```proto
  // UDF receives an iterator over a batch of rows as input and
  // produces an iterator over a batch of rows as output.
  MAP_PARTITIONS = 2;
}
```
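To make the two shapes concrete, here is a small, hypothetical Python sketch (none of these function names appear in the PR) of how an engine could drive each shape: `EXPRESSION` produces one scalar per input row, while `MAP_PARTITIONS` transforms an iterator of rows into another iterator of rows, possibly changing cardinality.

```python
from typing import Any, Callable, Iterator, List


def run_expression_udf(udf: Callable[..., Any], rows: List[tuple]) -> List[Any]:
    """EXPRESSION shape: the UDF sees one row (0+ columns), returns one scalar."""
    # Exactly one output value per input row.
    return [udf(*row) for row in rows]


def run_map_partitions_udf(
    udf: Callable[[Iterator[tuple]], Iterator[tuple]],
    rows: List[tuple],
) -> List[tuple]:
    """MAP_PARTITIONS shape: the UDF sees an iterator over a batch of rows
    and produces an iterator over a batch of rows."""
    return list(udf(iter(rows)))


rows = [(1, 2), (3, 4)]
print(run_expression_udf(lambda a, b: a + b, rows))            # [3, 7]
print(run_map_partitions_udf(
    lambda it: ((a * 10,) for a, _ in it), rows))              # [(10,), (30,)]
```

Note that in the real protocol the rows would arrive in the negotiated data format (e.g. Arrow batches), not as Python tuples; this sketch only illustrates the control-flow difference between the two shapes.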
@@ -17,12 +17,194 @@

```proto
syntax = "proto3";

import "common.proto";

package org.apache.spark.udf.worker;

option java_package = "org.apache.spark.udf.worker";
option java_multiple_files = true;

// Placeholder -- full worker specification schema to be added.
// See design doc: SPIP SPARK-55278.
///
/// Worker specification
///
message UDFWorkerSpecification {
  WorkerEnvironment environment = 1;
```
> **Reviewer (Contributor):** WorkerEnvironment looks like it is in the wrong place. It uses a lot of ProcessCallables, which only seem to make sense for DirectWorker. Why not make it part of the direct worker to begin with?
```proto
  WorkerCapabilities capabilities = 2;

  // How to create new workers.
  // At the moment, only direct creation is supported.
  // This can be extended with indirect/provisioned creation in the future.
  oneof worker {
    DirectWorker direct = 3;
  }
}

message WorkerEnvironment {
```
> **Reviewer (Contributor):** This needs documentation.
```proto
  // Callable that is responsible for environment setup.
  optional ProcessCallable installation = 1;

  // Callable which is invoked to verify that an environment
  // is suitable to start the UDF worker. This callable
  // needs to verify that:
  // - the worker code itself is present
  // - any needed dependencies are present
  optional ProcessCallable environment_verification = 2;

  // Callable which is invoked after the worker has been terminated.
  // This can be used to clean up dependencies or temporary resources.
  // Be careful not to clean up resources that could be used by
  // other workers running in parallel.
  optional ProcessCallable environment_cleanup = 3;
}

// Capabilities used for query planning.
message WorkerCapabilities {
```
> **Reviewer (Contributor):** IMO it would be a good idea to make this have a somewhat self-describing name.
```proto
  // The data formats that the worker supports for UDF data input & output.
  // Every worker MUST at least support ARROW.
  //
  // It is expected that for each UDF execution, the input format
  // always matches the output format.
  //
  // If a worker supports multiple data formats, the engine will select
  // the most suitable one for each UDF invocation. Which format was chosen
  // is reported by the engine as part of the UDF protocol's init message.
  repeated UDFWorkerDataFormat supported_data_formats = 1;

  // Which types of UDFs this worker supports.
  // This should list all supported shapes.
  // The shape of a specific UDF will be communicated
  // in the initial message of the UDF protocol.
  //
  // If execution of an unsupported UDF type is requested,
  // the query will fail during query planning.
  repeated UDFShape supported_udf_shapes = 2;

  // Whether multiple concurrent UDF
  // connections are supported by this worker
  // (for example via multi-threading).
  //
  // In the first implementation of the engine-side
  // worker specification, this property will not be used.
  //
  // Usage of this property can be enabled in the future if the
  // engine implements more advanced resource management (TBD).
  bool supports_concurrent_udfs = 3;

  // Whether compatible workers may be reused.
  // If this is not supported, the worker is
  // terminated after every single UDF invocation.
  bool supports_reuse = 4;

  // To be extended with UDF chaining, ...
}

// The worker that will be created to process UDFs.
message DirectWorker {
  // Blocking callable that is terminated by SIGTERM/SIGKILL.
  ProcessCallable runner = 1;

  UDFWorkerProperties properties = 2;
}

message UDFWorkerProperties {
  // Maximum amount of time to wait until the worker can accept connections.
  //
  // The engine will use this timeout if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
```
> **Reviewer (Contributor):** Same issue at line 119.
```proto
  optional int32 initialization_timeout_ms = 1;

  // Used for graceful engine-initiated termination signaled via SIGTERM.
  // After this time, the worker process should have terminated itself.
  // Otherwise, the process will be forcefully killed using SIGKILL.
  //
  // The engine will use this timeout if it does not exceed an
  // engine-configurable maximum time (e.g. 30 seconds).
  optional int32 graceful_termination_timeout_ms = 2;

  // The connection this [[DirectWorker]] supports. Note that a single
  // connection is sufficient to run multiple UDFs and (gRPC) services.
  //
  // On [[DirectWorker]] creation, connection information
  // is passed to the callable as a string parameter.
  // The string format depends on the [[WorkerConnection]].
  //
  // For example, when using TCP, the callable argument will be:
  //   --connection PORT
  // Here is a concrete example:
  //   --connection 8080
  //
  // For the format of each specific transport type, see the comments below.
  WorkerConnection connection = 3;
}

message WorkerConnection {
  oneof transport {
    UnixDomainSocket unix_domain_socket = 1;
    TcpConnection tcp = 2;
  }
}

// Communication between the engine and worker
// is done using a UNIX domain socket.
//
// On [[DirectWorker]] creation, a path to a socket
// to listen on is passed as an argument.
// Examples:
//   /tmp/channel-uuid.sock
//   /some/system/path/channel-1234.sock
message UnixDomainSocket {}
```
> **Reviewer (Contributor):** I am guessing you are going to add a path to this proto at some point?
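For illustration, here is a minimal, hypothetical sketch (the socket path and function names are not from the PR) of the UNIX-domain-socket transport: the worker binds and listens on the path it was handed, and the engine connects and round-trips a message over it.

```python
import os
import socket
import tempfile
import threading

# Hypothetical socket path in the style of the examples above.
SOCKET_PATH = os.path.join(tempfile.mkdtemp(), "channel-1234.sock")


def serve_once(socket_path: str, ready: threading.Event) -> None:
    """Worker side: accept one connection on the socket and echo one message."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
        server.bind(socket_path)
        server.listen(1)
        ready.set()  # signal that the engine may now connect
        conn, _ = server.accept()
        with conn:
            conn.sendall(conn.recv(1024))


# Engine side (simulated): wait until the worker listens, then connect.
ready = threading.Event()
worker = threading.Thread(target=serve_once, args=(SOCKET_PATH, ready))
worker.start()
ready.wait()
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
    client.connect(SOCKET_PATH)
    client.sendall(b"ping")
    reply = client.recv(1024)
worker.join()
print(reply)  # b'ping'
```

In the actual design the path would arrive via the `--connection` argument rather than being chosen by the worker, and the connection would carry the UDF protocol rather than a plain echo.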
```proto
// Communication between the engine and worker
// is done using a localhost TCP connection.
//
// On [[DirectWorker]] creation, a PORT
// is passed as a connection parameter.
//
// It is expected that the worker binds to this
// port on both the IPv4 and IPv6 localhost interfaces,
// i.e. the worker server should be reachable via
// 127.0.0.1:PORT and [::1]:PORT.
//
// Examples:
//   8080
//   1234
message TcpConnection {}
```
> **Reviewer (Contributor):** If this is local only, then name it as such? Same question as before: I assume you are going to add a port here?
```proto
message ProcessCallable {
  // Executable to invoke.
  // Examples:
  //   ["python3", "-m"]
  //   ["worker.bin"]
  //   ["java", "worker.java"]
  //   ["/bin/bash", "-c"]
  // This executable should block until the task is finished.
  // Termination is requested via a SIGTERM signal.
  //
  // Success/failure can be indicated via exit codes:
  //   exit code 0   -> success
  //   exit code != 0 -> failure
  repeated string command = 1;

  // Arguments passed directly to the executable.
  // Examples:
  //   ["udf_worker.py"]
  //   [""]
  //   ["--max_concurrency", "5"]
  //   ["\"echo 'Test'\""]
  //
  // Every executable will ALWAYS receive the following arguments,
  // which CANNOT be part of the below list of arguments:
  //
  //   --id
  //     The value of the id argument is a string with the engine-assigned
  //     id of this UDF worker. This can be used in logs and other state
  //     information.
  //
  //   --connection
  //     The value of the connection argument is a string with
  //     engine-assigned connection parameters. See [[UDFWorkerProperties]]
  //     for details.
  repeated string arguments = 2;

  // Environment variables for the invoked process.
  map<string, string> environment_variables = 3;
}
```
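The argument contract above can be sketched as follows. This is a hypothetical illustration (the helper names `build_argv` and `parse_worker_args` are not from the PR): the engine appends the always-present `--id` and `--connection` arguments after the user-supplied ones, and the worker extracts them while ignoring everything else.

```python
import argparse
from typing import List


def build_argv(command: List[str], arguments: List[str],
               worker_id: str, connection: str) -> List[str]:
    """Engine side: command, then user arguments, then the two
    engine-injected arguments every worker always receives."""
    return command + arguments + ["--id", worker_id, "--connection", connection]


def parse_worker_args(argv: List[str]) -> argparse.Namespace:
    """Worker side: pick out --id and --connection, ignore unknown arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--id", required=True)
    parser.add_argument("--connection", required=True)
    namespace, _unknown = parser.parse_known_args(argv)
    return namespace


argv = build_argv(["python3"], ["udf_worker.py", "--max_concurrency", "5"],
                  worker_id="worker-42", connection="8080")
ns = parse_worker_args(argv[1:])  # skip the executable itself
print(ns.id, ns.connection)  # worker-42 8080
```

Using `parse_known_args` keeps the worker forward-compatible if the engine ever injects additional arguments.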
> **Reviewer (Contributor):** Why this indirection? The shape also has nothing to do with Spark...
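The SIGTERM-then-SIGKILL sequence described for `graceful_termination_timeout_ms` can be sketched like this (a hypothetical engine-side helper, not code from the PR):

```python
import subprocess
import sys


def terminate_worker(proc: subprocess.Popen,
                     graceful_termination_timeout_ms: int) -> int:
    """Request graceful shutdown via SIGTERM; escalate to SIGKILL on timeout."""
    proc.terminate()  # SIGTERM: the worker should shut itself down
    try:
        # Wait up to the worker-declared graceful termination timeout.
        return proc.wait(timeout=graceful_termination_timeout_ms / 1000.0)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL: forceful termination
        return proc.wait()


# Stand-in "worker" process that would run for 60s unless terminated.
worker = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = terminate_worker(worker, graceful_termination_timeout_ms=5000)
print("worker exited with", exit_code)
```

The engine would additionally cap the worker-declared timeout at its own configurable maximum (e.g. 30 seconds), as the proto comment specifies.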