Skip to content

Commit

Permalink
Add workers (#62)
Browse files Browse the repository at this point in the history
* Add workers

* Add to changelog
  • Loading branch information
Koeng101 authored Feb 13, 2024
1 parent be76fc3 commit f4998ef
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Added workers to bio as a way to process data [#62](https://github.com/Koeng101/dnadesign/pull/62)
- Improved megamash efficiency and added []Match JSON conversion [#61](https://github.com/Koeng101/dnadesign/pull/61)
- Added barcoding functionality for sequencing reads [#59](https://github.com/Koeng101/dnadesign/pull/59)
- Added the megamash algorithm [#50](https://github.com/Koeng101/dnadesign/pull/50)
Expand Down
37 changes: 31 additions & 6 deletions lib/bio/bio.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,25 +302,50 @@ func ManyToChannel[Data DataTypes, Header HeaderTypes](ctx context.Context, chan
return err
}

// WorkerFunc is the signature of the function run by each worker started
// through RunWorkers. It receives a context, which it should honor for
// cancellation and coordination, and reports failure through its error return.
type WorkerFunc func(ctx context.Context) error

// RunWorkers launches numWorkers goroutines that each execute work, and
// arranges for output to be closed once all of them have returned.
//
// The workers are managed by an errgroup.Group and are handed the context
// derived from ctx by errgroup.WithContext rather than ctx itself.
//
// RunWorkers does not block: it always returns nil right after the workers
// have been started. The error gathered by the group is discarded, so callers
// that need to observe worker failures must capture them inside work.
//
// NOTE(review): output is closed here, so work presumably must not close it
// itself (closing a channel twice panics) - confirm against the worker
// functions that are passed in.
func RunWorkers[Data DataTypes](ctx context.Context, numWorkers int, output chan<- Data, work WorkerFunc) error {
	group, workerCtx := errgroup.WithContext(ctx)

	// Every worker runs the same function against the shared group context.
	runWorker := func() error { return work(workerCtx) }
	for started := 0; started < numWorkers; started++ {
		group.Go(runWorker)
	}

	// Wait in the background so the caller is not blocked; output is closed
	// only after the last worker has returned.
	closeWhenDone := func() {
		_ = group.Wait() // Error is intentionally dropped; see the doc comment.
		close(output)
	}
	go closeWhenDone()

	// Worker errors are handled asynchronously and never surface here.
	return nil
}

// FilterData is a generic function that implements a channel filter. Users
// give an input and output channel, with a filtering function, and FilterData
// filters data from the input into the output.
func FilterData[Data DataTypes](ctx context.Context, input <-chan Data, output chan<- Data, filter func(a Data) bool) error {
for {
select {
case <-ctx.Done():
close(output)
return ctx.Err()

case data, ok := <-input:
if !ok {
// If the input channel is closed, we close the output channel and return
close(output)
return nil // returning nil as the input channel being closed is a normal completion signal
return nil
}
if filter(data) {
// Only send data through if it passes the filter.
output <- data
select {
case output <- data:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions lib/bio/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ func ExampleFilterData() {
ctx := context.Background()
errorGroup, ctx := errgroup.WithContext(ctx)
errorGroup.Go(func() error {
return bio.FilterData(ctx, inputChan, outputChan, func(data sam.Alignment) bool { return (data.FLAG & 0x900) == 0 })
return bio.RunWorkers(ctx, 1, outputChan, func(ctx context.Context) error {
return bio.FilterData(ctx, inputChan, outputChan, func(data sam.Alignment) bool { return (data.FLAG & 0x900) == 0 })
})
})

// Send some example Alignments to the input channel
Expand Down Expand Up @@ -466,7 +468,11 @@ $%&$$$$$#')+)+,<>@B?>==<>>;;<<<B??>?@DA@?=>==>??<>??7;<706=>=>CBCCB????@CCBDAGFF
// Filter the right barcode fastqs from channel
barcode := "barcode07"
errorGroup.Go(func() error {
return bio.FilterData(ctx, fastqReads, fastqBarcoded, func(data fastq.Read) bool { return data.Optionals["barcode"] == barcode })
// We're going to start multiple workers within this errorGroup. This
// helps when doing computationally intensive operations on channels.
return bio.RunWorkers(ctx, 2, fastqBarcoded, func(ctx context.Context) error {
return bio.FilterData(ctx, fastqReads, fastqBarcoded, func(data fastq.Read) bool { return data.Optionals["barcode"] == barcode })
})
})

// Now, check the outputs. We should have kept only the reads for barcode07
Expand Down

0 comments on commit f4998ef

Please sign in to comment.