Skip to content

Commit

Permalink
Add event log
Browse files Browse the repository at this point in the history
  • Loading branch information
muellerberndt committed Dec 8, 2024
1 parent 6ed3e12 commit ec889f9
Show file tree
Hide file tree
Showing 15 changed files with 502 additions and 302 deletions.
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,17 @@ The r4dar framework is designed to be easily extensible in order to allow users
Built-in features:

- LLM-powered Telegram chatbot interface
- Indexing assets from bounty platforms and contests (atm Immunefi only)
- Various search options for bounties & associated assets
- Automated diff when in-scope assets are updated on Immunefi or Github
- Automated monitoring of Github repos in scope
- On-chain monitoring via Quicknode integration
- Search active bounties, contests and associated files
- Automated monitoring of Github repos associated with bounties
- Monitoring of bounty scopes on Immunefi

Some possible extensions:

- Auto-analysis when proxy implementation in scope is upgraded (see [example](examples/proxy_contract_handler.py))
- Diff and analysis when an asset in scope is updated
- Daily prioritization targets based on EV
- Semgrep the code all bounties and conteststo find candidate bugs
- ?
- Automated diff and analysis when an asset is updated
- Prioritization targets based on daily events and EV
- Scan the codebases of all bounties and assets for bugs
- Endless possibilities...

## Running R4dar

Expand Down
41 changes: 31 additions & 10 deletions docs/customization.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ Handlers must inherit from `Handler` and implement the `handle` method. Create a

