4646import org .apache .cloudstack .engine .subsystem .api .storage .SnapshotDataFactory ;
4747import org .apache .cloudstack .engine .subsystem .api .storage .SnapshotInfo ;
4848import org .apache .cloudstack .engine .subsystem .api .storage .TemplateInfo ;
49+ import org .apache .cloudstack .engine .subsystem .api .storage .TemplateService ;
50+ import org .apache .cloudstack .engine .subsystem .api .storage .TemplateService .TemplateApiResult ;
4951import org .apache .cloudstack .framework .async .AsyncCallFuture ;
5052import org .apache .cloudstack .framework .config .ConfigKey ;
5153import org .apache .cloudstack .framework .config .Configurable ;
@@ -91,6 +93,8 @@ public class StorageOrchestrator extends ManagerBase implements StorageOrchestra
9193 @ Inject
9294 private SecondaryStorageService secStgSrv ;
9395 @ Inject
96+ TemplateService templateService ;
97+ @ Inject
9498 TemplateDataStoreDao templateDataStoreDao ;
9599 @ Inject
96100 VolumeDataStoreDao volumeDataStoreDao ;
@@ -106,6 +110,8 @@ public class StorageOrchestrator extends ManagerBase implements StorageOrchestra
106110
107111 Integer numConcurrentCopyTasksPerSSVM = 2 ;
108112
113+ private final Map <Long , ThreadPoolExecutor > zoneExecutorMap = new HashMap <>();
114+
109115 @ Override
110116 public String getConfigComponentName () {
111117 return StorageOrchestrationService .class .getName ();
@@ -167,8 +173,6 @@ public MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatasto
167173 double meanstddev = getStandardDeviation (storageCapacities );
168174 double threshold = ImageStoreImbalanceThreshold .value ();
169175 MigrationResponse response = null ;
170- ThreadPoolExecutor executor = new ThreadPoolExecutor (numConcurrentCopyTasksPerSSVM , numConcurrentCopyTasksPerSSVM , 30 ,
171- TimeUnit .MINUTES , new MigrateBlockingQueue <>(numConcurrentCopyTasksPerSSVM ));
172176 Date start = new Date ();
173177 if (meanstddev < threshold && migrationPolicy == MigrationPolicy .BALANCE ) {
174178 logger .debug ("mean std deviation of the image stores is below threshold, no migration required" );
@@ -177,7 +181,7 @@ public MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatasto
177181 }
178182
179183 int skipped = 0 ;
180- List <Future <AsyncCallFuture < DataObjectResult > >> futures = new ArrayList <>();
184+ List <Future <DataObjectResult >> futures = new ArrayList <>();
181185 while (true ) {
182186 DataObject chosenFileForMigration = null ;
183187 if (files .size () > 0 ) {
@@ -206,7 +210,7 @@ public MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatasto
206210 }
207211
208212 if (shouldMigrate (chosenFileForMigration , srcDatastore .getId (), destDatastoreId , storageCapacities , snapshotChains , childTemplates , migrationPolicy )) {
209- storageCapacities = migrateAway (chosenFileForMigration , storageCapacities , snapshotChains , childTemplates , srcDatastore , destDatastoreId , executor , futures );
213+ storageCapacities = migrateAway (chosenFileForMigration , storageCapacities , snapshotChains , childTemplates , srcDatastore , destDatastoreId , futures );
210214 } else {
211215 if (migrationPolicy == MigrationPolicy .BALANCE ) {
212216 continue ;
@@ -217,7 +221,7 @@ public MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatasto
217221 }
218222 }
219223 Date end = new Date ();
220- handleSnapshotMigration (srcDataStoreId , start , end , migrationPolicy , futures , storageCapacities , executor );
224+ handleSnapshotMigration (srcDataStoreId , start , end , migrationPolicy , futures , storageCapacities );
221225 return handleResponse (futures , migrationPolicy , message , success );
222226 }
223227
@@ -250,9 +254,7 @@ public MigrationResponse migrateResources(Long srcImgStoreId, Long destImgStoreI
250254 storageCapacities = getStorageCapacities (storageCapacities , srcImgStoreId );
251255 storageCapacities = getStorageCapacities (storageCapacities , destImgStoreId );
252256
253- ThreadPoolExecutor executor = new ThreadPoolExecutor (numConcurrentCopyTasksPerSSVM , numConcurrentCopyTasksPerSSVM , 30 ,
254- TimeUnit .MINUTES , new MigrateBlockingQueue <>(numConcurrentCopyTasksPerSSVM ));
255- List <Future <AsyncCallFuture <DataObjectResult >>> futures = new ArrayList <>();
257+ List <Future <DataObjectResult >> futures = new ArrayList <>();
256258 Date start = new Date ();
257259
258260 while (true ) {
@@ -272,7 +274,7 @@ public MigrationResponse migrateResources(Long srcImgStoreId, Long destImgStoreI
272274 }
273275
274276 if (storageCapacityBelowThreshold (storageCapacities , destImgStoreId )) {
275- storageCapacities = migrateAway (chosenFileForMigration , storageCapacities , snapshotChains , childTemplates , srcDatastore , destImgStoreId , executor , futures );
277+ storageCapacities = migrateAway (chosenFileForMigration , storageCapacities , snapshotChains , childTemplates , srcDatastore , destImgStoreId , futures );
276278 } else {
277279 message = "Migration failed. Destination store doesn't have enough capacity for migration" ;
278280 success = false ;
@@ -289,14 +291,19 @@ public MigrationResponse migrateResources(Long srcImgStoreId, Long destImgStoreI
289291 SnapshotInfo snapshotInfo = snapshotFactory .getSnapshot (snap .getSnapshotId (), snap .getDataStoreId (), DataStoreRole .Image );
290292 SnapshotInfo parentSnapshot = snapshotInfo .getParent ();
291293 if (snapshotInfo .getDataStore ().getId () == srcImgStoreId && parentSnapshot != null && migratedSnapshotIdList .contains (parentSnapshot .getSnapshotId ())) {
292- futures .add (executor . submit (new MigrateDataTask (snapshotInfo , srcDatastore , dataStoreManager .getDataStore (destImgStoreId , DataStoreRole .Image ))));
294+ futures .add (submit (srcDatastore . getScope (). getScopeId (), new MigrateDataTask (snapshotInfo , srcDatastore , dataStoreManager .getDataStore (destImgStoreId , DataStoreRole .Image ))));
293295 }
294296 });
295297 }
296298
297299 return handleResponse (futures , null , message , success );
298300 }
299301
302+ @ Override
303+ public Future <TemplateApiResult > orchestrateTemplateCopyToImageStore (TemplateInfo source , DataStore destStore ) {
304+ return submit (destStore .getScope ().getScopeId (), new CopyTemplateTask (source , destStore ));
305+ }
306+
300307 protected Pair <String , Boolean > migrateCompleted (Long destDatastoreId , DataStore srcDatastore , List <DataObject > files , MigrationPolicy migrationPolicy , int skipped ) {
301308 String message = "" ;
302309 boolean success = true ;
@@ -332,19 +339,10 @@ protected Map<Long, Pair<Long, Long>> migrateAway(
332339 Map <DataObject , Pair <List <TemplateInfo >, Long >> templateChains ,
333340 DataStore srcDatastore ,
334341 Long destDatastoreId ,
335- ThreadPoolExecutor executor ,
336- List <Future <AsyncCallFuture <DataObjectResult >>> futures ) {
342+ List <Future <DataObjectResult >> futures ) {
337343
338344 Long fileSize = migrationHelper .getFileSize (chosenFileForMigration , snapshotChains , templateChains );
339-
340345 storageCapacities = assumeMigrate (storageCapacities , srcDatastore .getId (), destDatastoreId , fileSize );
341- long activeSsvms = migrationHelper .activeSSVMCount (srcDatastore );
342- long totalJobs = activeSsvms * numConcurrentCopyTasksPerSSVM ;
343- // Increase thread pool size with increase in number of SSVMs
344- if ( totalJobs > executor .getCorePoolSize ()) {
345- executor .setMaximumPoolSize ((int ) (totalJobs ));
346- executor .setCorePoolSize ((int ) (totalJobs ));
347- }
348346
349347 MigrateDataTask task = new MigrateDataTask (chosenFileForMigration , srcDatastore , dataStoreManager .getDataStore (destDatastoreId , DataStoreRole .Image ));
350348 if (chosenFileForMigration instanceof SnapshotInfo ) {
@@ -353,19 +351,56 @@ protected Map<Long, Pair<Long, Long>> migrateAway(
353351 if (chosenFileForMigration instanceof TemplateInfo ) {
354352 task .setTemplateChain (templateChains );
355353 }
356- futures .add (( executor . submit ( task ) ));
354+ futures .add (submit ( srcDatastore . getScope (). getScopeId (), task ));
357355 logger .debug ("Migration of {}: {} is initiated." , chosenFileForMigration .getType ().name (), chosenFileForMigration .getUuid ());
358356 return storageCapacities ;
359357 }
360358
359+ protected synchronized <T > Future <T > submit (Long zoneId , Callable <T > task ) {
360+ if (!zoneExecutorMap .containsKey (zoneId )) {
361+ zoneExecutorMap .put (zoneId , new ThreadPoolExecutor (numConcurrentCopyTasksPerSSVM , numConcurrentCopyTasksPerSSVM ,
362+ 30 , TimeUnit .MINUTES , new MigrateBlockingQueue <>(numConcurrentCopyTasksPerSSVM )));
363+ }
364+ scaleExecutorIfNecessary (zoneId );
365+ return zoneExecutorMap .get (zoneId ).submit (task );
366+ }
367+
368+ protected void scaleExecutorIfNecessary (Long zoneId ) {
369+ long activeSsvms = migrationHelper .activeSSVMCount (zoneId );
370+ long totalJobs = activeSsvms * numConcurrentCopyTasksPerSSVM ;
371+ ThreadPoolExecutor executor = zoneExecutorMap .get (zoneId );
372+ if (totalJobs > executor .getCorePoolSize ()) {
373+ logger .debug ("Scaling up executor of zone [{}] from [{}] to [{}] threads." , zoneId , executor .getCorePoolSize (),
374+ totalJobs );
375+ executor .setMaximumPoolSize ((int ) (totalJobs ));
376+ executor .setCorePoolSize ((int ) (totalJobs ));
377+ }
378+ }
379+
380+ protected synchronized void tryCleaningUpExecutor (Long zoneId ) {
381+ if (!zoneExecutorMap .containsKey (zoneId )) {
382+ logger .debug ("No executor exists for zone [{}]." , zoneId );
383+ return ;
384+ }
361385
386+ ThreadPoolExecutor executor = zoneExecutorMap .get (zoneId );
387+ int activeTasks = executor .getActiveCount ();
388+ if (activeTasks > 1 ) {
389+ logger .debug ("Not cleaning executor of zone [{}] yet, as there are [{}] active tasks." , zoneId , activeTasks );
390+ return ;
391+ }
362392
363- private MigrationResponse handleResponse (List <Future <AsyncCallFuture <DataObjectResult >>> futures , MigrationPolicy migrationPolicy , String message , boolean success ) {
393+ logger .debug ("Cleaning executor of zone [{}]." , zoneId );
394+ zoneExecutorMap .remove (zoneId );
395+ executor .shutdown ();
396+ }
397+
398+ private MigrationResponse handleResponse (List <Future <DataObjectResult >> futures , MigrationPolicy migrationPolicy , String message , boolean success ) {
364399 int successCount = 0 ;
365- for (Future <AsyncCallFuture < DataObjectResult > > future : futures ) {
400+ for (Future <DataObjectResult > future : futures ) {
366401 try {
367- AsyncCallFuture < DataObjectResult > res = future .get ();
368- if (res .get (). isSuccess ()) {
402+ DataObjectResult res = future .get ();
403+ if (res .isSuccess ()) {
369404 successCount ++;
370405 }
371406 } catch ( InterruptedException | ExecutionException e ) {
@@ -379,7 +414,7 @@ private MigrationResponse handleResponse(List<Future<AsyncCallFuture<DataObjectR
379414 }
380415
381416 private void handleSnapshotMigration (Long srcDataStoreId , Date start , Date end , MigrationPolicy policy ,
382- List <Future <AsyncCallFuture < DataObjectResult >>> futures , Map <Long , Pair <Long , Long >> storageCapacities , ThreadPoolExecutor executor ) {
417+ List <Future <DataObjectResult >> futures , Map <Long , Pair <Long , Long >> storageCapacities ) {
383418 DataStore srcDatastore = dataStoreManager .getDataStore (srcDataStoreId , DataStoreRole .Image );
384419 List <SnapshotDataStoreVO > snaps = snapshotDataStoreDao .findSnapshots (srcDataStoreId , start , end );
385420 if (!snaps .isEmpty ()) {
@@ -395,12 +430,12 @@ private void handleSnapshotMigration(Long srcDataStoreId, Date start, Date end,
395430 storeId = dstores .get (1 );
396431 }
397432 DataStore datastore = dataStoreManager .getDataStore (storeId , DataStoreRole .Image );
398- futures .add (executor . submit (new MigrateDataTask (snapshotInfo , srcDatastore , datastore )));
433+ futures .add (submit (srcDatastore . getScope (). getScopeId (), new MigrateDataTask (snapshotInfo , srcDatastore , datastore )));
399434 }
400435 if (parentSnapshot != null ) {
401436 DataStore parentDS = dataStoreManager .getDataStore (parentSnapshot .getDataStore ().getId (), DataStoreRole .Image );
402437 if (parentDS .getId () != snapshotInfo .getDataStore ().getId ()) {
403- futures .add (executor . submit (new MigrateDataTask (snapshotInfo , srcDatastore , parentDS )));
438+ futures .add (submit (srcDatastore . getScope (). getScopeId (), new MigrateDataTask (snapshotInfo , srcDatastore , parentDS )));
404439 }
405440 }
406441 }
@@ -527,7 +562,7 @@ private double calculateStorageStandardDeviation(double[] metricValues, double m
527562 return standardDeviation .evaluate (metricValues , mean );
528563 }
529564
530- private class MigrateDataTask implements Callable <AsyncCallFuture < DataObjectResult > > {
565+ private class MigrateDataTask implements Callable <DataObjectResult > {
531566 private DataObject file ;
532567 private DataStore srcDataStore ;
533568 private DataStore destDataStore ;
@@ -557,8 +592,44 @@ public DataObject getFile() {
557592 }
558593
559594 @ Override
560- public AsyncCallFuture <DataObjectResult > call () throws Exception {
561- return secStgSrv .migrateData (file , srcDataStore , destDataStore , snapshotChain , templateChain );
595+ public DataObjectResult call () {
596+ DataObjectResult result ;
597+ AsyncCallFuture <DataObjectResult > future = secStgSrv .migrateData (file , srcDataStore , destDataStore , snapshotChain , templateChain );
598+ try {
599+ result = future .get ();
600+ } catch (ExecutionException | InterruptedException e ) {
601+ logger .warn ("Exception while migrating data to another secondary storage: {}" , e .toString ());
602+ result = new DataObjectResult (file );
603+ result .setResult (e .toString ());
604+ }
605+ tryCleaningUpExecutor (srcDataStore .getScope ().getScopeId ());
606+ return result ;
607+ }
608+ }
609+
610+ private class CopyTemplateTask implements Callable <TemplateApiResult > {
611+ private TemplateInfo sourceTmpl ;
612+ private DataStore destStore ;
613+
614+ public CopyTemplateTask (TemplateInfo sourceTmpl , DataStore destStore ) {
615+ this .sourceTmpl = sourceTmpl ;
616+ this .destStore = destStore ;
617+ }
618+
619+ @ Override
620+ public TemplateApiResult call () {
621+ TemplateApiResult result ;
622+ AsyncCallFuture <TemplateApiResult > future = templateService .copyTemplateToImageStore (sourceTmpl , destStore );
623+ try {
624+ result = future .get ();
625+ } catch (ExecutionException | InterruptedException e ) {
626+ logger .warn ("Exception while copying template [{}] from image store [{}] to image store [{}]: {}" ,
627+ sourceTmpl .getUniqueName (), sourceTmpl .getDataStore ().getName (), destStore .getName (), e .toString ());
628+ result = new TemplateApiResult (sourceTmpl );
629+ result .setResult (e .getMessage ());
630+ }
631+ tryCleaningUpExecutor (destStore .getScope ().getScopeId ());
632+ return result ;
562633 }
563634 }
564635}
0 commit comments