Automi is a Go package for processing streams of data. It enables composing powerful data pipelines by chaining operations that are applied to each element flowing through the stream. The project has been completely re-implemented to leverage Go's generics support, bringing significant advantages:
- Type safety across the entire pipeline: Compile-time type checking eliminates runtime type assertions and conversions
- Enhanced developer experience: Better IDE autocompletion and error detection during development
- Reduced boilerplate: No more manual type casting or interface{} conversions
- Improved performance: Elimination of runtime type checks and assertions leads to more efficient execution
With generics, you can now create strongly-typed streams where each operation's input and output types are verified by the compiler, catching errors early and making your stream processing code more robust and maintainable.
Automi implements a data processing pipeline using a streaming architecture. Data flows through a series of connected components that process each element as it passes through. This approach enables efficient handling of large datasets without loading everything into memory at once.
The Automi API is built around four key primitives:
- Source: The entry point that emits data elements into the stream. Sources can read from files, channels, slices, or any data provider.
- Stream: The central abstraction that coordinates the flow of data. It connects the source to operations and ultimately to a sink.
- Operations: Processing steps applied to each element in the stream. These include transformations (map), filters, aggregations, and windowing operations.
- Sink: The termination point that consumes the processed data. Sinks can write to files, collect results into slices, or forward data to other systems.
Data flows through this pipeline in a single direction: from source → through operations → to sink. Each operation receives data, processes it, and passes the result to the next stage.
Automi streams use Go channels internally to route data, providing built-in concurrency safety and automatic back-pressure handling. When a downstream component processes data more slowly, upstream components naturally slow down to match the pace, preventing memory overflow.
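The back-pressure behavior described above is a general property of Go channels and can be sketched without Automi. The example below is an illustration only, not Automi's internal implementation: the stage names `emit`, `double`, and `collect` are hypothetical, and the channels are assumed to be unbuffered so that every send waits for the downstream receiver.

```go
package main

import "fmt"

// emit sends each value on an unbuffered channel. Every send blocks until
// the downstream stage is ready to receive, so the producer cannot run ahead.
func emit(values []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// double reads from in, doubles each value, and forwards it downstream.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

// collect drains the channel into a slice; it plays the role of a sink.
func collect(in <-chan int) []int {
	var result []int
	for v := range in {
		result = append(result, v)
	}
	return result
}

func main() {
	fmt.Println(collect(double(emit([]int{1, 2, 3})))) // [2 4 6]
}
```

If `collect` were slow, `double` would block on its send and `emit` would block in turn, so at most one item per stage is in flight at any time.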
Now, let us explore some examples and see how easy it is to use Automi to process data streams.
See automi-examples for a collection of all Automi examples.
As an introduction to Automi, let us explore a simple example that uses all primitives of the API to compose and express stream operations.
The following code snippet:
- Streams individual rune values from a slice source
- Applies a Filter operation to keep only uppercase ASCII letters
- Maps each rune to its string representation
- Batches the results
- And finally sorts the result alphabetically
func main() {
	// Define slice source
	slice := sources.Slice([]rune(`B世!ぽ@opqDQRS#$%^&*()ᅖ4x5Њ8yzUd90E12a3ᇳFGHmIザJuKLMᇙNO6PTnVWXѬYZbcef7ghijCklrAstvw`))

	// Create stream from the source
	strm := stream.From(slice)

	// Define stream operations
	strm.Run(
		exec.Filter(func(_ context.Context, item rune) bool {
			return item >= 65 && item < (65+26) // remove unwanted chars
		}),
		exec.Map(func(_ context.Context, item rune) string {
			return string(item) // map rune to string
		}),
		// batch incoming string items into []string
		window.Batch[string](),
		// sort batched items
		exec.SortSlice[[]string](),
	)

	// Send result to stdout
	strm.Into(sinks.Writer[string](os.Stdout))

	// Open the stream
	if err := <-strm.Open(context.Background()); err != nil {
		fmt.Println(err)
		return
	}
}
See the complete example here.
- The first step defines a stream Source from a Go slice using the function sources.Slice.
slice := sources.Slice([]rune(`B世!ぽ@opqDQRS#$%^&*()ᅖ4x5Њ8yzUd90E12a3ᇳFGHmIザJuKLMᇙNO6PTnVWXѬYZbcef7ghijCklrAstvw`))
- Next, the code creates a new Automi stream from the source. Each element of the slice source will be emitted individually to the stream.
strm := stream.From(slice)
- Next, we define Filter, Map, Batch, and Sort operations to be applied to each item as it streams.
// Define stream operations
strm.Run(
	exec.Filter(func(_ context.Context, item rune) bool {
		return item >= 65 && item < (65+26) // remove unwanted chars
	}),
	exec.Map(func(_ context.Context, item rune) string {
		return string(item) // map rune to string
	}),
	// batch incoming string items into []string
	window.Batch[string](),
	// sort batched items
	exec.SortSlice[[]string](),
)
- Next, we define a Sink at the end of the stream to collect the result in a Go io.Writer, which streams the result as string items into standard output:
strm.Into(sinks.Writer[string](os.Stdout))
- Lastly, the code opens the stream to start it:
if err := <-strm.Open(context.Background()); err != nil {
	fmt.Println(err)
	return
}
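To make the expected result of this walkthrough concrete, the same four steps (filter, map, batch, sort) can be sketched with the standard library alone. This is not how Automi executes the pipeline; the function name `upperSorted` and the short input string are hypothetical and chosen only to keep the output easy to verify.

```go
package main

import (
	"fmt"
	"sort"
)

// upperSorted mirrors the four steps of the example on a plain string:
// filter uppercase ASCII letters, map each rune to a string,
// batch the strings into one slice, and sort that slice.
func upperSorted(input string) []string {
	var batch []string
	for _, r := range input {
		if r >= 'A' && r <= 'Z' { // same range as item >= 65 && item < 65+26
			batch = append(batch, string(r)) // rune -> string
		}
	}
	sort.Strings(batch)
	return batch
}

func main() {
	fmt.Println(upperSorted("bGo!aZ世A")) // [A G Z]
}
```

Lowercase letters, punctuation, and non-ASCII runes fall outside the 65 to 90 range and are dropped before sorting.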
Let's explore another example that streams data from a CSV source file. Each CSV row will be
- Mapped to a Go struct type
- Filtered by value
- Then mapped to a slice of strings which is then collected into another CSV file.
type scientist struct {
	FirstName string
	LastName  string
	Title     string
	BornYear  int
}

func main() {
	// Source csv
	src, err := os.Open("./data.txt")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer src.Close()
	source := sources.CSV(src)

	// Sink csv
	snk, err := os.Create("./result.txt")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer snk.Close()
	sink := sinks.CSV(snk)

	// Start new stream from source
	// (named strm so the variable does not shadow the stream package)
	strm := stream.From(source)

	// Set up execution operations
	strm.Run(
		// map csv row to struct scientist
		exec.Map(func(ctx context.Context, cs []string) scientist {
			yr, _ := strconv.Atoi(cs[3])
			return scientist{
				FirstName: cs[1],
				LastName:  cs[0],
				Title:     cs[2],
				BornYear:  yr,
			}
		}),
		// apply data filter
		exec.Filter(func(ctx context.Context, cs scientist) bool {
			return cs.BornYear > 1930
		}),
		// remap value of type scientist to []string
		exec.Map(func(ctx context.Context, cs scientist) []string {
			return []string{cs.FirstName, cs.LastName, cs.Title}
		}),
	)

	// Stream result into sink
	strm.Into(sink)

	// Open the stream to start processing, as in the previous example
	if err := <-strm.Open(context.Background()); err != nil {
		fmt.Println(err)
		return
	}
}
See the complete example here.
This example showcases the versatility of Automi by streaming and processing data from HTTP requests and responses. The example is an HTTP server program that streams data from the request Body, encodes it using base64, and streams the result into the HTTP response:
http.HandleFunc(
	"/",
	func(resp http.ResponseWriter, req *http.Request) {
		resp.Header().Add("Content-Type", "text/html")
		resp.WriteHeader(http.StatusOK)

		// setup new stream with HTTP body as source
		strm := stream.From(sources.Reader(req.Body))
		strm.Run(
			exec.Execute(func(_ context.Context, data []byte) string {
				return base64.StdEncoding.EncodeToString(data)
			}),
		)

		// route result into response
		strm.Into(sinks.Writer[[]byte](resp))

		// run the stream
		if err := <-strm.Open(req.Context()); err != nil {
			resp.WriteHeader(http.StatusInternalServerError)
			slog.Error("Stream failed to open", "error", err)
		}
	},
)
See the complete example here.
Automi comes with a set of built-in components to get you started with stream processing, including the following.
- sources.Chan - source from Go channels
- sources.CSV - source from CSV
- sources.Reader - source from io.Reader
- io.Scanner - source from io.Scanner
- sources.Slice - source from Go slices
- window.Batch - batch all
- window.Size - window by size
- window.Duration - window by duration
- exec.Execute - user-defined function
- exec.Filter - filter func
- exec.Map - map func
- exec.GroupByIndex - group by slice index
- exec.GroupByStructField - group by struct field name
- exec.GroupByMapKey - group by map key value
- exec.SumByIndex - sum by slice index
- exec.SumByStructField - sum by struct field name
- exec.SumByMapKey - sum by map key value
- exec.Sum - sum either 1D or 2D slice
- exec.SumAll1D - sum 1D slice
- exec.SumAll2D - sum 2D slice
- exec.SortSlice - sort slice
- exec.SortSliceByIndex - sort by slice index
- exec.SortByStructField - sort by struct field name
- exec.SortByMapKey - sort by map key value
- exec.SortWithFunc - sort with user-defined func
- sinks.CSV - sink into CSV
- sinks.Func - sink into a user-defined func
- sinks.Discard - no-op sink
- sinks.Slice - sink into Go slice
- sinks.Slog - sink into Go slog Logger
- sinks.Writer - sink into Go io.Writer
The previous version (v0.1.0) of Automi, which uses Go reflection, has been moved to branch v0.1.0-automi-reflection and will not be maintained.
MIT