```python
# extensions/my-extension/my_custom_handler.py
from src.handlers.base import Handler, HandlerTrigger
from src.handlers.base import Handler, HandlerTrigger, HandlerResult
from src.util.logging import Logger
from typing import List
class MyCustomHandler(Handler):
"""Custom handler for project events"""
Expand All @@ -250,38 +251,58 @@ class MyCustomHandler(Handler):
HandlerTrigger.NEW_PROJECT
]
async def handle(self) -> None:
async def handle(self) -> HandlerResult:
"""Handle the event
The event context is available in self.context
Returns a HandlerResult indicating success/failure and any relevant data
"""
try:
# Get event data from context
project = self.context.get('project')
if not project:
return
return HandlerResult(success=False, data={"error": "No project in context"})
if self.trigger == HandlerTrigger.PROJECT_UPDATE:
old_project = self.context.get('old_project')
await self._handle_project_update(project, old_project)
result = await self._handle_project_update(project, old_project)
elif self.trigger == HandlerTrigger.NEW_PROJECT:
await self._handle_new_project(project)
result = await self._handle_new_project(project)
return HandlerResult(success=True, data=result)
except Exception as e:
self.logger.error(f"Error in handler: {str(e)}")
return HandlerResult(success=False, data={"error": str(e)})
async def _handle_project_update(self, project, old_project):
async def _handle_project_update(self, project, old_project) -> dict:
"""Handle project update event"""
# Your update logic here
pass
return {"status": "updated", "project_id": project.id}
async def _handle_new_project(self, project):
async def _handle_new_project(self, project) -> dict:
"""Handle new project event"""
# Your new project logic here
pass
return {"status": "created", "project_id": project.id}
### Handler Results
Handlers must return a `HandlerResult` object that indicates the success or failure of the operation and includes any relevant data:

```python
from src.handlers.base import HandlerResult
# Successful result with data
return HandlerResult(success=True, data={"status": "completed", "items_processed": 5})
# Failed result with error information
return HandlerResult(success=False, data={"error": "Failed to process project"})
```

The handler will be automatically discovered and registered when your extension is loaded. No additional registration code is needed.
The `HandlerResult` is used by the event bus to:
1. Track handler execution success/failure
2. Log handler results in the event log
3. Provide feedback to other system components

### Available Triggers

Expand Down
186 changes: 34 additions & 152 deletions extensions/examples/proxy_implementation_upgrade_handler.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from src.handlers.base import Handler, HandlerTrigger
from src.handlers.base import Handler, HandlerTrigger, HandlerResult
from src.services.telegram import TelegramService
from src.util.logging import Logger
from src.backend.database import DBSessionMixin
from src.util.etherscan import EVMExplorer, ExplorerType
from src.util.etherscan import EVMExplorer
from sqlalchemy import text
from typing import Dict, Any, Optional
import json
import traceback
import aiohttp
from extensions.examples.proxy_implementation_upgrade_agent import ProxyImplementationUpgradeAgent

__all__ = ["ProxyImplementationUpgradeHandler"]
Expand All @@ -28,11 +26,11 @@ def get_triggers(cls) -> list[HandlerTrigger]:
"""Get list of triggers this handler listens for"""
return [HandlerTrigger.BLOCKCHAIN_EVENT]

async def handle(self) -> None:
async def handle(self) -> HandlerResult:
"""Handle blockchain events"""
if not self.context:
self.logger.error("No context provided")
return
return HandlerResult(success=False, error="No context provided")

try:
# Extract event data
Expand Down Expand Up @@ -66,7 +64,7 @@ async def handle(self) -> None:

if not contract_address or not implementation_address:
self.logger.debug("No proxy upgrade event found in payload")
return
return HandlerResult(success=True, data={"found_upgrade": False})

# Look up project info from database
project_info = await self._get_project_info(contract_address)
Expand All @@ -82,173 +80,57 @@ async def handle(self) -> None:
}

# Call agent to analyze implementation
await self._analyze_implementation(agent_data)
analysis_result = await self._analyze_implementation(agent_data)

return HandlerResult(
success=True,
data={
"found_upgrade": True,
"contract_address": contract_address,
"implementation_address": implementation_address,
"project": project_info,
"analysis": analysis_result,
},
)

except Exception as e:
self.logger.error(f"Error handling proxy implementation upgrade event: {str(e)}")
error_msg = f"Error handling proxy implementation upgrade event: {str(e)}"
self.logger.error(error_msg)
return HandlerResult(success=False, error=error_msg)

async def _get_project_info(self, contract_address: str) -> Optional[Dict[str, Any]]:
"""Get project info for a contract address from database"""
"""Get project info from database"""
try:
async with self.get_async_session() as session:
async with self.session() as session:
# Query for project containing this contract address
query = text(
"""
SELECT
p.id,
p.name,
p.description,
p.project_type,
p.extra_data,
a.source_url,
a.asset_type
FROM assets a
JOIN project_assets pa ON a.id = pa.asset_id
JOIN projects p ON pa.project_id = p.id
WHERE LOWER(a.source_url) LIKE LOWER(:address)
AND p.project_type = 'bounty'
SELECT p.* FROM projects p
JOIN assets a ON a.project_id = p.id
WHERE a.source_url ILIKE :contract_pattern
LIMIT 1
"""
)

result = await session.execute(query, {"address": f"%{contract_address}%"})
row = result.first()
result = await session.execute(query, {"contract_pattern": f"%{contract_address}%"})

row = result.fetchone()
if row:
# Extract bounty info from extra_data
extra_data = row.extra_data or {}
max_bounty = extra_data.get("max_bounty", 0) if isinstance(extra_data, dict) else 0

return {
"id": row.id,
"name": row.name,
"description": row.description,
"project_type": row.project_type,
"max_bounty": max_bounty,
"source_url": row.source_url,
"asset_type": row.asset_type,
"extra_data": extra_data,
}

return dict(row)
return None

except Exception as e:
self.logger.error(f"Error querying project info: {str(e)}\nTraceback: {traceback.format_exc()}")
return None

async def _fetch_source_code(self, address: str) -> Optional[str]:
"""Fetch verified source code from Etherscan
Args:
address: Contract address to fetch source for
Returns:
Source code if found, None otherwise
"""
async def _analyze_implementation(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze implementation changes using agent"""
try:
# Get API key and URL for Etherscan
api_key = self.explorer.get_api_key(ExplorerType.ETHERSCAN)
api_url = self.explorer.get_api_url(ExplorerType.ETHERSCAN)

# Construct API URL
full_api_url = f"{api_url}?module=contract&action=getsourcecode&address={address}&apikey={api_key}"

# Fetch source code
async with aiohttp.ClientSession() as session:
async with session.get(full_api_url) as response:
data = await response.json()

if data["status"] != "1":
self.logger.error(f"Etherscan API error: {data.get('message', 'Unknown error')}")
return None

# Extract source code
result = data["result"][0]
source_code = result.get("SourceCode", "")

if not source_code:
return None

try:
# Handle double-wrapped JSON (starts with {{ and ends with }})
if source_code.startswith("{{") and source_code.endswith("}}"):
# Remove the double braces
source_code = source_code[1:-1].strip()

# Parse the JSON
source_data = json.loads(source_code)

# Combine all source files into one string
combined_source = []
for filename, filedata in source_data.get("sources", {}).items():
content = filedata.get("content", "")
combined_source.append(f"// File: {filename}\n{content}\n")

return "\n".join(combined_source)

except json.JSONDecodeError:
# If not JSON, return the source code as is
return source_code

except Exception as e:
self.logger.error(f"Error fetching source code: {str(e)}\nTraceback: {traceback.format_exc()}")
return None

async def _analyze_implementation(self, data: Dict[str, Any]) -> None:
"""Analyze implementation using agent"""
try:
# Get verified source code
self.logger.info(f"Fetching source code for {data['implementation_address']}")
source_code = await self._fetch_source_code(data["implementation_address"])
if not source_code:
self.logger.error("Could not fetch verified source code")
return

self.logger.info("Initializing agent for analysis")
# Initialize agent
agent = ProxyImplementationUpgradeAgent()

# Get analysis from agent
self.logger.info("Starting implementation analysis")
analysis = await agent.analyze_implementation(source_code)
self.logger.info("Analysis complete", extra_data={"analysis": analysis})

# Format notification message
message = [
"🔄 Proxy Implementation Upgrade In Bounty Scope Detected!",
"",
f"Contract: https://etherscan.io/address/{data['contract_address']}",
f"New Implementation: https://etherscan.io/address/{data['implementation_address']}",
f"Transaction: https://etherscan.io/tx/{data['transaction_hash']}",
"",
]

# Add project info if available
if data.get("project"):
project = data["project"]
message.extend(
[
"Project Information:",
f"Name: {project['name']}",
(
f"Max Bounty: ${project['max_bounty']:,.2f}"
if project.get("max_bounty")
else "Max Bounty: Not specified"
),
f"Description: {project['description']}" if project.get("description") else "",
"",
]
)

# Format analysis results
analysis_message = agent.format_analysis(analysis)
message.append(analysis_message)

# Send notification
self.logger.info("Sending notification")
await self.telegram.send_message("\n".join(message))

return await agent.analyze(data)
except Exception as e:
self.logger.error(f"Error analyzing implementation: {str(e)}\nTraceback: {traceback.format_exc()}")
self.logger.error(f"Error analyzing implementation: {str(e)}")
return {"error": str(e)}


# Function to initialize and register the handler
Expand Down
8 changes: 0 additions & 8 deletions server.log

This file was deleted.

19 changes: 16 additions & 3 deletions src/handlers/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, List
from typing import Dict, Any, List, Optional
from enum import Enum, auto
from dataclasses import dataclass


@dataclass
class HandlerResult:
"""Result of a handler execution"""

success: bool = True
data: Optional[Dict[str, Any]] = None


class HandlerTrigger(Enum):
Expand Down Expand Up @@ -45,8 +54,12 @@ def get_triggers(cls) -> List[HandlerTrigger]:
"""Get list of triggers this handler listens for"""

@abstractmethod
def handle(self) -> None:
"""Handle an event"""
async def handle(self) -> Optional[HandlerResult]:
"""Handle an event
Returns:
Optional[HandlerResult]: Result of the handler execution, if any
"""

def set_context(self, context: Dict[str, Any], trigger: HandlerTrigger = None) -> None:
"""Set context and trigger for this handler"""
Expand Down
Loading

0 comments on commit ec889f9

Please sign in to comment.