A high-performance, thread-safe goroutine orchestration library for Go with zero-allocation execution, comprehensive error handling, and complex nested orchestration support.
- π Zero-allocation execution - Optimized for high-performance scenarios
- π Thread-safe - All operations use atomic primitives and proper synchronization
- π― Generic type support - Type-safe task execution with Go generics
- π‘οΈ Comprehensive error handling - Panic recovery, timeout handling, and detailed error reporting
- π Fluent API - Easy-to-use builder pattern for configuration
- π Rich observability - Status tracking, timing information, and stack traces
- ποΈ Modular architecture - Clean separation of concerns with internal packages
go get github.com/maniartech/orchestrator
package main
import (
"context"
"github.com/maniartech/orchestrator"
)
func main() {
// Simple task execution
result, err := orchestrator.Setup(
orchestrator.Task(func() (string, error) {
return "Hello, World!", nil
}).Named("greeting"),
).Await()
if err != nil {
panic(err)
}
greeting := result.Get("greeting").(string)
fmt.Println(greeting) // Output: Hello, World!
}
Here are the currently working examples with the Task execution engine:
import "github.com/maniartech/orchestrator"
func processData() (int, error) {
time.Sleep(10 * time.Millisecond)
return 42, nil
}
func main() {
result, err := orchestrator.Setup(
orchestrator.Task(processData).
Named("data-processor").
With(orchestrator.Config{
Timeout: 30 * time.Second,
}),
).Await()
if err != nil {
panic(err)
}
answer := result.Get("data-processor").(int)
fmt.Printf("Result: %d\n", answer)
}
func monitoredExecution() {
workflow := orchestrator.Setup(
orchestrator.Task(func() (string, error) {
time.Sleep(100 * time.Millisecond)
return "completed", nil
}).Named("slow-task"),
)
// Monitor status in real-time
go func() {
for {
status := workflow.GetStatus()
fmt.Printf("Status: %s\n", status)
if status == orchestrator.Completed || status == orchestrator.Cancelled {
break
}
time.Sleep(10 * time.Millisecond)
}
}()
result, err := workflow.Await()
// Handle result...
}
The following advanced orchestration capabilities are being implemented in the next development phases:
// π§ COMING SOON - Sequential and Concurrent orchestrations (Tasks 4.1 & 5.1)
// This will provide the same expressive power as the legacy code but with better performance
import "github.com/maniartech/orchestrator"
// HandleResource will process various activities on the specified resource.
// All activities will be executed in their own goroutines in an orchestrated manner.
// This will provide concurrent, faster yet controlled execution.
//
// |-----Task------------------| |-----Task----|
// | | | |
// ----Sequential----Concurrent----Sequential->>-Task->>-Task--|----Concurrent----Task----|----Await----
// | | | |
// | |-----Task----| | |-----Task----|
// | | | |
// |-----Concurrent----Task----|------|
// | |
// |-----Task----|
//
func HandleResource(resourceId int) error {
return orchestrator.Setup(
orchestrator.Sequential(
// Infrastructure preparation
orchestrator.Task(keepInfraReady).Named("infra-ready"),
// Concurrent resource processing
orchestrator.Concurrent(
// Main resource processing pipeline
orchestrator.Sequential(
orchestrator.Task(func() error { return fetchResource(resourceId) }).Named("fetch-resource"),
orchestrator.Task(processResource).Named("process-resource"),
orchestrator.Task(submitResource).Named("submit-resource"),
).Named("resource-pipeline"),
// Dependency preparation (concurrent)
orchestrator.Concurrent(
orchestrator.Task(prepareDependencyA).Named("prep-dep-a"),
orchestrator.Task(prepareDependencyB).Named("prep-dep-b"),
orchestrator.Task(prepareDependencyC).Named("prep-dep-c"),
).Named("dependency-prep"),
).Named("main-processing"),
// Final notifications (concurrent)
orchestrator.Concurrent(
orchestrator.Task(postToSocialMedia).Named("social-media"),
orchestrator.Task(sendNotifications).Named("notifications"),
orchestrator.Task(submitReport).Named("report"),
).Named("notifications"),
).Named("resource-handler"),
).Await()
}
Currently available with the Task execution engine:
import (
"time"
"github.com/maniartech/orchestrator"
)
func advancedTaskExample() error {
// Task with comprehensive configuration
result, err := orchestrator.Setup(
orchestrator.Task(func() (string, error) {
// Simulate complex processing
time.Sleep(50 * time.Millisecond)
return "processed", nil
}).Named("complex-processor").
With(orchestrator.Config{
Timeout: 30 * time.Second,
MaxConcurrency: 5,
}).
ErrorBoundary(orchestrator.CollectAll),
).With(orchestrator.Config{
Timeout: 60 * time.Second, // Workflow-level timeout
}).Await()
if err != nil {
return err
}
processed := result.Get("complex-processor").(string)
fmt.Printf("Result: %s\n", processed)
return nil
}
// π§ COMING SOON - Advanced error handling and configuration (Tasks 4.2 & 5.2)
func HandleResourceAdvanced(resourceId int) error {
return orchestrator.Setup(
orchestrator.Sequential(
// Infrastructure with custom timeout
orchestrator.Task(keepInfraReady).
Named("infra-ready").
With(orchestrator.Config{Timeout: 30 * time.Second}),
// Concurrent processing with different error strategies
orchestrator.Concurrent(
// Critical path - fail fast
orchestrator.Sequential(
orchestrator.Task(func() error { return fetchResource(resourceId) }).
Named("fetch-resource").
With(orchestrator.Config{
Timeout: 10 * time.Second,
Retries: 3,
}),
orchestrator.Task(processResource).
Named("process-resource").
ErrorBoundary(orchestrator.FailFast),
).Named("critical-path").
ErrorBoundary(orchestrator.FailFast),
// Non-critical dependencies - collect all errors
orchestrator.Concurrent(
orchestrator.Task(prepareDependencyA).Named("prep-dep-a"),
orchestrator.Task(prepareDependencyB).Named("prep-dep-b"),
orchestrator.Task(prepareDependencyC).Named("prep-dep-c"),
).Named("dependency-prep").
ErrorBoundary(orchestrator.CollectAll),
).Named("main-processing"),
).Named("resource-handler"),
).Await()
}
func monitorTaskExecution() error {
workflow := orchestrator.Setup(
orchestrator.Task(func() (string, error) {
// Simulate long-running task
time.Sleep(200 * time.Millisecond)
return "processing complete", nil
}).Named("long-running-task"),
)
// Monitor status in real-time
go func() {
for {
status := workflow.GetStatus()
name := workflow.GetName()
fmt.Printf("Task '%s' status: %s\n", name, status)
if status == orchestrator.Completed || status == orchestrator.Cancelled {
break
}
time.Sleep(50 * time.Millisecond)
}
}()
result, err := workflow.Await()
if err != nil {
return err
}
fmt.Printf("Final result: %s\n", result.Get("long-running-task"))
return nil
}
// Check task status
fmt.Printf("Status: %s\n", task.GetStatus()) // NotStarted, Running, Completed, or Cancelled
// Get task configuration
config := task.GetConfig()
fmt.Printf("Timeout: %s\n", config.Timeout)
The orchestrator is built with a modular architecture:
internal/
βββ task/ # Task execution engine with atomic status management
βββ orchestration/ # Core interfaces and orchestration types
βββ config/ # Configuration management with inheritance
βββ errors/ # Error handling and reporting
βββ result/ # Result management and type safety
βββ context/ # Context management for inter-task communication
βββ pool/ # Object pooling for performance optimization
βββ status/ # Status management utilities
- Zero-allocation status operations using atomic primitives
- Minimal memory overhead with object pooling
- Lock-free status management for maximum concurrency
- Efficient goroutine lifecycle management
All operations are thread-safe:
- Atomic status management using
sync/atomic
- Concurrent access to task properties
- Race-condition free execution
- Proper resource cleanup
Comprehensive error handling includes:
- Panic recovery with full stack traces
- Timeout handling with graceful termination
- Context cancellation support
- Rich error metadata with timing and operation IDs
The original orchestrator implementation has been moved to the legacy/
directory. The new implementation provides:
- Better thread safety (no race conditions)
- Improved performance with zero-allocation operations
- Cleaner architecture with proper separation of concerns
- Enhanced error handling and observability
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests with 100% coverage
- Ensure all tests pass with
go test -race
- Submit a pull request
This project is licensed under the MIT License - see the LICENSE file for details.
This library is under active development as part of a comprehensive redesign. The new implementation focuses on:
- Military-grade reliability and performance
- Zero-allocation execution paths
- Comprehensive testing and documentation
- Thread-safe operations throughout
See ROADMAP.md for development progress and future plans.