Skip to content

Commit

Permalink
Add PK Chunking to Bulk Api Query
Browse files Browse the repository at this point in the history
  • Loading branch information
frankkessler committed Nov 23, 2016
1 parent 209a876 commit 37f7ee8
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 33 deletions.
24 changes: 24 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,30 @@ if ($result->id) {
}
```

### Query with PK Chunking

```php
// Run a bulk query with PK chunking enabled.
$operationType = 'query';
$objectType = 'Account';
$objectData = 'SELECT Id, Name FROM Account';

// 'Sforce-Enable-PKChunking' is sent as a request header when the job is created.
// An array value is joined into "key=value" pairs separated by "; " (here: "chunkSize=2500").
$result = Salesforce::bulk()->runBatch($operationType, $objectType, $objectData, [
'contentType' => 'CSV',
'Sforce-Enable-PKChunking' => [
'chunkSize' => 2500,
],
]);

// A non-empty job id means the job was created.
if ($result->id) {
$id = $result->id;
// With PK chunking on, runBatch lists the job's batches after creating it and polls each one.
foreach ($result->batches as $batch) {
// NOTE: records are only fetched for batches that reach the 'Completed' state.
foreach ($batch->records as $record) {
$account_id = $record['Id'];
}
}
}
```

### Custom REST Endpoint (GET)

```php
Expand Down
131 changes: 101 additions & 30 deletions src/Bulk.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ public function runBatch($operation, $objectType, $data, $options = [])
$batches = [];

$defaults = [
'externalIdFieldName' => null,
'batchSize' => 2000,
'batchTimeout' => 600,
'contentType' => 'JSON',
'pollIntervalSeconds' => 5,
'isBatchedResult' => false,
'concurrencyMode' => 'Parallel',
'externalIdFieldName' => null,
'batchSize' => 2000,
'batchTimeout' => 600,
'contentType' => 'JSON',
'pollIntervalSeconds' => 5,
'isBatchedResult' => false,
'concurrencyMode' => 'Parallel',
'Sforce-Enable-PKChunking' => false,
];

$options = array_replace($defaults, $options);
Expand All @@ -52,7 +53,7 @@ public function runBatch($operation, $objectType, $data, $options = [])
$options['isBatchedResult'] = true;
}

$job = $this->createJob($operation, $objectType, $options['externalIdFieldName'], $options['contentType'], $options['concurrencyMode']);
$job = $this->createJob($operation, $objectType, $options['externalIdFieldName'], $options['contentType'], $options['concurrencyMode'], $options);

