@@ -12,6 +12,9 @@ import (
12
12
"time"
13
13
14
14
v1 "k8s.io/api/batch/v1"
15
+ corev1 "k8s.io/api/core/v1"
16
+ k8errors "k8s.io/apimachinery/pkg/api/errors"
17
+ "k8s.io/apimachinery/pkg/api/resource"
15
18
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
19
"k8s.io/client-go/kubernetes"
17
20
"k8s.io/client-go/kubernetes/scheme"
@@ -71,6 +74,7 @@ func NewBackend(ctx context.Context, conf config.Kubernetes, reader tes.ReadOnly
71
74
event : writer ,
72
75
database : reader ,
73
76
log : log ,
77
+ config : kubeconfig ,
74
78
}
75
79
76
80
if ! conf .DisableReconciler {
@@ -90,6 +94,7 @@ type Backend struct {
90
94
database tes.ReadOnlyServer
91
95
log * logger.Logger
92
96
backendParameters map [string ]string
97
+ config * rest.Config
93
98
events.Computer
94
99
}
95
100
@@ -128,7 +133,8 @@ func (b *Backend) Close() {
128
133
//TODO: close database?
129
134
}
130
135
131
- // createJob uses the configured template to create a kubernetes batch job.
136
+ // Create the Funnel Worker job
137
+ // Executor job is created in worker/kubernetes.go#Run
132
138
func (b * Backend ) createJob (task * tes.Task ) (* v1.Job , error ) {
133
139
submitTpl , err := template .New (task .Id ).Parse (b .template )
134
140
if err != nil {
@@ -165,19 +171,92 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
165
171
return job , nil
166
172
}
167
173
168
- // Submit submits a task to the server as a kubernetes v1/batch job.
174
+ func (b * Backend ) createPVC (ctx context.Context , taskID string , resources * tes.Resources ) error {
175
+ clientset , err := kubernetes .NewForConfig (b .config ) // You'll need to store the config during NewBackend
176
+ if err != nil {
177
+ return fmt .Errorf ("getting kubernetes client: %v" , err )
178
+ }
179
+
180
+ storageSize := resource .NewQuantity (1024 * 1024 * 1024 , resource .BinarySI ) // 1Gi default
181
+ if resources != nil && resources .DiskGb > 0 {
182
+ storageSize = resource .NewQuantity (int64 (resources .DiskGb * 1024 * 1024 * 1024 ), resource .BinarySI )
183
+ }
184
+
185
+ pvc := & corev1.PersistentVolumeClaim {
186
+ ObjectMeta : metav1.ObjectMeta {
187
+ Name : fmt .Sprintf ("funnel-pvc-%s" , taskID ),
188
+ Labels : map [string ]string {
189
+ "app" : "funnel" ,
190
+ "taskId" : taskID ,
191
+ },
192
+ },
193
+ Spec : corev1.PersistentVolumeClaimSpec {
194
+ AccessModes : []corev1.PersistentVolumeAccessMode {
195
+ corev1 .ReadWriteOnce ,
196
+ },
197
+ Resources : corev1.VolumeResourceRequirements {
198
+ Requests : corev1.ResourceList {
199
+ corev1 .ResourceStorage : * storageSize ,
200
+ },
201
+ },
202
+ },
203
+ }
204
+
205
+ _ , err = clientset .CoreV1 ().PersistentVolumeClaims (b .namespace ).Create (ctx , pvc , metav1.CreateOptions {})
206
+ if err != nil {
207
+ return fmt .Errorf ("creating shared PVC: %v" , err )
208
+ }
209
+
210
+ return nil
211
+ }
212
+
213
+ // Add this helper function for PVC cleanup
214
+ func (b * Backend ) deletePVC (ctx context.Context , taskID string ) error {
215
+ clientset , err := kubernetes .NewForConfig (b .config )
216
+ if err != nil {
217
+ return fmt .Errorf ("getting kubernetes client: %v" , err )
218
+ }
219
+
220
+ pvcName := fmt .Sprintf ("funnel-pvc-%s" , taskID )
221
+ err = clientset .CoreV1 ().PersistentVolumeClaims (b .namespace ).Delete (ctx , pvcName , metav1.DeleteOptions {})
222
+ if err != nil {
223
+ // If the PVC is already gone, ignore the error
224
+ if k8errors .IsNotFound (err ) {
225
+ return nil
226
+ }
227
+ return fmt .Errorf ("deleting shared PVC: %v" , err )
228
+ }
229
+
230
+ return nil
231
+ }
232
+
233
+ // Submit creates both the PVC and the worker job with better error handling
169
234
func (b * Backend ) Submit (ctx context.Context , task * tes.Task ) error {
235
+ // Create a new background context instead of inheriting from the potentially canceled one
236
+ submitCtx := context .Background ()
237
+
238
+ // TODO: Update this so that a PVC is only created if the task has inputs or outputs
239
+ // If the task has either inputs or outputs, then create a PVC
240
+ // shared between the Funnel Worker and the Executor
241
+ // e.g. `if len(task.Inputs) > 0 || len(task.Outputs) > 0 {}`
242
+ err := b .createPVC (submitCtx , task .Id , task .GetResources ())
243
+ if err != nil {
244
+ return fmt .Errorf ("creating shared storage: %v" , err )
245
+ }
246
+
247
+ // Create the worker job
170
248
job , err := b .createJob (task )
171
249
if err != nil {
172
250
return fmt .Errorf ("creating job spec: %v" , err )
173
251
}
174
- ctx = context . Background ()
175
- job , err = b .client .Create (ctx , job , metav1.CreateOptions {
252
+
253
+ _ , err = b .client .Create (submitCtx , job , metav1.CreateOptions {
176
254
FieldManager : task .Id ,
177
255
})
178
256
if err != nil {
179
257
return fmt .Errorf ("creating job in backend: %v" , err )
180
258
}
259
+
181
260
return nil
182
261
}
183
262
@@ -192,6 +271,12 @@ func (b *Backend) deleteJob(ctx context.Context, taskID string) error {
192
271
if err != nil {
193
272
return fmt .Errorf ("deleting job: %v" , err )
194
273
}
274
+
275
+ // Delete Worker PVC
276
+ if err := b .deletePVC (ctx , taskID ); err != nil {
277
+ b .log .Error ("failed to delete PVC" , "taskID" , taskID , "error" , err )
278
+ }
279
+
195
280
return nil
196
281
}
197
282
@@ -250,6 +335,12 @@ ReconcileLoop:
250
335
continue ReconcileLoop
251
336
}
252
337
b .log .Debug ("reconcile: cleanuping up successful job" , "taskID" , j .Name )
338
+
339
+ // Delete Worker PVC
340
+ if err := b .deletePVC (ctx , j .Name ); err != nil {
341
+ b .log .Error ("failed to delete PVC" , "taskID" , j .Name , "error" , err )
342
+ }
343
+
253
344
err := b .deleteJob (ctx , j .Name )
254
345
if err != nil {
255
346
b .log .Error ("reconcile: cleaning up successful job" , "taskID" , j .Name , "error" , err )
@@ -273,6 +364,12 @@ ReconcileLoop:
273
364
if disableCleanup {
274
365
continue ReconcileLoop
275
366
}
367
+
368
+ // Delete Worker PVC
369
+ if err := b .deletePVC (ctx , j .Name ); err != nil {
370
+ b .log .Error ("reconcile: cleaning up PVC for failed job" , "taskID" , j .Name , "error" , err )
371
+ }
372
+
276
373
err = b .deleteJob (ctx , j .Name )
277
374
if err != nil {
278
375
b .log .Error ("reconcile: cleaning up failed job" , "taskID" , j .Name , "error" , err )
0 commit comments