FlinkJobRunner is a containerized Flask-based job orchestration service that automates the deployment and management of Apache Flink streaming jobs with SHACL (Shapes Constraint Language) validation. It provides a RESTful API for running complex data processing workflows that combine semantic data validation, knowledge graph processing, and distributed stream processing.
The application acts as a job runner that:
- Receives semantic data files (RDF knowledge graphs and SHACL constraint files) via pre-signed URLs
- Orchestrates multi-step workflows using Make targets for setup, validation, and deployment
- Manages Kubernetes resources for Flink cluster deployment and database operations
- Provides real-time logging through Server-Sent Events (SSE) for job monitoring
- Handles PostgreSQL database operations for storing processed semantic data
- RESTful API: Create, monitor, and manage jobs via HTTP endpoints
- Real-time Monitoring: Server-Sent Events (SSE) for live job log streaming
- Sequential Workflows: Support for complex multi-step operations (setup-and-deploy)
- Kubernetes Native: Designed for in-cluster execution with proper RBAC
- Database Integration: PostgreSQL operations with secure credential management
- Security: Runs as non-root user with minimal required permissions
- Containerized: Docker-ready with all required tools pre-installed
The container includes all necessary tools for the complete workflow:
- Apache Flink: Stream processing engine deployment
- Kubernetes: Cluster management (kubectl, helm, helmfile)
- PostgreSQL: Database client (psql) for data operations
- SHACL Processing: Python libraries (rdflib, pyshacl) for semantic validation
- YAML Processing: yq v3.4.1 for configuration management
- Build Tools: Make, Python 3.12, and development dependencies
- Docker for building the container image
- Kubernetes cluster with:
  - PostgreSQL database (acid-cluster or similar)
  - Strimzi Kafka operator
  - Flink operator
  - Appropriate RBAC permissions
```bash
# Simple build
./build.sh

# Build with custom name and tag
IMAGE_NAME=my-flink-runner IMAGE_TAG=v1.0.0 ./build.sh

# Build and push to registry
REGISTRY=your-registry.com ./build.sh
```

```bash
# With kubeconfig file
docker run -p 8080:8080 \
  -v ~/.kube/config:/app/secrets/kubeconfig \
  flink-job-runner
```
```bash
# Check health
curl http://localhost:8080/healthz
```

Update the image name in k8s-deployment.yaml, then:

```bash
kubectl apply -f k8s-deployment.yaml
```

```http
POST /jobs
Content-Type: application/json

{
  "jobId": "optional-custom-id",
  "target": "setup-and-deploy",
  "urls": {
    "knowledge": "https://your-bucket/knowledge.ttl",
    "shacl": "https://your-bucket/shacl.ttl"
  },
  "context": {
    "kubeContext": "optional-context"
  }
}
```

```http
# Get job status
GET /jobs/{jobId}

# Stream live logs (Server-Sent Events)
GET /jobs/{jobId}/logs

# Cancel a job
POST /jobs/{jobId}/cancel
```

Available Make targets:

- `setup`: Install dependencies and prepare environment
- `flink-deploy`: Deploy Flink cluster and processing jobs
- `setup-and-deploy`: Sequential execution of setup + flink-deploy
- `validate`: Run SHACL validation only
- `plan`: Generate deployment plan without execution
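A minimal client sketch for the job API above, using only the Python standard library. The base URL and the helper names (`build_job_request`, `submit_job`) are assumptions for illustration, and the response is assumed to be JSON; only the endpoint path and payload shape come from this README.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: runner reachable locally

def build_job_request(target, knowledge_url, shacl_url, job_id=None):
    """Assemble the /jobs payload documented above."""
    payload = {
        "target": target,
        "urls": {"knowledge": knowledge_url, "shacl": shacl_url},
    }
    if job_id is not None:
        payload["jobId"] = job_id
    return payload

def submit_job(target, knowledge_url, shacl_url, job_id=None):
    """POST the payload to /jobs and return the parsed JSON response."""
    body = json.dumps(
        build_job_request(target, knowledge_url, shacl_url, job_id)
    ).encode()
    req = urllib.request.Request(
        f"{BASE_URL}/jobs",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```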
The application requires the following Kubernetes permissions:
- Pods/Services: CRUD operations for Flink components
- Deployments: Scaling and management of Strimzi operators
- Secrets: Access to database credentials
- ConfigMaps: Configuration management
- KafkaTopics: Strimzi resource management
See k8s-deployment.yaml for complete RBAC configuration.
```bash
curl http://localhost:8080/healthz
```

```javascript
// JavaScript example for SSE log streaming
const eventSource = new EventSource('/jobs/your-job-id/logs');
eventSource.onmessage = function(event) {
  console.log('Log:', event.data);
};
```

```bash
curl http://localhost:8080/jobs/your-job-id
```

| Variable | Default | Description |
|---|---|---|
| `RUNNER_BIND` | `0.0.0.0` | Flask server bind address |
| `RUNNER_PORT` | `8080` | Flask server port |
| `DIGITALTWIN_ROOT` | `/app/work/shacl2flink` | Path to shacl2flink tools |
| `WORK_ROOT` | `/app/work` | Working directory for jobs |
| `KUBECONFIG` | `` | Kubeconfig path (empty for in-cluster) |
| `ALLOWED_TARGETS` | `setup,flink-deploy,...` | Comma-separated allowed targets |
- `/app/work` - Persistent storage for job data (recommended: 10Gi)
- `/var/run/secrets/kubernetes.io/serviceaccount` - Service account token (auto-mounted)
```bash
# Create virtual environment
python3 -m venv .venv
source .venv/bin/activate

# Install dependencies
pip install -r requirements.txt
pip install -r work/shacl2flink/requirements.txt
pip install -r work/shacl2flink/requirements-dev.txt

# Run locally
python3 app.py
```

```text
FlinkJobRunner/
├── app.py                 # Main Flask application
├── alerts_shacl.py        # SHACL alerts blueprint
├── requirements.txt       # Root Python dependencies
├── Dockerfile             # Container definition
├── build.sh               # Build script
├── k8s-deployment.yaml    # Kubernetes deployment
├── work/
│   ├── shacl2flink/       # Core processing tools
│   │   ├── Makefile       # Build and deployment targets
│   │   ├── requirements.txt
│   │   └── requirements-dev.txt
│   └── helm/              # Helm charts and configurations
└── secrets/
    └── kubeconfig         # Local development kubeconfig
```
- Fork the repository
- Create a feature branch
- Make your changes
- Test locally and in Kubernetes
- Submit a pull request
See LICENSE file for details.
Built for Industry 4.0 semantic data processing workflows.