1
1
/*
2
- * Copyright 2021-2022 MONAI Consortium
2
+ * Copyright 2021-2023 MONAI Consortium
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
20
20
using Microsoft . Extensions . Logging ;
21
21
using Microsoft . Extensions . Options ;
22
22
using Minio ;
23
+ using Minio . Exceptions ;
23
24
using Monai . Deploy . Storage . API ;
24
25
using Monai . Deploy . Storage . Configuration ;
25
26
using Monai . Deploy . Storage . S3Policy ;
26
27
using Newtonsoft . Json ;
28
+ using ObjectNotFoundException = Minio . Exceptions . ObjectNotFoundException ;
27
29
28
30
namespace Monai . Deploy . Storage . MinIO
29
31
{
@@ -39,7 +41,7 @@ public class MinIoStorageService : IStorageService
39
41
public MinIoStorageService ( IMinIoClientFactory minioClientFactory , IAmazonSecurityTokenServiceClientFactory amazonSecurityTokenServiceClientFactory , IOptions < StorageServiceConfiguration > options , ILogger < MinIoStorageService > logger )
40
42
{
41
43
Guard . Against . Null ( options ) ;
42
- _minioClientFactory = minioClientFactory ?? throw new ArgumentNullException ( nameof ( IMinIoClientFactory ) ) ;
44
+ _minioClientFactory = minioClientFactory ?? throw new ArgumentNullException ( nameof ( minioClientFactory ) ) ;
43
45
_amazonSecurityTokenServiceClientFactory = amazonSecurityTokenServiceClientFactory ?? throw new ArgumentNullException ( nameof ( amazonSecurityTokenServiceClientFactory ) ) ;
44
46
_logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
45
47
@@ -101,13 +103,14 @@ public async Task<Dictionary<string, bool>> VerifyObjectsExistAsync(string bucke
101
103
Guard . Against . Null ( artifactList ) ;
102
104
103
105
var existingObjectsDict = new Dictionary < string , bool > ( ) ;
106
+ var exceptions = new List < Exception > ( ) ;
104
107
105
108
foreach ( var artifact in artifactList )
106
109
{
107
110
try
108
111
{
109
- var fileObjects = await ListObjectsAsync ( bucketName , artifact ) . ConfigureAwait ( false ) ;
110
- var folderObjects = await ListObjectsAsync ( bucketName , artifact . EndsWith ( "/" ) ? artifact : $ "{ artifact } /", true ) . ConfigureAwait ( false ) ;
112
+ var fileObjects = await ListObjectsAsync ( bucketName , artifact , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
113
+ var folderObjects = await ListObjectsAsync ( bucketName , artifact . EndsWith ( "/" ) ? artifact : $ "{ artifact } /", true , cancellationToken ) . ConfigureAwait ( false ) ;
111
114
112
115
if ( ! folderObjects . Any ( ) && ! fileObjects . Any ( ) )
113
116
{
@@ -122,10 +125,14 @@ public async Task<Dictionary<string, bool>> VerifyObjectsExistAsync(string bucke
122
125
{
123
126
_logger . VerifyObjectError ( bucketName , e ) ;
124
127
existingObjectsDict . Add ( artifact , false ) ;
128
+ exceptions . Add ( e ) ;
125
129
}
126
-
127
130
}
128
131
132
+ if ( exceptions . Any ( ) )
133
+ {
134
+ throw new VerifyObjectsException ( exceptions , existingObjectsDict ) ;
135
+ }
129
136
return existingObjectsDict ;
130
137
}
131
138
@@ -134,17 +141,25 @@ public async Task<bool> VerifyObjectExistsAsync(string bucketName, string artifa
134
141
Guard . Against . NullOrWhiteSpace ( bucketName ) ;
135
142
Guard . Against . NullOrWhiteSpace ( artifactName ) ;
136
143
137
- var fileObjects = await ListObjectsAsync ( bucketName , artifactName ) . ConfigureAwait ( false ) ;
138
- var folderObjects = await ListObjectsAsync ( bucketName , artifactName . EndsWith ( "/" ) ? artifactName : $ "{ artifactName } /", true ) . ConfigureAwait ( false ) ;
139
-
140
- if ( folderObjects . Any ( ) || fileObjects . Any ( ) )
144
+ try
141
145
{
142
- return true ;
143
- }
146
+ var fileObjects = await ListObjectsAsync ( bucketName , artifactName , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
147
+ var folderObjects = await ListObjectsAsync ( bucketName , artifactName . EndsWith ( "/" ) ? artifactName : $ "{ artifactName } /", true , cancellationToken ) . ConfigureAwait ( false ) ;
148
+
149
+ if ( folderObjects . Any ( ) || fileObjects . Any ( ) )
150
+ {
151
+ return true ;
152
+ }
144
153
145
- _logger . FileNotFoundError ( bucketName , $ "{ artifactName } ") ;
154
+ _logger . FileNotFoundError ( bucketName , $ "{ artifactName } ") ;
146
155
147
- return false ;
156
+ return false ;
157
+ }
158
+ catch ( Exception ex )
159
+ {
160
+ _logger . VerifyObjectError ( bucketName , ex ) ;
161
+ throw new VerifyObjectsException ( ex . Message , ex ) ;
162
+ }
148
163
}
149
164
150
165
public async Task PutObjectAsync ( string bucketName , string objectName , Stream data , long size , string contentType , Dictionary < string , string > ? metadata , CancellationToken cancellationToken = default )
@@ -295,36 +310,51 @@ public async Task CreateFolderWithCredentialsAsync(string bucketName, string fol
295
310
296
311
#region Internal Helper Methods
297
312
298
- private static async Task CopyObjectUsingClient ( IObjectOperations client , string sourceBucketName , string sourceObjectName , string destinationBucketName , string destinationObjectName , CancellationToken cancellationToken )
313
+ private async Task CopyObjectUsingClient ( IObjectOperations client , string sourceBucketName , string sourceObjectName , string destinationBucketName , string destinationObjectName , CancellationToken cancellationToken )
299
314
{
300
- var copySourceObjectArgs = new CopySourceObjectArgs ( )
301
- . WithBucket ( sourceBucketName )
302
- . WithObject ( sourceObjectName ) ;
303
- var copyObjectArgs = new CopyObjectArgs ( )
304
- . WithBucket ( destinationBucketName )
305
- . WithObject ( destinationObjectName )
306
- . WithCopyObjectSource ( copySourceObjectArgs ) ;
307
- await client . CopyObjectAsync ( copyObjectArgs , cancellationToken ) . ConfigureAwait ( false ) ;
315
+ await CallApi ( async ( ) =>
316
+ {
317
+ try
318
+ {
319
+ var copySourceObjectArgs = new CopySourceObjectArgs ( )
320
+ . WithBucket ( sourceBucketName )
321
+ . WithObject ( sourceObjectName ) ;
322
+ var copyObjectArgs = new CopyObjectArgs ( )
323
+ . WithBucket ( destinationBucketName )
324
+ . WithObject ( destinationObjectName )
325
+ . WithCopyObjectSource ( copySourceObjectArgs ) ;
326
+ await client . CopyObjectAsync ( copyObjectArgs , cancellationToken ) . ConfigureAwait ( false ) ;
327
+ }
328
+ catch ( ObjectNotFoundException ex ) when ( ex . ServerMessage . Contains ( "Not found" , StringComparison . OrdinalIgnoreCase ) )
329
+ {
330
+ throw new API . StorageObjectNotFoundException ( ex . ServerMessage ) ;
331
+ }
332
+ } ) . ConfigureAwait ( false ) ;
308
333
}
309
334
310
- private static async Task GetObjectUsingClient ( IObjectOperations client , string bucketName , string objectName , Action < Stream > callback , CancellationToken cancellationToken )
335
+ private async Task GetObjectUsingClient ( IObjectOperations client , string bucketName , string objectName , Action < Stream > callback , CancellationToken cancellationToken )
311
336
{
312
- var args = new GetObjectArgs ( )
313
- . WithBucket ( bucketName )
314
- . WithObject ( objectName )
315
- . WithCallbackStream ( callback ) ;
316
- await client . GetObjectAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
337
+ await CallApi ( async ( ) =>
338
+ {
339
+ var args = new GetObjectArgs ( )
340
+ . WithBucket ( bucketName )
341
+ . WithObject ( objectName )
342
+ . WithCallbackStream ( callback ) ;
343
+ await client . GetObjectAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
344
+ } ) . ConfigureAwait ( false ) ;
317
345
}
318
346
319
- private async Task < IList < VirtualFileInfo > > ListObjectsUsingClient ( IBucketOperations client , string bucketName , string ? prefix , bool recursive , CancellationToken cancellationToken )
347
+ private Task < IList < VirtualFileInfo > > ListObjectsUsingClient ( IBucketOperations client , string bucketName , string ? prefix , bool recursive , CancellationToken cancellationToken )
320
348
{
321
- return await Task . Run ( ( ) =>
349
+ var files = new List < VirtualFileInfo > ( ) ;
350
+ var listArgs = new ListObjectsArgs ( )
351
+ . WithBucket ( bucketName )
352
+ . WithPrefix ( prefix )
353
+ . WithRecursive ( recursive ) ;
354
+
355
+ try
322
356
{
323
- var files = new List < VirtualFileInfo > ( ) ;
324
- var listArgs = new ListObjectsArgs ( )
325
- . WithBucket ( bucketName )
326
- . WithPrefix ( prefix )
327
- . WithRecursive ( recursive ) ;
357
+ var done = new TaskCompletionSource < IList < VirtualFileInfo > > ( ) ;
328
358
329
359
var objservable = client . ListObjectsAsync ( listArgs , cancellationToken ) ;
330
360
var completedEvent = new ManualResetEventSlim ( false ) ;
@@ -341,44 +371,103 @@ private async Task<IList<VirtualFileInfo>> ListObjectsUsingClient(IBucketOperati
341
371
error =>
342
372
{
343
373
_logger . ListObjectError ( bucketName , error . Message ) ;
374
+ if ( error is OperationCanceledException )
375
+ done . SetException ( error ) ;
376
+ else
377
+ done . SetException ( new ListObjectException ( error . ToString ( ) ) ) ;
344
378
} ,
345
- ( ) => completedEvent . Set ( ) , cancellationToken ) ;
379
+ ( ) =>
380
+ {
381
+ done . SetResult ( files ) ;
382
+ if ( cancellationToken . IsCancellationRequested )
383
+ {
384
+ throw new ListObjectTimeoutException ( "Timed out waiting for results." ) ;
385
+ }
386
+ } , cancellationToken ) ;
346
387
347
- completedEvent . Wait ( cancellationToken ) ;
348
- return files ;
349
- } ) . ConfigureAwait ( false ) ;
388
+ return done . Task ;
389
+ }
390
+ catch ( ConnectionException ex )
391
+ {
392
+ _logger . ConnectionError ( ex ) ;
393
+ var iex = new StorageConnectionException ( ex . Message ) ;
394
+ iex . Errors . Add ( ex . ServerMessage ) ;
395
+ if ( ex . ServerResponse is not null && ! string . IsNullOrWhiteSpace ( ex . ServerResponse . ErrorMessage ) )
396
+ {
397
+ iex . Errors . Add ( ex . ServerResponse . ErrorMessage ) ;
398
+ }
399
+ throw iex ;
400
+ }
401
+ catch ( Exception ex ) when ( ex is not ListObjectTimeoutException && ex is not ListObjectException )
402
+ {
403
+ _logger . StorageServiceError ( ex ) ;
404
+ throw new StorageServiceException ( ex . ToString ( ) ) ;
405
+ }
350
406
}
351
407
352
- private static async Task RemoveObjectUsingClient ( IObjectOperations client , string bucketName , string objectName , CancellationToken cancellationToken )
408
+ private async Task RemoveObjectUsingClient ( IObjectOperations client , string bucketName , string objectName , CancellationToken cancellationToken )
353
409
{
354
- var args = new RemoveObjectArgs ( )
410
+ await CallApi ( async ( ) =>
411
+ {
412
+ var args = new RemoveObjectArgs ( )
355
413
. WithBucket ( bucketName )
356
414
. WithObject ( objectName ) ;
357
- await client . RemoveObjectAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
415
+ await client . RemoveObjectAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
416
+ } ) . ConfigureAwait ( false ) ;
358
417
}
359
418
360
- private static async Task PutObjectUsingClient ( IObjectOperations client , string bucketName , string objectName , Stream data , long size , string contentType , Dictionary < string , string > ? metadata , CancellationToken cancellationToken )
419
+ private async Task PutObjectUsingClient ( IObjectOperations client , string bucketName , string objectName , Stream data , long size , string contentType , Dictionary < string , string > ? metadata , CancellationToken cancellationToken )
361
420
{
362
- var args = new PutObjectArgs ( )
363
- . WithBucket ( bucketName )
364
- . WithObject ( objectName )
365
- . WithStreamData ( data )
366
- . WithObjectSize ( size )
367
- . WithContentType ( contentType ) ;
368
- if ( metadata is not null )
421
+ await CallApi ( async ( ) =>
369
422
{
370
- args . WithHeaders ( metadata ) ;
371
- }
423
+ var args = new PutObjectArgs ( )
424
+ . WithBucket ( bucketName )
425
+ . WithObject ( objectName )
426
+ . WithStreamData ( data )
427
+ . WithObjectSize ( size )
428
+ . WithContentType ( contentType ) ;
429
+ if ( metadata is not null )
430
+ {
431
+ args . WithHeaders ( metadata ) ;
432
+ }
433
+
434
+ await client . PutObjectAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
435
+ } ) . ConfigureAwait ( false ) ;
436
+ }
372
437
373
- await client . PutObjectAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
438
+ private async Task RemoveObjectsUsingClient ( IObjectOperations client , string bucketName , IEnumerable < string > objectNames , CancellationToken cancellationToken )
439
+ {
440
+ await CallApi ( async ( ) =>
441
+ {
442
+ var args = new RemoveObjectsArgs ( )
443
+ . WithBucket ( bucketName )
444
+ . WithObjects ( objectNames . ToList ( ) ) ;
445
+ await client . RemoveObjectsAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
446
+ } ) . ConfigureAwait ( false ) ;
374
447
}
375
448
376
- private static async Task RemoveObjectsUsingClient ( IObjectOperations client , string bucketName , IEnumerable < string > objectNames , CancellationToken cancellationToken )
449
+ private async Task CallApi ( Func < Task > func )
377
450
{
378
- var args = new RemoveObjectsArgs ( )
379
- . WithBucket ( bucketName )
380
- . WithObjects ( objectNames . ToList ( ) ) ;
381
- await client . RemoveObjectsAsync ( args , cancellationToken ) . ConfigureAwait ( false ) ;
451
+ try
452
+ {
453
+ await func ( ) . ConfigureAwait ( false ) ;
454
+ }
455
+ catch ( ConnectionException ex )
456
+ {
457
+ _logger . ConnectionError ( ex ) ;
458
+ var iex = new StorageConnectionException ( ex . Message ) ;
459
+ iex . Errors . Add ( ex . ServerMessage ) ;
460
+ if ( ex . ServerResponse is not null && ! string . IsNullOrWhiteSpace ( ex . ServerResponse . ErrorMessage ) )
461
+ {
462
+ iex . Errors . Add ( ex . ServerResponse . ErrorMessage ) ;
463
+ }
464
+ throw iex ;
465
+ }
466
+ catch ( Exception ex )
467
+ {
468
+ _logger . StorageServiceError ( ex ) ;
469
+ throw new StorageServiceException ( ex . ToString ( ) ) ;
470
+ }
382
471
}
383
472
384
473
#endregion Internal Helper Methods
0 commit comments