if ($job->id) {
//if data is array, we can split it into batches
Expand All @@ -62,7 +63,7 @@ public function runBatch($operation, $objectType, $data, $options = [])
for ($i = 1; $i <= $totalNumberOfBatches; $i++) {
$batches[] = $this->addBatch($job->id, array_splice($data, ($i - 1) * $options['batchSize'], $options['batchSize']));
}
} else { //probably a string query so run in onee batch
} else { //probably a string query so run in one batch
$batches[] = $this->addBatch($job->id, $data);
}
} else {
Expand All @@ -72,6 +73,10 @@ public function runBatch($operation, $objectType, $data, $options = [])
$time = time();
$timeout = $time + $options['batchTimeout'];

if($options['Sforce-Enable-PKChunking']){
$batches = $this->allBatchDetails($job->id, $options['contentType']);
}

$batches_finished = [];

while (count($batches_finished) < count($batches) && $time < $timeout) {
Expand All @@ -82,10 +87,13 @@ public function runBatch($operation, $objectType, $data, $options = [])
continue;
}

$batch = $this->batchDetails($job->id, $batch->id);
if (in_array($batch->state, ['Completed', 'Failed', 'Not Processed'])) {
$batchResult = $this->batchResult($job->id, $batch->id, $options['isBatchedResult']);
$batch->records = $batchResult->records;
$batch = $this->batchDetails($job->id, $batch->id, $options['contentType']);
if (in_array($batch->state, ['Completed', 'Failed', 'Not Processed', 'NotProcessed'])) {

if(in_array($batch->state, ['Completed'])) {
$batchResult = $this->batchResult($job->id, $batch->id, $options['isBatchedResult'], null, $options['contentType']);
$batch->records = $batchResult->records;
}
$batches_finished[] = $batch->id;
}
}
Expand Down Expand Up @@ -118,7 +126,7 @@ public function runBatch($operation, $objectType, $data, $options = [])
*
* @return BulkJobResponse
*/
public function createJob($operation, $objectType, $externalIdFieldName = null, $contentType = 'JSON', $concurrencyMode = 'Parallel')
public function createJob($operation, $objectType, $externalIdFieldName = null, $contentType = 'JSON', $concurrencyMode = 'Parallel', $options=[])
{
$url = '/services/async/'.SalesforceConfig::get('salesforce.api.version').'/job';

Expand All @@ -128,6 +136,12 @@ public function createJob($operation, $objectType, $externalIdFieldName = null,
'concurrencyMode' => $concurrencyMode,
];

$headers = [];

if(isset($options['Sforce-Enable-PKChunking']) && $options['Sforce-Enable-PKChunking']){
$headers['Sforce-Enable-PKChunking'] = $this->parsePkChunkingHeader($options['Sforce-Enable-PKChunking']);
}

//order of variables matters so this externalIdFieldName has to come before contentType
if ($operation == 'upsert') {
$json_array['externalIdFieldName'] = $externalIdFieldName;
Expand All @@ -136,7 +150,8 @@ public function createJob($operation, $objectType, $externalIdFieldName = null,
$json_array['contentType'] = $contentType;

$result = $this->call_api('post', $url, [
'json' => $json_array,
'json' => $json_array,
'headers' => $headers,
]);

if ($result && is_array($result)) {
Expand All @@ -146,11 +161,14 @@ public function createJob($operation, $objectType, $externalIdFieldName = null,
return new BulkJobResponse();
}

public function jobDetails($jobId)
public function jobDetails($jobId, $format='json')
{
$url = '/services/async/'.SalesforceConfig::get('salesforce.api.version').'/job/'.$jobId;

$result = $this->call_api('get', $url);
$result = $this->call_api('get', $url,
[
'format' => $this->batchResponseFormatFromContentType($format),
]);

if ($result && is_array($result)) {
return new BulkJobResponse($result);
Expand Down Expand Up @@ -191,7 +209,7 @@ public function closeJob($jobId)
*
* @return BulkBatchResponse
*/
public function addBatch($jobId, $data)
public function addBatch($jobId, $data, $format='json')
{
if (!$jobId) {
//throw exception
Expand All @@ -214,6 +232,7 @@ public function addBatch($jobId, $data)
$result = $this->call_api('post', $url, [
'body' => $body,
'headers' => $headers,
'format' => $this->batchResponseFormatFromContentType($format),
]);

if ($result && is_array($result)) {
Expand All @@ -226,6 +245,7 @@ public function addBatch($jobId, $data)
/**
* @param $jobId
* @param $batchId
* @param $format
*
* @return BulkBatchResponse
*/
Expand All @@ -234,7 +254,7 @@ public function batchDetails($jobId, $batchId, $format = 'json')
$url = '/services/async/'.SalesforceConfig::get('salesforce.api.version').'/job/'.$jobId.'/batch/'.$batchId;

$result = $this->call_api('get', $url, [
'format' => $format,
'format' => $this->batchResponseFormatFromContentType($format),
]);

if ($result && is_array($result)) {
Expand All @@ -246,6 +266,37 @@ public function batchDetails($jobId, $batchId, $format = 'json')
return new BulkBatchResponse();
}

/**
 * List every batch currently attached to a bulk job.
 *
 * Waits a fixed 10 seconds before asking, then issues a GET against the
 * job's `/batch` endpoint. Each entry of the response's `batchInfo` list is
 * wrapped in a BulkBatchResponse. When the response is empty, is not an
 * array, has no `batchInfo` key, or `batchInfo` is a single batch (it
 * carries its own `id` key) rather than a list, an empty array is returned.
 *
 * @param $jobId  Id of the bulk job whose batches are requested
 * @param $format Job content type; mapped to a response format via batchResponseFormatFromContentType()
 *
 * @return BulkBatchResponse[]
 */
public function allBatchDetails($jobId, $format = 'json')
{
    //TODO: Fix hack to give initial Salesforce batch time to split into many batches by PK
    sleep(10);

    $endpoint = '/services/async/'.SalesforceConfig::get('salesforce.api.version').'/job/'.$jobId.'/batch';

    $response = $this->call_api('get', $endpoint, [
        'format' => $this->batchResponseFormatFromContentType($format),
    ]);

    // Only a list of batchInfo entries is accepted; a lone batchInfo exposes 'id' directly.
    $hasBatchList = $response
        && is_array($response)
        && isset($response['batchInfo'])
        && !isset($response['batchInfo']['id']);

    if (!$hasBatchList) {
        //throw exception
        return [];
    }

    $details = [];
    foreach ($response['batchInfo'] as $info) {
        $details[] = new BulkBatchResponse($info);
    }

    return $details;
}

/**
* @param $jobId
* @param $batchId
Expand All @@ -261,14 +312,17 @@ public function batchResult($jobId, $batchId, $isBatchedResult = false, $resultI

$url = '/services/async/'.SalesforceConfig::get('salesforce.api.version').'/job/'.$jobId.'/batch/'.$batchId.'/result';

$resultPostArray = [];

//if this is a query result, the main result page will have an array of result ids to follow for the query results
if ($resultId) {
$url = $url.'/'.$resultId;
$resultPostArray['format'] = $format;
}else{
$resultPostArray['format'] = $this->batchResponseFormatFromContentType($format);
}

$result = $this->call_api('get', $url, [
'format' => $format,
]);
$result = $this->call_api('get', $url, $resultPostArray);

if ($result && is_array($result)) {

Expand All @@ -277,6 +331,13 @@ public function batchResult($jobId, $batchId, $isBatchedResult = false, $resultI
$result['records'] = [];
}

if(isset($result['result'])){
if(!is_array($result['result'])){
$result['result'] = [$result['result']];
}
$result = array_merge($result, $result['result']);
}

//maximum amount of batch records allowed is 10,000
for ($i = 0; $i < 10000; $i++) {
//skip processing for the rest of the records if they don't exist
Expand All @@ -286,7 +347,7 @@ public function batchResult($jobId, $batchId, $isBatchedResult = false, $resultI

//batched results return a list of result ids that need to be processed to get the actual data
if ($isBatchedResult) {
$batchResult = $this->batchResult($jobId, $batchId, false, $result[$i]);
$batchResult = $this->batchResult($jobId, $batchId, false, $result[$i], $format);
$result['records'] = array_merge($result['records'], $batchResult->records);
} else {
//fix boolean values from appearing as
Expand Down Expand Up @@ -437,20 +498,15 @@ public function addBinaryBatch($jobId, BinaryBatch $binaryBatch, $contentType =
return new BulkBatchResponse();
}

/* public function binaryBatchResult($jobId, $batchId, $isBatchedResult = false, $resultId = null, $format='json')
{
$result = $this->batchResult($jobId, $batchId, $isBatchedResult, $resultId, $format);
if($result->state)
}
*/
protected function batchResponseFormatFromContentType($contentType)
{
switch (strtoupper($contentType)) {
case 'ZIP_CSV':
case 'ZIP/CSV':
case 'ZIP_XML':
case 'ZIP/XML':
case 'CSV':
case 'XML':
$return = 'xml';
break;
default:
Expand All @@ -460,4 +516,19 @@ protected function batchResponseFormatFromContentType($contentType)

return $return;
}

/**
 * Build the value of the `Sforce-Enable-PKChunking` request header.
 *
 * - A non-empty array is rendered as "key=value" pairs joined by "; "
 *   (e.g. ['chunkSize' => 2500] becomes "chunkSize=2500").
 * - An empty array carries no settings and yields 'FALSE'.
 * - A string is parsed as a boolean word ('true', '1', 'on', 'yes' in any
 *   case enable chunking); every other string, including 'false', yields 'FALSE'.
 * - Any other value is judged by its truthiness.
 *
 * @param array|bool|string|int|null $pk_chunk_header PK chunking option as passed by the caller
 *
 * @return string Header value: the joined settings, 'TRUE' or 'FALSE'
 */
protected function parsePkChunkingHeader($pk_chunk_header)
{
    if (is_array($pk_chunk_header)) {
        if (count($pk_chunk_header) === 0) {
            return 'FALSE';
        }

        $header_parts = [];
        foreach ($pk_chunk_header as $key => $value) {
            $header_parts[] = $key.'='.$value;
        }

        return implode('; ', $header_parts);
    }

    // A loose comparison against boolean true matches every non-empty string
    // (even 'false'), so strings are parsed explicitly instead.
    if (is_string($pk_chunk_header)) {
        $enabled = filter_var($pk_chunk_header, FILTER_VALIDATE_BOOLEAN);
    } else {
        $enabled = (bool) $pk_chunk_header;
    }

    return $enabled ? 'TRUE' : 'FALSE';
}
}
2 changes: 1 addition & 1 deletion src/Salesforce.php
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ public function call_api($method, $url, $options = [], $debug_info = [])

$format = 'json';
if (isset($options['format'])) {
$format = $options['format'];
$format = strtolower($options['format']);
unset($options['format']);
}

Expand Down
Loading

0 comments on commit 37f7ee8

Please sign in to comment.