feat: updated result service (wip)
tikazyq committed Nov 21, 2024
1 parent 26f9b62 commit 0bed19b
Showing 5 changed files with 45 additions and 11 deletions.
18 changes: 18 additions & 0 deletions README.md
@@ -8,6 +8,24 @@ Python SDK for Crawlab
pip install crawlab-sdk
```

## Usage

### CLI

```bash
crawlab-cli
```

### Scrapy Integration

In `settings.py`, add the following:

```python
ITEM_PIPELINES = {
'crawlab.CrawlabPipeline': 300
}
```
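
For orientation, a minimal spider sketch (hypothetical names and URL) whose yielded items would pass through `CrawlabPipeline.process_item` once the setting above is in place:

```python
import scrapy


class QuotesSpider(scrapy.Spider):
    # Hypothetical example spider; any Scrapy spider works the same way.
    name = "quotes"
    start_urls = ["https://quotes.toscrape.com"]

    def parse(self, response):
        for quote in response.css("div.quote"):
            # Each yielded item is handed to ITEM_PIPELINES,
            # so CrawlabPipeline receives it and saves it to Crawlab.
            yield {
                "text": quote.css("span.text::text").get(),
                "author": quote.css("small.author::text").get(),
            }
```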

## Development

### Install dependencies
9 changes: 8 additions & 1 deletion crawlab/config/task.py
@@ -1,6 +1,13 @@
 import os
+import re
 from typing import Optional


 def get_task_id() -> Optional[str]:
-    return os.getenv('CRAWLAB_TASK_ID')
+    task_id = os.getenv('CRAWLAB_TASK_ID')
+
+    # Only allow ObjectId format
+    if task_id and re.match(r'^[a-fA-F0-9]{24}$', task_id):
+        return task_id
+    else:
+        return None
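
A quick sketch of the new behaviour (the environment values below are made up for illustration): only a 24-character hex string in MongoDB ObjectId format is returned; anything else yields `None`.

```python
import os

from crawlab.config.task import get_task_id

# Hypothetical task IDs, used only to illustrate the check above.
os.environ["CRAWLAB_TASK_ID"] = "655f1a2b3c4d5e6f7a8b9c0d"  # 24 hex chars -> accepted
assert get_task_id() == "655f1a2b3c4d5e6f7a8b9c0d"

os.environ["CRAWLAB_TASK_ID"] = "not-an-object-id"  # rejected by the regex
assert get_task_id() is None
```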
10 changes: 2 additions & 8 deletions crawlab/result.py
@@ -1,4 +1,3 @@
-import json
 from typing import List, Optional, Dict

 from crawlab.client import get_client, Client
@@ -45,19 +44,14 @@ def _save(self, items: List[Dict]):
             result.set_task_id(tid)
             records.append(result)

-        data = json.dumps(
-            {
-                "task_id": tid,
-                "data": records,
-            }
-        ).encode("utf-8")
-
         # msg = stream_message_pb2.StreamMessage(
         #     code=stream_message_code_pb2.INSERT_DATA,
         #     data=data,
         # )
         # self.task_stub.Subscribe(iter([msg]))

+        # TODO: Use IPC to send data
+

 RS: Optional[ResultService] = None

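For context, a minimal sketch of feeding a record into the result service from a plain (non-Scrapy) script, assuming `save_item` is exposed at the package level as in the Crawlab SDK docs; the field names are made up:

```python
from crawlab import save_item

# save_item hands the dict to the module-level ResultService (RS),
# which, per result.py, tags it with the current task ID before sending it.
save_item({
    "title": "Example page",
    "url": "https://example.com",
})
```
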
17 changes: 16 additions & 1 deletion crawlab/scrapy/pipelines.py
@@ -4,9 +4,24 @@


 class CrawlabPipeline(object):
+    """Pipeline for saving scraped items to Crawlab's database."""
+
     def process_item(self, item, spider):
+        # Get the task ID from the current crawling task
+        task_id = get_task_id()
+
+        # Skip processing if no task ID is available
+        if not task_id:
+            return item
+
+        # Convert the scraped item to a Crawlab Result object
         result = Result(item)
-        result.set_task_id(get_task_id())
+
+        # Associate the result with the current task
+        result.set_task_id(task_id)
+
+        # Save the result to Crawlab's database
         save_item(result)
+
+        # Return the item for potential further processing by other pipelines
         return item
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ packages = [
 repository = "https://github.com/crawlab-team/crawlab-python-sdk"

 [tool.poetry.dependencies]
-python = "^3.11"
+python = "^3.9"
 grpcio = "^1.68.0"
 grpcio-tools = "^1.59.0"
 httpx = "^0.23.0"
