Skip to content

Commit 976e14e

Browse files
author
Benjamin Calef
committed
[0.1.2] add store and website dimensions for parallel process
1 parent 1a076c5 commit 976e14e

File tree

5 files changed

+209
-3
lines changed

5 files changed

+209
-3
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
.idea
2-
/vendor
2+
composer.lock
3+
/vendor
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Zepgram\MultiThreading\Model\Dimension;
6+
7+
use Magento\Store\Api\Data\StoreInterface;
8+
use Magento\Store\Api\StoreRepositoryInterface;
9+
use Zepgram\MultiThreading\Model\ItemProvider\ArrayWrapper;
10+
use Zepgram\MultiThreading\Model\ItemProvider\ArrayWrapperFactory;
11+
use Zepgram\MultiThreading\Model\Processor\ForkedProcessorRunner;
12+
13+
class ParallelStoreProcessor
14+
{
15+
/** @var ForkedProcessorRunner */
16+
private $forkedProcessorRunner;
17+
18+
/** @var ArrayWrapperFactory */
19+
private $arrayWrapperFactory;
20+
21+
/** @var StoreRepositoryInterface */
22+
private $storeRepository;
23+
24+
public function __construct(
25+
ForkedProcessorRunner $forkedProcessorRunner,
26+
ArrayWrapperFactory $arrayWrapperFactory,
27+
StoreRepositoryInterface $storeRepository
28+
) {
29+
$this->forkedProcessorRunner = $forkedProcessorRunner;
30+
$this->arrayWrapperFactory = $arrayWrapperFactory;
31+
$this->storeRepository = $storeRepository;
32+
}
33+
34+
/**
35+
* @param callable $callback
36+
* @param int|null $maxChildrenProcess
37+
* @param bool $onlyActiveStores
38+
* @param bool $withDefaultStore
39+
* @return void
40+
*/
41+
public function process(
42+
callable $callback,
43+
int $maxChildrenProcess = null,
44+
bool $onlyActiveStores = true,
45+
bool $withDefaultStore = false
46+
): void {
47+
$stores = array_filter($this->storeRepository->getList(),
48+
function (StoreInterface $store) use ($onlyActiveStores, $withDefaultStore) {
49+
if (!$withDefaultStore && (int)$store->getId() === 0) {
50+
return false;
51+
}
52+
if ($onlyActiveStores && !$store->getIsActive()) {
53+
return false;
54+
}
55+
return true;
56+
});
57+
58+
/** @var ArrayWrapper $itemProvider */
59+
$itemProvider = $this->arrayWrapperFactory->create([
60+
'items' => $stores,
61+
'pageSize' => 1
62+
]);
63+
$storeCount = $itemProvider->getSize();
64+
$maxChildrenProcess = ($maxChildrenProcess >= $storeCount || $maxChildrenProcess === null)
65+
? $storeCount : $maxChildrenProcess;
66+
67+
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess);
68+
}
69+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Zepgram\MultiThreading\Model;
6+
7+
use Magento\Store\Api\Data\WebsiteInterface;
8+
use Magento\Store\Api\WebsiteRepositoryInterface;
9+
use Zepgram\MultiThreading\Model\ItemProvider\ArrayWrapper;
10+
use Zepgram\MultiThreading\Model\ItemProvider\ArrayWrapperFactory;
11+
use Zepgram\MultiThreading\Model\Processor\ForkedProcessorRunner;
12+
13+
class ParallelWebsiteProcessor
14+
{
15+
/** @var ForkedProcessorRunner */
16+
private $forkedProcessorRunner;
17+
18+
/** @var ArrayWrapperFactory */
19+
private $arrayWrapperFactory;
20+
21+
/** @var WebsiteRepositoryInterface */
22+
private $websiteRepository;
23+
24+
public function __construct(
25+
ForkedProcessorRunner $forkedProcessorRunner,
26+
ArrayWrapperFactory $arrayWrapperFactory,
27+
WebsiteRepositoryInterface $websiteRepository
28+
) {
29+
$this->forkedProcessorRunner = $forkedProcessorRunner;
30+
$this->arrayWrapperFactory = $arrayWrapperFactory;
31+
$this->websiteRepository = $websiteRepository;
32+
}
33+
34+
/**
35+
* @param callable $callback
36+
* @param int|null $maxChildrenProcess
37+
* @param bool $withDefaultWebsite
38+
* @return void
39+
*/
40+
public function process(
41+
callable $callback,
42+
int $maxChildrenProcess = null,
43+
bool $withDefaultWebsite = false
44+
): void {
45+
$websites = array_filter($this->websiteRepository->getList(),
46+
function (WebsiteInterface $website) use ($withDefaultWebsite) {
47+
if (!$withDefaultWebsite && (int)$website->getId() === 0) {
48+
return false;
49+
}
50+
return true;
51+
});
52+
53+
/** @var ArrayWrapper $itemProvider */
54+
$itemProvider = $this->arrayWrapperFactory->create([
55+
'items' => $websites,
56+
'pageSize' => 1
57+
]);
58+
$websiteCount = $itemProvider->getSize();
59+
$maxChildrenProcess = ($maxChildrenProcess >= $websiteCount || $maxChildrenProcess === null)
60+
? $websiteCount : $maxChildrenProcess;
61+
62+
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess);
63+
}
64+
}

