Skip to content

Runtime Operator Protocol

Paul Rogers edited this page Dec 4, 2016 · 6 revisions

Runtime Operator Protocol

Drill's execution model consists of a large variety of runtime operators assembled by the planner to implement a specific SQL query. Operators use the Volcano model: each is an iterator that returns records from a call to a next() method. The devil, as they say, is in the details. We explore those details here.

Each operator implements some relational operator: scan, sort, join, broadcast, etc. The Volcano iterator approach provides a unified iterator model implemented by all operators. No matter whether the operator reads data, sorts, or performs aggregations, it implements the same basic iterator model. Here we focus on that iterator model, not on the unique behavior of each operator.

Operator Creation

Each operator has a Creator class. For example, FilterRecordBatch has a FilterBatchCreator. The creator builds the operator. The operator constructor performs one-time initial setup. The key fact to understand is that the constructor does not have visibility to the schema of the data, hence the constructor cannot do setup that requires this information. Most operators therefore have some internal state (sometimes explicit as an enum, sometimes implicit via some variable) to know that they are in the "have not yet seen a schema" state.

Operator Protocol

As [Runtime Model|described elsewhere], operators are implemented in a bit of a non-intuitive way. The term "operator" in Drill is what we might call an "operator definition": the information that describes the operator. The actual runtime operator is called a "record batch" in Drill. All such runtime operators derive from RecordBatch. The basic (highly simplified) protocol is:

public interface RecordBatch {
  public static enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP };

  public IterOutcome next();
  public BatchSchema getSchema();
  public VectorContainer getOutgoingContainer();
  public void close();
}

The actual code is somewhat more complex, but contains thorough comments that you should read for the details.

The next() Method

The heart of the protocol is the next() method. The theory in Volcano is simple: each call returns a record until all records are read. In Drill, the operation is a bit more complex because operators return batches of records (as value vectors), not individual records. Drill also allows the schema to vary as the query runs, and handles error cases. This results in the protocol explained here.

First, note that next() returns a variety of exit codes:

  • OK_NEW_SCHEMA: Returned a schema (and optionally a record batch). Returned (ideally) each time the schema from this call to next() differs from that of previous calls.
  • OK: Returned a record batch (which always includes a schema). The schema is the same as that from the previous call.
  • DONE: No data returned, end of data. Equivalent to an EOF from a file reader.
  • STOP: Error condition: stop processing.

Running the Operator Tree

The FragmentExecutor runs the fragment which consists of a tree of operators, one of which is the root. The fragment executor calls next() on the root fragment to start execution.

          while (shouldContinue() && root.next()) {
            // loop
          }

Special First Batch Handling

In general, the protocol is that the downstream operator calls next() on the upstream operator. The implementation adds an additional layer defined in AbstractRecordBatch that performs some standard processing in next() before calling innerNext() that does work unique to each operator. The very first batch is handled specially, however. On the initial call to next(), AbstractRecordBatch calls a "fast path" to get the schema: buildSchema(), which calls next() on the upstream operator. By the type that AbstractRecordBatch calls innerNext(), a batch has already been fetched. Note that this behavior is different than all subsequent rows in which innerNext() is responsible for fetching any needed batch.

next(): Coming and Going

When discussing the behavior of next(), we have to consider two views:

  • The "consumer" the bit of code (usually an operator) that calls next() and handles the results.
  • The "producer" that implements the next() method.

We discuss both views below.

First Batch

The next() call propagates down the tree (the order is highly dependent on the particular type of operator). For any given operator, it will eventually see a first call to next().

At this point the operator does not know the data schema. Therefore, the operator must call next() on its own input in order to get the first batch. (That call may, in turn, cascade down the operator tree until it reaches a leaf: a scanner or a network receiver.) Once we have a schema, the operator can complete initialization:

  • Call next() on the input to get a first batch.
  • Initialize the present operator based on the returned schema.
  • Process the record batch.

That is, the first next() both initializes and processes records the same way that subsequent next() calls will.

The operator now must consider what to do based on the return value from it's input next(). For example:

  • OK: Indicates that the child (input) operator returned a batch of records (along with a schema.) Since this is the first batch, the present operator must usually do some form of setup which often involves generating code based on the schema.
  • OK_NEW_SCHEMA: In theory, the input should return the OK_NEW_SCHEMA status each time the schema changes, including the first time. In practice, the first batch seems to be returned (for some operators) as simply OK. Operators contain code to handle this ambiguity.
  • DONE: It could be that the query has no data at all: as scanner read an empty file, a filter removed all records, etc. In this case, the very first call to the input next() can return DONE, indicating that no data is available.
  • STOP: Indicates that an error occurred and that the operator should stop processing and exit.

Each operator processes the first batch differently. A filter will process the one batch; a sort will read all its incoming batches before returning from the first next(). In general, the return values are the above, but seen from the consumer's perspective:

  • OK_NEW_SCHEMA: Should be returned from the first next() call for successful results. Note that the actual results may be empty if all rows in the batch were filtered away.
  • DONE: No data from the query. Either no data was received from input, or this operator discarded all the data.
  • STOP: An error occurred.

Subsequent next() Calls

Consumers handle subsequent calls to next() work similarly to the first. The first call returned a schema and caused stop to occur. Since Drill has late schema binding, schemas may change. Thus any call to next() may return a new schema, requiring new initialization (assuming that the consumer can handle schema changes.) Expected return codes are thus:

  • OK: Indicates another batch with the same schema as the previous one.
  • OK_NEW_SCHEMA: Indicates a schema change (with optional data).
  • DONE: End of data.
  • STOP: Error condition.

Again, the operator must return a status as a producer using the same codes as above:

  • OK: Indicates another batch with the same schema as the previous one.
  • OK_NEW_SCHEMA: Indicates this operator encountered a schema change (with optional data).
  • DONE: End of data.
  • STOP: Error condition.

End of Data

Every query must end at some time. When a scanner finds it has no more records to read, it returns DONE from the (last) call to next(). DONE never includes data; it instead indicates EOF.

Each operator includes some internal state that must be shut down: release (direct memory) buffers, close files, etc. This is, in general, not done in response to end-of-data, but is done later in the close() call.

The protocol should dictate that once a producer returns DONE, its consumer must never again call next(). At present, this rule is vague: an operator must be prepared for further calls to next(). Such spurious calls must continue to return DONE.

Normal Close

Once the root operator returns DONE in the loop shown earlier, the fragment executor starts the process of operator shutdown. The fragment executor (actually, a helper class) loops over operators and invokes close() on each. This occurs from root to leaf order, down all branches of the operator tree. close() must release all resources, especially direct memory buffers, open files, open connections, etc. While this seems simple, actual implementation can be quite complex in some operators.

Some general rules:

  • close() should invoke no methods on either its parent or child operators. (However, some operators violate this rule.)
  • close() should allocate no new direct memory (since it may be called in an out-of-memory condition.)

A complexity is that the record iterator operator does, in fact, call next() on its input operator in an attempt to clear incoming batches. This appears to be more of a "bug" than a feature.

Similarly, the merge join directly calls close() on its child operators. Again, this seems to be an attempt to fix a specific bug rather than by design.

Error Handling

Error handling in Drill operators is a complex topic explained on its own page.

Clone this wiki locally