Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions agentstack/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,12 @@ class ValidationError(Exception):
"""

pass


class ToolError(Exception):
"""
Base exception for all tool-related errors. All exceptions inside of tool
implementations should inherit from this class.
"""

pass
216 changes: 216 additions & 0 deletions agentstack/templates/crewai/tools/pipedream_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
from typing import Optional, Dict, Any
from crewai_tools import BaseTool
from pydantic import Field
import os
import requests
from json import JSONDecodeError
from agentstack.exceptions import ToolError

# TODO: Future Enhancements
# - Add support for workflow-specific operations (create/update/delete)
# - Implement workflow creation with component chaining
# - Add workflow update capabilities
# - Support workflow deletion and cleanup
#
# - Implement webhook management capabilities
# - Add webhook creation and configuration
# - Support webhook event filtering
# - Implement webhook deletion and updates
#
# - Add component version control integration
# - Support component versioning
# - Add version rollback capabilities
# - Implement version comparison
#
# - Support custom component deployment
# - Add custom component creation
# - Support component testing and validation
# - Implement component publishing

class PipedreamToolError(ToolError):
"""Exception raised for Pipedream-specific tool errors."""
pass


class PipedreamClient:
"""Client for interacting with Pipedream API"""
def __init__(self, api_key: str):
self.base_url = "https://api.pipedream.com/v1/connect"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}

def list_apps(self, query: str = None) -> dict:
"""List available Pipedream apps"""
params = {"q": query} if query else {}
return self._request("GET", "/apps", params=params)

def list_components(self, app: str) -> dict:
"""List available components for an app"""
return self._request("GET", f"/actions?app={app}")

def get_component_definition(self, key: str) -> dict:
"""Get component definition and props"""
return self._request("GET", f"/components/{key}")

def run_action(self, component_id: str, inputs: Dict[str, Any]) -> dict:
"""Execute a Pipedream component action"""
return self._request("POST", "/actions/run", json={
"id": component_id,
"configured_props": inputs
})

def deploy_source(self, component_id: str, webhook_url: str, config: Dict[str, Any]) -> dict:
"""Deploy a Pipedream component source"""
return self._request("POST", "/triggers/deploy", json={
"id": component_id,
"webhook_url": webhook_url,
"configured_props": config
})

def _request(self, method: str, path: str, **kwargs) -> dict:
"""Make request to Pipedream API"""
response = requests.request(method, f"{self.base_url}{path}",
headers=self.headers, **kwargs)
if not response.ok:
raise PipedreamToolError(f"API request failed: {response.text}")
try:
return response.json()
except JSONDecodeError:
raise PipedreamToolError("Invalid JSON response from Pipedream API")


class PipedreamListAppsTool(BaseTool):
name: str = "List Pipedream Apps"
description: str = "List available Pipedream apps with optional search query"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)

model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}

def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)

def _run(self, query: str = None) -> str:
"""List available Pipedream apps with optional search query"""
try:
return self.client.list_apps(query)["data"]
except Exception as e:
raise PipedreamToolError(f"Failed to list apps: {str(e)}")


class PipedreamListComponentsTool(BaseTool):
name: str = "List Pipedream Components"
description: str = "List available components for a Pipedream app"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)

model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}

def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)

def _run(self, app: str) -> str:
"""List available components for the specified app"""
try:
return self.client.list_components(app)["data"]
except Exception as e:
raise PipedreamToolError(f"Failed to list components: {str(e)}")


class PipedreamGetPropsTool(BaseTool):
name: str = "Get Pipedream Component Properties"
description: str = "Get component definition and configuration options"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)

model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}

def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)

def _run(self, key: str) -> str:
"""Get component definition and configuration options"""
try:
return self.client.get_component_definition(key)["data"]
except Exception as e:
raise PipedreamToolError(f"Failed to get component properties: {str(e)}")


class PipedreamActionTool(BaseTool):
name: str = "Execute Pipedream Action"
description: str = "Execute a Pipedream component action with specified inputs"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)

model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}

def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)

def _run(self, component_id: str, inputs: Dict[str, Any]) -> str:
"""
Execute a Pipedream component action.

Args:
component_id: The ID of the Pipedream component to execute
inputs: Dictionary of input parameters for the component

Returns:
str: JSON response from the component execution

Raises:
PipedreamToolError: If the API request fails or returns an error
"""
try:
return self.client.run_action(component_id, inputs)
except Exception as e:
raise PipedreamToolError(f"Failed to execute action: {str(e)}")


class PipedreamSourceTool(BaseTool):
name: str = "Deploy Pipedream Source"
description: str = "Deploy a Pipedream source component with webhook configuration"
client: Optional[PipedreamClient] = Field(default=None, exclude=True)

model_config = {
"arbitrary_types_allowed": True,
"extra": "allow"
}

def __init__(self, api_key: str):
super().__init__()
self.client = PipedreamClient(api_key)

def _run(self, component_id: str, webhook_url: str, config: Dict[str, Any]) -> str:
"""
Deploy a Pipedream component source.

