PR for fdt622 #701

Open
wants to merge 6 commits into main
210 changes: 209 additions & 1 deletion agent/crash_analyzer.py
@@ -1,8 +1,216 @@
"""An LLM agent to analyze a fuzz target's runtime crash and provide insight.
Use it as a regular module locally, or as a script in cloud builds.
"""
import os
import random
import shutil
import time
from typing import Optional

import logger
from agent.base_agent import BaseAgent
from experiment import evaluator as evaluator_lib
from experiment import oss_fuzz_checkout
from llm_toolkit.prompt_builder import DefaultTemplateBuilder
from llm_toolkit.prompts import Prompt
from results import CrashResult, Result, RunResult
from tool.lldb_tool import LLDBTool

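# Hard cap on agent<->LLM interaction rounds before the analysis loop gives up.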
MAX_ROUND = 100


class CrashAnalyzer(BaseAgent):
pass
"""The agent to analyze a runtime crash and provide insight on the fuzz target."""

def _initial_prompt(self, results: list[Result]) -> Prompt:
"""Constructs initial prompt of the agent."""
last_result = results[-1]

if isinstance(last_result, RunResult):
default_prompt_builder = DefaultTemplateBuilder(
model=self.llm, benchmark=last_result.benchmark)
prompt = default_prompt_builder.build_triager_prompt(
last_result.benchmark, last_result.fuzz_target_source,
last_result.run_error, last_result.crash_func)
return prompt

logger.error("Expected a RunResult object in results list")
return DefaultTemplateBuilder(self.llm).build([])

def _create_ossfuzz_project_with_lldb(self,
name: str,
target_file: str,
run_result: RunResult,
build_script_path: str = '') -> str:
"""Creates a new OSS-Fuzz project |name| with the generated fuzz target.
The new project replicates the benchmark's existing project but modifies
its Dockerfile to build with -g and install LLDB."""
logger.info('target file: %s', target_file)
generated_project_path = os.path.join(oss_fuzz_checkout.OSS_FUZZ_DIR,
'projects', name)
if os.path.exists(generated_project_path):
logger.info('Project %s already exists.', generated_project_path)
return name

existing_project_path = os.path.join(oss_fuzz_checkout.OSS_FUZZ_DIR,
'projects',
run_result.benchmark.project)

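# Start from a copy of the benchmark's existing OSS-Fuzz project.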
shutil.copytree(existing_project_path, generated_project_path)

# Copy the generated fuzz target to generated_project_path
shutil.copyfile(
target_file,
os.path.join(generated_project_path, os.path.basename(target_file)))

if not build_script_path or os.path.getsize(build_script_path) == 0:
# Add statements to the Dockerfile to enable -g and install lldb.
with open(os.path.join(generated_project_path, 'Dockerfile'), 'a') as f:
f.write(f'\nENV FUZZING_LANGUAGE={run_result.benchmark.language}\n'
'\nRUN sed -i.bak \'1i export CFLAGS="${CFLAGS} -g"\' '
'/src/build.sh\n'
'\nRUN apt-get update && apt-get install -y lldb\n')
return name

# Copy generated build script to generated_project_path
shutil.copyfile(
build_script_path,
os.path.join(generated_project_path, 'agent-build.sh'))

# Add statements to the Dockerfile to overwrite build.sh with the generated
# build script, enable -g, and install lldb.
with open(os.path.join(generated_project_path, 'Dockerfile'), 'a') as f:
f.write(
'\nCOPY agent-build.sh /src/build.sh\n'
f'\nENV FUZZING_LANGUAGE={run_result.benchmark.language}\n'
'\nRUN sed -i.bak \'1i export CFLAGS="${CFLAGS} -g"\' /src/build.sh\n'
'\nRUN apt-get update && apt-get install -y lldb\n')

return name

def _sleep_random_duration(self, min_sec: int = 1, max_sec: int = 60) -> None:
"""Sleeps for a random duration between min_sec and max_sec. The agent uses
this to avoid exceeding quota limits (e.g., LLM query frequency)."""
duration = random.randint(min_sec, max_sec)
logger.debug('Sleeping for %d seconds before the next query', duration)
time.sleep(duration)

def _handle_conclusion(self, cur_round: int, response: str,
crash_result: CrashResult):
"""Parses LLM conclusion, analysis and suggestion."""
logger.info('----- ROUND %02d Received conclusion -----', cur_round)

conclusion = self._parse_tag(response, 'conclusion')
if conclusion == 'Crash is caused by bug in fuzz driver.':
crash_result.true_bug = False
elif conclusion == 'Crash is caused by bug in project.':
crash_result.true_bug = True
else:
logger.error('***** Failed to match conclusion in round %02d *****',
cur_round)

crash_result.insight = self._parse_tag(response, 'analysis and suggestion')
if not crash_result.insight:
logger.error('Round %02d No analysis and suggestion in conclusion: %s',
cur_round, response)

def _container_tool_reaction(self, cur_round: int, response: str,
crash_result: CrashResult) -> Optional[Prompt]:
"""Validates LLM conclusion or executes its command."""
if self._parse_tag(response, 'conclusion'):
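# _handle_conclusion returns None, which ends the agent loop in execute().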
return self._handle_conclusion(cur_round, response, crash_result)
return self._container_handle_bash_command(cur_round, response,
self.analyze_tool)

def execute(self, result_history: list[Result]) -> CrashResult:
"""Executes the agent based on previous run result."""
logger.info('Executing Crash Analyzer')
last_result = result_history[-1]
benchmark = last_result.benchmark
trial = last_result.trial
if isinstance(last_result, RunResult):
generated_target_name = os.path.basename(benchmark.target_path)
sample_id = os.path.splitext(generated_target_name)[0]
generated_oss_fuzz_project = (
f'{benchmark.id}-{sample_id}-{trial:02d}-lldb')
generated_oss_fuzz_project = evaluator_lib.rectify_docker_tag(
generated_oss_fuzz_project)

fuzz_target_path = os.path.join(last_result.work_dirs.fuzz_targets,
f'{trial:02d}.fuzz_target')
build_script_path = os.path.join(last_result.work_dirs.fuzz_targets,
f'{trial:02d}.build_script')

self._create_ossfuzz_project_with_lldb(generated_oss_fuzz_project,
fuzz_target_path, last_result,
build_script_path)

self.analyze_tool = LLDBTool(
benchmark,
name='lldb',
project=generated_oss_fuzz_project,
result=last_result,
)
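# Compile the LLDB-enabled project inside the container before the interactive session.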
self.analyze_tool.execute('compile > /dev/null')
prompt = self._initial_prompt(result_history)
prompt.add_problem(self.analyze_tool.tutorial())
crash_result = CrashResult(
benchmark=benchmark,
trial=trial,
work_dirs=last_result.work_dirs,
compiles=last_result.compiles,
compile_error=last_result.compile_error,
compile_log=last_result.compile_log,
crashes=last_result.crashes,
run_error=last_result.run_error,
crash_func=last_result.crash_func,
run_log=last_result.run_log,
coverage_summary=last_result.coverage_summary,
coverage=last_result.coverage,
line_coverage_diff=last_result.line_coverage_diff,
textcov_diff=last_result.textcov_diff,
reproducer_path=last_result.reproducer_path,
artifact_path=last_result.artifact_path,
artifact_name=last_result.artifact_name,
sanitizer=last_result.sanitizer,
log_path=last_result.log_path,
corpus_path=last_result.corpus_path,
coverage_report_path=last_result.coverage_report_path,
cov_pcs=last_result.cov_pcs,
total_pcs=last_result.total_pcs,
fuzz_target_source=last_result.fuzz_target_source,
build_script_source=last_result.build_script_source,
author=self,
chat_history=last_result.chat_history)
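# The agent loop below fills in crash_result.true_bug and crash_result.insight.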
cur_round = 1
try:
client = self.llm.get_chat_client(model=self.llm.get_model())
while prompt and cur_round < MAX_ROUND:
logger.info('CrashAnalyzer ROUND %02d agent prompt: %s', cur_round,
prompt.get())
response = self.llm.chat_llm(client=client, prompt=prompt)
logger.debug('CrashAnalyzer ROUND %02d LLM response: %s', cur_round,
response)
prompt = self._container_tool_reaction(cur_round, response,
crash_result)
cur_round += 1
self._sleep_random_duration()
finally:
# Cleanup: stop the container
logger.debug('Stopping the crash analyzer container %s',
self.analyze_tool.container_id)
self.analyze_tool.terminate()

return crash_result

logger.error("Expected a RunResult object in results list")
crash_result = CrashResult(
benchmark=benchmark,
trial=trial,
work_dirs=last_result.work_dirs,
fuzz_target_source=last_result.fuzz_target_source,
build_script_source=last_result.build_script_source,
author=self,
chat_history=last_result.chat_history)
return crash_result
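
Reviewer note: below is a minimal, self-contained sketch of the conclusion format _handle_conclusion expects and how it maps to CrashResult.true_bug and insight. The regex-based parse_tag helper is an assumption standing in for BaseAgent._parse_tag, not the project's actual parser.

import re

def parse_tag(text: str, tag: str) -> str:
  # Assumed behaviour: return the stripped content of <tag>...</tag>, or '' if absent.
  match = re.search(rf'<{tag}>(.*?)</{tag}>', text, re.DOTALL)
  return match.group(1).strip() if match else ''

# Illustrative LLM response; the <conclusion> wording must exactly match one of
# the two sentences checked in _handle_conclusion.
response = '''
<conclusion>Crash is caused by bug in project.</conclusion>
<analysis and suggestion>
Heap-buffer-overflow in the header parser: the length field is used without
bounds checking. Validate it against the remaining input size before copying.
</analysis and suggestion>
'''

true_bug = parse_tag(response, 'conclusion') == 'Crash is caused by bug in project.'
insight = parse_tag(response, 'analysis and suggestion')
print(true_bug)  # True
print(insight)
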
9 changes: 7 additions & 2 deletions agent/prototyper.py
@@ -204,7 +204,7 @@ def execute(self, result_history: list[Result]) -> BuildResult:
self.inspect_tool = ProjectContainerTool(benchmark, name='inspect')
self.inspect_tool.compile(extra_commands=' && rm -rf /out/* > /dev/null')
cur_round = 1
prompt.append(self.inspect_tool.tutorial())
prompt.add_problem(self.inspect_tool.tutorial())
build_result = BuildResult(benchmark=benchmark,
trial=last_result.trial,
work_dirs=last_result.work_dirs,
@@ -213,7 +213,11 @@ def execute(self, result_history: list[Result]) -> BuildResult:
try:
client = self.llm.get_chat_client(model=self.llm.get_model())
while prompt and cur_round < MAX_ROUND:
response = self.chat_llm(cur_round, client=client, prompt=prompt)
logger.info('Prototyper ROUND %02d agent prompt: %s', cur_round,
prompt.get())
response = self.llm.chat_llm(client=client, prompt=prompt)
logger.debug('Prototyper ROUND %02d LLM response: %s', cur_round,
response)
prompt = self._container_tool_reaction(cur_round, response,
build_result)
cur_round += 1
@@ -222,4 +226,5 @@ def execute(self, result_history: list[Result]) -> BuildResult:
logger.debug('Stopping and removing the inspect container %s',
self.inspect_tool.container_id)
self.inspect_tool.terminate()

return build_result
2 changes: 1 addition & 1 deletion common/cloud_builder.py
@@ -301,7 +301,7 @@ def run(self, agent: BaseAgent, result_history: list[Result],

cloud_build_log += self._get_build_log(build_id)

# Step 4: Deserialize dill'd file.
# Step 5: Deserialize dill'd file.
result = utils.deserialize_from_dill(new_result_dill)
if not result:
cloud_build_log += f'Failed to deserialize from dill {new_result_dill}.\n'