Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 79 additions & 39 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,42 @@
* @param <T> the type of values being encoded and decoded
*/
public abstract class Coder<T> implements Serializable {

/**
* The context in which encoding or decoding is being done.
* This context in which encoding or decoding is performed. @Deprecated The context in which
* encoding or decoding is performed.
*
* <p>There are two types of contexts:
*
* <ul>
* <li><b>OUTER</b>: The value occupies the entire stream. No additional length information is
* needed because the value consumes all remaining bytes.
* <li><b>NESTED</b>: The value is part of a larger structure (e.g., inside a list or record).
* In this case, length or boundary information may be required to correctly decode it.
* </ul>
*
* <p>This distinction ensures correct decoding of values from the stream.
*
* @deprecated To implement a coder, do not use any {@link Context}. Just implement only those
* abstract methods which do not accept a {@link Context} and leave the default
* implementations for methods accepting a {@link Context}.
* <p>Example usage (simplified):
*
* <pre>{@code
* // Assuming 'coder' is an instance of Coder<String>
*
* // Encoding a standalone value
* coder.encode("hello", outStream, Context.OUTER);
*
* // Encoding values inside a collection
* for (String s : list) {
* coder.encode(s, outStream, Context.NESTED);
* }
* }</pre>
*/
@Deprecated
public static class Context {
/**
* The outer context: the value being encoded or decoded takes up the remainder of the
* record/stream contents.
*/

public static final Context OUTER = new Context(true);

/**
* The nested context: the value being encoded or decoded is (potentially) a part of a larger
* record/stream contents, and may have other parts encoded or decoded after it.
*/
public static final Context NESTED = new Context(false);

/**
* Whether the encoded or decoded value fills the remainder of the output or input (resp.)
* record/stream contents. If so, then the size of the decoded value can be determined from the
* remaining size of the record/stream contents, and so explicit lengths aren't required.
*/
public final boolean isWholeStream;

public Context(boolean isWholeStream) {
Expand Down Expand Up @@ -112,22 +122,51 @@ public String toString() {
}

/**
* Encodes the given value of type {@code T} onto the given output stream. Multiple elements can
* be encoded next to each other on the output stream, each coder should encode information to
* know how many bytes to read when decoding. A common approach is to prefix the encoding with the
* element's encoded length.
* Encodes the given value of type {@code T} into the provided {@code OutputStream}.
*
* <p>This method writes the encoded representation of the value to the provided stream. The exact
* encoding format depends on the specific {@code Coder} implementation.
*
* @throws IOException if writing to the {@code OutputStream} fails for some reason
* @throws CoderException if the value could not be encoded for some reason
* <p>When encoding values, it is important to consider how they will be decoded later. In cases
* where multiple values are written to the same stream, additional information (such as
* boundaries or lengths) may be required to ensure correct decoding.
*
* <p>See {@link Context} for details on how encoding behavior may differ depending depending on
* whether the value is written as part of a larger structure.
*
* @throws IOException if writing to the {@code OutputStream} fails
* @throws CoderException if the value cannot be encoded
*/
public abstract void encode(T value, OutputStream outStream) throws CoderException, IOException;

/**
* Encodes the given value of type {@code T} onto the given output stream in the given context.
* Encodes the given value of type {@code T} into the provided {@code OutputStream} using the
* specified {@link Context}.
*
* <p>The {@code Context} determines how the value is encoded:
*
* <ul>
* <li><b>OUTER</b>: The value is encoded as a complete stream without needing additional
* boundary information during decoding.
* <li><b>NESTED</b>: The value is encoded as part of a larger structure. In this case,
* additional metadata (such as length or delimiters) may be required to allow correct
* decoding.
* </ul>
*
* @throws IOException if writing to the {@code OutputStream} fails for some reason
* @throws CoderException if the value could not be encoded for some reason
* @deprecated only implement and call {@link #encode(Object value, OutputStream)}
* <p>Example usage:
*
* <pre>{@code
* // Standalone encoding
* coder.encode("hello", outStream, Context.OUTER);
*
* // Encoding a standalone value
* for (String s : list) {
* coder.encode(s, outStream, Context.NESTED);
* }
* }</pre>
*
* @deprecated Prefer using {@link #encode(Object, OutputStream)} and avoid relying on {@link
* Context}
*/
@Deprecated
public void encode(T value, OutputStream outStream, Context context)
Expand All @@ -136,23 +175,24 @@ public void encode(T value, OutputStream outStream, Context context)
}

/**
* Decodes a value of type {@code T} from the given input stream in the given context. Returns the
* decoded value. Multiple elements can be encoded next to each other on the input stream, each
* coder should encode information to know how many bytes to read when decoding. A common approach
* is to prefix the encoding with the element's encoded length.
* Decodes a value of type {@code T} from the given {@code InputStream}.
*
* <p>This method reads the encoded representation of a value from the input stream and
* reconstructs the original object.
*
* <p>When multiple values are encoded into the same stream, the implementation must ensure that
* enough information is available to determine how many bytes belong to each value (for example,
* by prefixing the encoded data with its length).
*
* @throws IOException if reading from the {@code InputStream} fails for some reason
* @throws CoderException if the value could not be decoded for some reason
* @throws IOException if reading from the {@code InputStream} fails
* @throws CoderException if the value cannot be decoded
*/
public abstract T decode(InputStream inStream) throws CoderException, IOException;

/**
* Decodes a value of type {@code T} from the given input stream in the given context. Returns the
* decoded value.
* Decodes a value of type {@code T} from the given input stream in the specified context.
*
* @throws IOException if reading from the {@code InputStream} fails for some reason
* @throws CoderException if the value could not be decoded for some reason
* @deprecated only implement and call {@link #decode(InputStream)}
* @deprecated Prefer using {@link #decode(InputStream)} and avoid relying on {@link Context}.
*/
@Deprecated
public T decode(InputStream inStream, Context context) throws CoderException, IOException {
Expand Down
27 changes: 27 additions & 0 deletions website/www/site/content/en/documentation/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -4229,6 +4229,15 @@ output_pc = input_pc | beam.Select(post_code=lambda item: str(item["shipping_add
Support for wildcards hasn't been developed for the Python SDK yet.
{{< /paragraph >}}

<< paragraph class="language-py" >>
Example of selecting all nested fields using wildcard:
<< /paragraph >>

<< highlight py >>
input_pc = ...
output_pc = input_pc | beam.Select("shippingAddress.*")
<< /highlight >>

{{< paragraph class="language-go">}}
Support for wildcards hasn't been developed for the Go SDK yet.
{{< /paragraph >}}
Expand Down Expand Up @@ -4259,6 +4268,15 @@ selected, the result is an array of the selected subfield type. For example
Support for Array fields hasn't been developed for the Python SDK yet.
{{< /paragraph >}}

{{< paragraph class="language-py" >}}
example of selecting a nested field inside an array:
{{< /paragraph >}}

<< highlight py >>
input_pc = ...
output_pc = input_pc | beam.Select("transactions[].bank")
<< /highlight py >>

{{< paragraph class="language-go">}}
Support for Array fields hasn't been developed for the Go SDK yet.
{{< /paragraph >}}
Expand Down Expand Up @@ -4287,6 +4305,15 @@ arrays, the use of {} curly brackets in the selector is recommended, to make it
selected, they can be omitted for brevity. In the future, map key selectors will be supported, allowing selection of
specific keys from the map. For example, given the following schema:

<< paragraph class="language-py" >>
Example of selecting a specific key from a map field:
<< /paragraph >>

<< highlight py >>
input_pc = ...
output_pc = input_pc | beam.Select("purchasesByType['electronics']")
<< /highlight >>

**PurchasesByType**

<table class="table-wrapper--pr">
Expand Down
Loading