Args:
component_id: The ID of the Pipedream component to deploy
webhook_url: The URL where events will be sent
config: Dictionary of configuration parameters for the component

Returns:
str: JSON response from the component deployment

Raises:
PipedreamToolError: If the API request fails or returns an error
"""
try:
return self.client.deploy_source(component_id, webhook_url, config)
except Exception as e:
raise PipedreamToolError(f"Failed to deploy source: {str(e)}")
18 changes: 18 additions & 0 deletions agentstack/tools/pipedream.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "pipedream",
"category": "api-integration",
"description": "Execute Pipedream component actions and deploy source components",
"tools": [
"PipedreamListAppsTool",
"PipedreamListComponentsTool",
"PipedreamGetPropsTool",
"PipedreamActionTool",
"PipedreamSourceTool"
],
"url": "https://pipedream.com/docs/connect/components",
"cta": "Get your Pipedream API key at https://pipedream.com/settings/api-key",
"env": {
"PIPEDREAM_API_KEY": "your-pipedream-api-key"
},
"packages": ["requests"]
}
5 changes: 4 additions & 1 deletion docs/tools/core.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ description: 'AgentStack tools that are not third-party integrations'
## Data Input
- [Vision](/tools/tool/vision)

## API Integration
- [Pipedream Components](/tools/tool/pipedream)

<CardGroup cols={1}>
<Card
title="Community Tools"
Expand All @@ -24,4 +27,4 @@ description: 'AgentStack tools that are not third-party integrations'
>
Third party tools from the Agent Community
</Card>
</CardGroup>
</CardGroup>
123 changes: 123 additions & 0 deletions docs/tools/tool/pipedream.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
---
title: 'Pipedream Components'
description: 'Execute pre-built Pipedream components in your agent projects'
---

# Pipedream Components

The Pipedream integration allows you to use pre-built components from Pipedream's extensive library directly in your agent projects. This integration supports both actions (synchronous operations) and sources (event-driven triggers).

## Setup

1. Install the Pipedream tool:
```bash
agentstack tools add pipedream
```

2. Configure your API key:
- Get your API key from [Pipedream Settings](https://pipedream.com/settings/api-key)
- Add it to your .env file:
```
PIPEDREAM_API_KEY=your-pipedream-api-key
```

## Component Props

Props are predefined configuration parameters in Pipedream components. They:
- Are defined in the component code
- Cannot be created by end users
- Support both static and dynamic configurations
- Are configured through specific API endpoints

Users can configure prop values but cannot create new props. Props must be defined by component developers following Pipedream's component structure.

## Component Types

Pipedream components come in two types:

### Actions
- **What are they?** Synchronous operations that execute immediately and return a result
- **Use cases:** Making API calls, processing data, performing one-time operations
- **Tool to use:** `PipedreamActionTool`
- **Example usage:**
```yaml
# config/agents.yaml
agents:
- name: PipedreamAgent
role: API Integration Specialist
goal: Execute Pipedream components and handle responses
tools:
- PipedreamActionTool

# config/tasks.yaml
tasks:
- name: ExecutePipedreamAction
agent: PipedreamAgent
input: |
Execute the Pipedream component action with:
- Component ID: "gitlab-list-commits"
- Input parameters:
- projectId: 45672541
- refName: "main"
```

### Sources
- **What are they?** Event-driven triggers that listen for events and execute when triggered
- **Use cases:** Webhooks, scheduled tasks, real-time data processing
- **Tool to use:** `PipedreamSourceTool`
- **Requirements:** Public webhook URL for receiving events
- **Example usage:**
```yaml
# config/agents.yaml
agents:
- name: PipedreamAgent
role: API Integration Specialist
goal: Deploy and manage Pipedream source components
tools:
- PipedreamSourceTool

# config/tasks.yaml
tasks:
- name: DeployPipedreamSource
agent: PipedreamAgent
input: |
Deploy a Pipedream source component with:
- Component ID: "gitlab-new-issue"
- Webhook URL: "https://events.example.com/gitlab-new-issue"
- Configuration:
- projectId: 45672541
```

## Error Handling

The Pipedream tools include comprehensive error handling:

- API authentication errors (missing or invalid API key)
- Network request failures
- Invalid JSON responses
- Component-specific errors

Error messages are descriptive and include troubleshooting hints.

## Best Practices

1. **API Key Security**
- Never commit your API key to version control
- Use environment variables for sensitive data
- Include placeholder in .env.example

2. **Component Selection**
- Choose appropriate component type (action vs source)
- Test components individually before integration
- Review component documentation for required parameters

3. **Error Handling**
- Always handle potential errors in your agent tasks
- Validate inputs before execution
- Monitor source component health

## Limitations

- Pipedream Connect is currently in beta
- Some components may require additional authentication
- Source components need a publicly accessible webhook URL
7 changes: 7 additions & 0 deletions examples/pipedream_example/src/config/agents.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
agents:
- name: PipedreamAgent
role: API Integration Specialist
goal: Execute Pipedream components and handle responses
tools:
- PipedreamActionTool
- PipedreamSourceTool
Loading
Loading