README.md

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class MyAwesomeClass
109109
$this->forkedArrayProcessor = $forkedArrayProcessor;
110110
}
111111

112-
$array = [1,2,3,4,5];
112+
$array = [1,2,3,4,5,...];
113113
$callback = function ($item) {
114114
echo $item;
115115
// do your business logic here
@@ -124,6 +124,63 @@ class MyAwesomeClass
124124
}
125125
```
126126

127+
### ParallelStoreProcessor or ParallelWebsiteProcessor
128+
129+
```php
130+
use Zepgram\MultiThreading\Model\Dimension\ParallelStoreProcessor;
131+
use Magento\Catalog\Model\ResourceModel\Product\Collection;
132+
use Magento\Catalog\Model\ResourceModel\Product\CollectionFactory;
133+
134+
class MyAwesomeClass
135+
{
136+
/** @var ParallelStoreProcessor */
137+
private $parallelStoreProcessor;
138+
139+
/** @var CollectionFactory */
140+
private $collectionFactory;
141+
142+
public function __construct(
143+
ParallelStoreProcessor $parallelStoreProcessor,
144+
CollectionFactory $collectionFactory
145+
) {
146+
$this->parallelStoreProcessor = $parallelStoreProcessor;
147+
$this->collectionFactory = $collectionFactory;
148+
}
149+
150+
$array = [1,2,3,4,5,...];
151+
$callback = function (StoreInterface $store) {
152+
// retrieve data from database foreach stores (do not load the collection !)
153+
$collection = $this->collectionFactory->create();
154+
$collection->addFieldToFilter('type_id', 'simple')
155+
->addFieldToSelect(['sku', 'description', 'created_at'])
156+
->setStoreId($store->getId())
157+
->addStoreFilter($store->getId())
158+
->distinct(true);
159+
160+
// handle pagination system to avoid memory leak
161+
$currentPage = 1;
162+
$pageSize = 1000;
163+
$totalPages = (int)ceil($collection->getSize() / $pageSize);
164+
while ($currentPage <= $totalPages) {
165+
$collection->setCurPage($currentPage);
166+
$collection->setPageSize($pageSize);
167+
foreach ($collection->getItems() as $product) {
168+
// do your business logic here
169+
}
170+
$currentPage++;
171+
}
172+
};
173+
174+
// your collection will be processed foreach store by a dedicated child process
175+
$this->parallelStoreProcessor->process(
176+
$callback,
177+
$maxChildrenProcess = null,
178+
$onlyActiveStores = true,
179+
$withDefaultStore = false
180+
);
181+
}
182+
```
183+
127184
### bin/magento thread:processor command
128185

129186
This command allows running a command indefinitely in a dedicated thread using
@@ -150,6 +207,15 @@ use a similar approach to process a search criteria or a collection. The process
150207
into several pages, and for each page, a child process is created to run the callback
151208
function specified by the user on each item of that page.
152209

210+
The `ParallelStoreProcessor` and `ParallelWebsiteProcessor` classes are designed to make it easier
211+
to process a list of stores or websites in parallel. To use either of these classes, you'll need to
212+
provide a callback function that will be called for each store or website in the list. The callback
213+
function should take one parameter, which will be a single store or website object.<br>
214+
Each store or website will be passed to the callback function in a separate process,
215+
allowing faster processing times.
216+
The number of children process cannot exceed the number of stores or websites: for example,
217+
if you have 10 stores, the maximum number of child processes that can be created in parallel is 10.
218+
153219
#### Here is a breakdown of the parameters:
154220
- `$collection`/`$searchCriteria`/`$array`: The first parameter is the data source,
155221
either a `Magento\Framework\Api\SearchCriteriaInterface` for ForkedSearchResultProcessor or

composer.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "zepgram/module-multi-threading",
33
"description": "This module is a powerful tool for developers who want to process large data sets in a short amount of time",
44
"type": "magento2-module",
5-
"version": "0.1.1",
5+
"version": "0.1.2",
66
"authors": [
77
{
88
"name": "Benjamin Calef",
@@ -11,6 +11,7 @@
1111
],
1212
"require": {
1313
"magento/framework": "^101.0.0|^102.0.0|^103.0.0",
14+
"magento/module-store": "^101",
1415
"ext-pcntl": "*"
1516
},
1617
"autoload": {
@@ -27,5 +28,10 @@
2728
"type": "composer",
2829
"url": "https://repo.magento.com/"
2930
}
31+
},
32+
"config": {
33+
"allow-plugins": {
34+
"magento/composer-dependency-version-audit-plugin": true
35+
}
3036
}
3137
}

0 commit comments

Comments
 (0)