FlinkJobRunner

πŸš€ Overview

FlinkJobRunner is a containerized Flask-based job orchestration service that automates the deployment and management of Apache Flink streaming jobs with SHACL (Shapes Constraint Language) validation. It provides a RESTful API for running complex data processing workflows that combine semantic data validation, knowledge graph processing, and distributed stream processing.

πŸ—οΈ Architecture

The application acts as a job runner that:

  1. Receives semantic data files (RDF knowledge graphs and SHACL constraint files) via pre-signed URLs
  2. Orchestrates multi-step workflows using Make targets for setup, validation, and deployment (see the sketch after this list)
  3. Manages Kubernetes resources for Flink cluster deployment and database operations
  4. Provides real-time logging through Server-Sent Events (SSE) for job monitoring
  5. Handles PostgreSQL database operations for storing processed semantic data
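
As a rough sketch of steps 2 and 4, the runner can launch a Make target as a subprocess and forward each output line to log subscribers. The helper below is illustrative only (run_make_target is a hypothetical name; the actual logic lives in app.py):

import subprocess

def run_make_target(target: str, work_dir: str) -> int:
    """Hypothetical helper: run one Make target and stream its output."""
    proc = subprocess.Popen(
        ["make", target],
        cwd=work_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so subscribers see one stream
        text=True,
    )
    for line in proc.stdout:
        print(line, end="")  # the real service would push each line to SSE clients
    return proc.wait()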

πŸ”§ Key Features

  • 🌐 RESTful API: Create, monitor, and manage jobs via HTTP endpoints
  • πŸ“Š Real-time Monitoring: Server-Sent Events (SSE) for live job log streaming
  • πŸ”„ Sequential Workflows: Support for complex multi-step operations (setup-and-deploy)
  • ☸️ Kubernetes Native: Designed for in-cluster execution with proper RBAC
  • πŸ—„οΈ Database Integration: PostgreSQL operations with secure credential management
  • πŸ›‘οΈ Security: Runs as non-root user with minimal required permissions
  • πŸ“¦ Containerized: Docker-ready with all required tools pre-installed

πŸ› οΈ Supported Tools & Technologies

The container includes all necessary tools for the complete workflow:

  • Apache Flink: Stream processing engine deployment
  • Kubernetes: Cluster management (kubectl, helm, helmfile)
  • PostgreSQL: Database client (psql) for data operations
  • SHACL Processing: Python libraries (rdflib, pyshacl) for semantic validation (see the example after this list)
  • YAML Processing: yq v3.4.1 for configuration management
  • Build Tools: Make, Python 3.12, and development dependencies
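
The SHACL validation step can be reproduced with these libraries directly. A minimal sketch, with placeholder file names standing in for the downloaded inputs:

from rdflib import Graph
from pyshacl import validate

# Placeholder file names; in the service these come from the pre-signed URLs
knowledge = Graph().parse("knowledge.ttl", format="turtle")
shapes = Graph().parse("shacl.ttl", format="turtle")

conforms, _report_graph, report_text = validate(knowledge, shacl_graph=shapes)
print("conforms:", conforms)
print(report_text)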

πŸ“‹ Prerequisites

  • Docker for building the container image
  • Kubernetes cluster with:
    • PostgreSQL database (acid-cluster or similar)
    • Strimzi Kafka operator
    • Flink operator
    • Appropriate RBAC permissions

πŸš€ Quick Start

1. Build the Container

# Simple build
./build.sh

# Build with custom name and tag
IMAGE_NAME=my-flink-runner IMAGE_TAG=v1.0.0 ./build.sh

# Build and push to registry
REGISTRY=your-registry.com ./build.sh

2. Run Locally (Development)

# With kubeconfig file
docker run -p 8080:8080 \
  -v ~/.kube/config:/app/secrets/kubeconfig \
  flink-job-runner

# Check health
curl http://localhost:8080/healthz

3. Deploy to Kubernetes

# Update the image name in k8s-deployment.yaml, then:
kubectl apply -f k8s-deployment.yaml

πŸ“‘ API Usage

Create a Job

POST /jobs
Content-Type: application/json

{
  "jobId": "optional-custom-id",
  "target": "setup-and-deploy",
  "urls": {
    "knowledge": "https://your-bucket/knowledge.ttl",
    "shacl": "https://your-bucket/shacl.ttl"
  },
  "context": {
    "kubeContext": "optional-context"
  }
}
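
The same request from Python, as a hypothetical client snippet (the host and the response field are assumptions; adjust them to your deployment):

import requests

resp = requests.post(
    "http://localhost:8080/jobs",
    json={
        "target": "setup-and-deploy",
        "urls": {
            "knowledge": "https://your-bucket/knowledge.ttl",
            "shacl": "https://your-bucket/shacl.ttl",
        },
    },
    timeout=30,
)
resp.raise_for_status()
job_id = resp.json()["jobId"]  # assumes the response echoes the job ID
print("created job:", job_id)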

Monitor Job Progress

# Get job status
GET /jobs/{jobId}

# Stream live logs (Server-Sent Events)
GET /jobs/{jobId}/logs
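
Outside the browser, the log stream can be read with any HTTP client that supports streaming responses. A minimal Python sketch, assuming the endpoint emits standard SSE data: lines:

import requests

job_id = "your-job-id"
url = f"http://localhost:8080/jobs/{job_id}/logs"

with requests.get(url, stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data:"):  # SSE payload lines
            print(line[len("data:"):].strip())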

Cancel Running Job

POST /jobs/{jobId}/cancel

🎯 Supported Targets

  • setup: Install dependencies and prepare environment
  • flink-deploy: Deploy Flink cluster and processing jobs
  • setup-and-deploy: Sequential execution of setup + flink-deploy
  • validate: Run SHACL validation only
  • plan: Generate deployment plan without execution

πŸ”’ Security & RBAC

The application requires the following Kubernetes permissions:

  • Pods/Services: CRUD operations for Flink components
  • Deployments: Scaling and management of Strimzi operators
  • Secrets: Access to database credentials
  • ConfigMaps: Configuration management
  • KafkaTopics: Strimzi resource management

See k8s-deployment.yaml for complete RBAC configuration.

πŸ“Š Monitoring & Logging

Health Check

curl http://localhost:8080/healthz

Real-time Logs

// JavaScript example for SSE log streaming
const eventSource = new EventSource('/jobs/your-job-id/logs');
eventSource.onmessage = function(event) {
  console.log('Log:', event.data);
};

Job Status

curl http://localhost:8080/jobs/your-job-id

🐳 Container Configuration

Environment Variables

Variable            Default                   Description
RUNNER_BIND         0.0.0.0                   Flask server bind address
RUNNER_PORT         8080                      Flask server port
DIGITALTWIN_ROOT    /app/work/shacl2flink     Path to the shacl2flink tools
WORK_ROOT           /app/work                 Working directory for jobs
KUBECONFIG          (empty)                   Kubeconfig path; leave empty for in-cluster configuration
ALLOWED_TARGETS     setup,flink-deploy,...    Comma-separated list of allowed targets
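
Since these are ordinary environment variables, the service presumably reads them along these lines (an illustrative sketch, not the literal code in app.py):

import os

RUNNER_BIND = os.environ.get("RUNNER_BIND", "0.0.0.0")
RUNNER_PORT = int(os.environ.get("RUNNER_PORT", "8080"))
WORK_ROOT = os.environ.get("WORK_ROOT", "/app/work")
KUBECONFIG = os.environ.get("KUBECONFIG", "")  # empty string selects in-cluster config
ALLOWED_TARGETS = [t for t in os.environ.get("ALLOWED_TARGETS", "").split(",") if t]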

Volume Mounts

  • /app/work - Persistent storage for job data (recommended: 10Gi)
  • /var/run/secrets/kubernetes.io/serviceaccount - Service account token (auto-mounted)

πŸ”§ Development

Local Development Setup

# Create virtual environment
python3 -m venv .venv
source .venv/bin/activate

# Install dependencies  
pip install -r requirements.txt
pip install -r work/shacl2flink/requirements.txt
pip install -r work/shacl2flink/requirements-dev.txt

# Run locally
python3 app.py

Project Structure

FlinkJobRunner/
β”œβ”€β”€ app.py                   # Main Flask application
β”œβ”€β”€ alerts_shacl.py          # SHACL alerts blueprint
β”œβ”€β”€ requirements.txt         # Root Python dependencies
β”œβ”€β”€ Dockerfile               # Container definition
β”œβ”€β”€ build.sh                 # Build script
β”œβ”€β”€ k8s-deployment.yaml      # Kubernetes deployment
β”œβ”€β”€ work/
β”‚   β”œβ”€β”€ shacl2flink/         # Core processing tools
β”‚   β”‚   β”œβ”€β”€ Makefile         # Build and deployment targets
β”‚   β”‚   β”œβ”€β”€ requirements.txt
β”‚   β”‚   └── requirements-dev.txt
β”‚   └── helm/                # Helm charts and configurations
└── secrets/
    └── kubeconfig           # Local development kubeconfig

🀝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Test locally and in Kubernetes
  5. Submit a pull request

πŸ“„ License

See LICENSE file for details.


Built for Industry 4.0 semantic data processing workflows 🏭✨