Commit
feat: queue shopify chunks as you go
cdxker authored and skeptrunedev committed Nov 11, 2024
1 parent 807ff8c commit 247decc
Showing 1 changed file with 108 additions and 74 deletions.
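The gist of the change: instead of collecting every Shopify product chunk across all pages and only then queuing them, the worker now queues each page's chunks onto the Redis "ingestion" list as soon as that page is fetched. A minimal standalone sketch of that pattern follows; Product, fetch_page, and enqueue_batch are illustrative stand-ins, not the repo's actual types or functions.

// Simplified sketch of the "queue as you go" pattern adopted by this commit.
#[derive(Debug, Clone)]
struct Product {
    title: String,
}

// Stand-in for the paginated products.json fetch: returns an empty Vec once pages run out.
fn fetch_page(page: usize) -> Vec<Product> {
    if page > 3 {
        return Vec::new();
    }
    (0..250)
        .map(|i| Product {
            title: format!("product-{}-{}", page, i),
        })
        .collect()
}

// Stand-in for queueing: in the real worker this is an LPUSH of a serialized
// ingestion message onto the Redis "ingestion" list.
fn enqueue_batch(batch: &[Product]) {
    let first = batch.first().map(|p| p.title.as_str()).unwrap_or("<empty>");
    println!("queued {} products starting at {}", batch.len(), first);
}

fn main() {
    let mut page = 1;
    let mut total = 0;

    loop {
        // Fetch one page at a time instead of accumulating every page first.
        let products = fetch_page(page);
        if products.is_empty() {
            break;
        }

        // Queue this page's products in bounded batches before moving on, so
        // memory stays proportional to a single page rather than the whole store.
        for batch in products.chunks(120) {
            enqueue_batch(batch);
        }

        total += products.len();
        page += 1;
    }

    println!("stopped at page {}, queued {} products total", page, total);
}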
182 changes: 108 additions & 74 deletions server/src/bin/crawl-worker.rs
@@ -167,51 +167,6 @@ fn create_chunk_req_payload(
})
}

async fn get_chunks_from_shopify(
scrape_request: CrawlRequest,
) -> Result<(Vec<ChunkReqPayload>, usize), ServiceError> {
let mut chunks: Vec<ChunkReqPayload> = Vec::new();
let mut cur_page = 1;

loop {
let url = format!("{}/products.json?page={}", scrape_request.url, cur_page);
let response: ShopifyResponse = ureq::get(&url)
.call()
.map_err(|e| ServiceError::InternalServerError(format!("Failed to fetch: {}", e)))?
.into_json()
.map_err(|e| {
ServiceError::InternalServerError(format!("Failed to parse JSON: {}", e))
})?;
if response.products.is_empty() {
break;
}

for product in response.products {
if product.variants.len() == 1 {
chunks.push(create_chunk_req_payload(
&product,
&product.variants[0],
&scrape_request.url,
&scrape_request,
)?);
} else {
for variant in &product.variants {
chunks.push(create_chunk_req_payload(
&product,
variant,
&scrape_request.url,
&scrape_request,
)?);
}
}
}

cur_page += 1;
}

Ok((chunks, cur_page))
}

#[allow(clippy::print_stdout)]
async fn get_chunks_with_firecrawl(
scrape_request: CrawlRequest,
@@ -599,42 +554,121 @@ async fn crawl(

let dataset_config = DatasetConfiguration::from_json(dataset.server_configuration.clone());

// Use shopify specific logic to get chunks or firecrawl
let (chunks, page_count) = match scrape_request.crawl_options.scrape_options {
Some(ScrapeOptions::Shopify(_)) => get_chunks_from_shopify(scrape_request.clone()).await?,
_ => get_chunks_with_firecrawl(scrape_request.clone(), pool.clone()).await?,
};
let (page_count, chunks_created) = if let Some(ScrapeOptions::Shopify(_)) =
scrape_request.crawl_options.scrape_options
{
let mut cur_page = 1;
let mut chunk_count = 0;

loop {
let mut chunks: Vec<ChunkReqPayload> = Vec::new();
let url = format!("{}/products.json?page={}", scrape_request.url, cur_page);
let response: ShopifyResponse = ureq::get(&url)
.call()
.map_err(|e| ServiceError::InternalServerError(format!("Failed to fetch: {}", e)))?
.into_json()
.map_err(|e| {
ServiceError::InternalServerError(format!("Failed to parse JSON: {}", e))
})?;
if response.products.is_empty() {
break;
}

for product in response.products {
if product.variants.len() == 1 {
chunks.push(create_chunk_req_payload(
&product,
&product.variants[0],
&scrape_request.url,
&scrape_request,
)?);
} else {
for variant in &product.variants {
chunks.push(create_chunk_req_payload(
&product,
variant,
&scrape_request.url,
&scrape_request,
)?);
}
}
}

let chunks_to_upload = chunks.chunks(120);

for chunk in chunks_to_upload {
let (chunk_ingestion_message, chunk_metadatas) = create_chunk_metadata(
chunk.to_vec(),
scrape_request.dataset_id,
dataset_config.clone(),
pool.clone(),
)
.await?;

let mut redis_conn = redis_pool
.get()
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

if !chunk_metadatas.is_empty() {
let serialized_message: String =
serde_json::to_string(&chunk_ingestion_message).map_err(|_| {
ServiceError::BadRequest(
"Failed to Serialize BulkUploadMessage".to_string(),
)
})?;

redis::cmd("lpush")
.arg("ingestion")
.arg(&serialized_message)
.query_async::<redis::aio::MultiplexedConnection, usize>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;
}
}

chunk_count += chunks.len();
cur_page += 1;
}

(cur_page, chunk_count)
} else {
let (chunks, page_count) =
get_chunks_with_firecrawl(scrape_request.clone(), pool.clone()).await?;
let chunks_to_upload = chunks.chunks(120);

for chunk in chunks_to_upload {
let (chunk_ingestion_message, chunk_metadatas) = create_chunk_metadata(
chunk.to_vec(),
scrape_request.dataset_id,
dataset_config.clone(),
pool.clone(),
)
.await?;

let mut redis_conn = redis_pool
.get()
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;

if !chunk_metadatas.is_empty() {
let serialized_message: String = serde_json::to_string(&chunk_ingestion_message)
.map_err(|_| {
ServiceError::BadRequest(
"Failed to Serialize BulkUploadMessage".to_string(),
)
})?;

redis::cmd("lpush")
.arg("ingestion")
.arg(&serialized_message)
.query_async::<redis::aio::MultiplexedConnection, usize>(&mut *redis_conn)
.await
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;
}
}
}
(page_count, chunks.len())
};

update_crawl_status(
scrape_request.scrape_id,
@@ -653,7 +687,7 @@ async fn crawl(
Ok(ScrapeReport {
request_id: scrape_request.id,
pages_scraped: page_count,
chunks_created: chunks.len(),
chunks_created,
})
}
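For reference, the 120-item batches in both branches come from Rust's slice::chunks, which yields successive non-overlapping slices of at most the requested length, with a shorter final slice for any remainder. A tiny standalone check:

fn main() {
    // 301 items split into batches of at most 120: two full batches plus a 61-item remainder.
    let items: Vec<u32> = (0..301).collect();
    let sizes: Vec<usize> = items.chunks(120).map(|batch| batch.len()).collect();
    assert_eq!(sizes, vec![120, 120, 61]);
    println!("batch sizes: {:?}", sizes);
}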
