Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/pr_af/hitl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
ACTION_POST,
ACTION_REJECT,
ACTION_RERUN,
HAX_REVIEW_TEMPLATE,
ReviewDecision,
build_review_form,
build_review_payload,
clean_intent,
parse_review_decision,
request_review_approval,
)
Expand All @@ -27,10 +29,12 @@
"ACTION_POST",
"ACTION_REJECT",
"ACTION_RERUN",
"HAX_REVIEW_TEMPLATE",
"ReviewDecision",
"approval_webhook_url",
"build_hax_client_from_env",
"build_review_form",
"build_review_payload",
"clean_intent",
"create_hax_form_request_with_timeout",
"extract_values_from_raw",
"parse_review_decision",
Expand Down
14 changes: 9 additions & 5 deletions src/pr_af/hitl/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ async def create_hax_form_request_with_timeout(
*,
app: Any,
hax_client: HaxClient,
form: Any,
payload: dict[str, Any],
request_type: str,
title: str,
description: str | None,
expires_in_seconds: int,
Expand All @@ -72,21 +73,24 @@ async def create_hax_form_request_with_timeout(
metadata: dict[str, Any] | None,
timeout_seconds: float = HAX_CREATE_REQUEST_TIMEOUT_SECONDS,
) -> Any:
"""Submit a hax form-builder request with a hard timeout.
"""Submit a hax request of ``request_type`` with a hard timeout.

Runs the synchronous ``hax_client.create_request`` in a worker thread under
``asyncio.wait_for`` so a wedged hax-sdk fails fast (``RuntimeError``)
instead of silently burning the reasoner's active-time budget. Returns the
``CreatedRequest``; the caller passes ``.id`` / ``.url`` to ``app.pause``.

``payload`` must satisfy the registered hax template's schema — the hax
service validates it server-side and rejects unknown ``request_type`` values.
"""
app.note(
f"hitl: submitting hax form request ({title!r})",
f"hitl: submitting hax request ({request_type}: {title!r})",
tags=["hitl", "hax", "create_request"],
)

kwargs: dict[str, Any] = {
"type": "form-builder",
"payload": form.to_payload(),
"type": request_type,
"payload": payload,
"title": title,
"expires_in_seconds": expires_in_seconds,
}
Expand Down
200 changes: 101 additions & 99 deletions src/pr_af/hitl/review_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import annotations

import re
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

Expand All @@ -28,18 +29,26 @@
from ..schemas.output import ScoredFinding


# Action chosen by the reviewer in the form's "action" radio.
# The hax template this gate renders into. Registered in hax-sdk under
# src/templates/pr-af-review (id "pr-af-review-v1"). The template emits the same
# ``{template, values: {...}}`` response envelope form-builder uses, so the
# decision parser below is template-agnostic.
HAX_REVIEW_TEMPLATE = "pr-af-review-v1"

# Action chosen by the reviewer in the template's action buttons. These string
# values are the contract with the hax template's response `values.action`.
ACTION_POST = "post_selected"
ACTION_RERUN = "rerun"
ACTION_REJECT = "reject"
_VALID_ACTIONS = {ACTION_POST, ACTION_RERUN, ACTION_REJECT}

_SEVERITY_EMOJI = {
"critical": "🔴",
"important": "🟠",
"suggestion": "🔵",
"nitpick": "⚪",
}
# Longest PR-intent blurb shown in the form. The raw PR body (e.g. a Dependabot
# changelog) is stripped of HTML and truncated to this so the form stays legible.
_MAX_INTENT_CHARS = 700

_HTML_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"[ \t]+")
_BLANKLINES_RE = re.compile(r"\n{3,}")


@dataclass
Expand All @@ -65,101 +74,91 @@ def is_reject(self) -> bool:
return self.action == ACTION_REJECT


def _finding_option(finding: ScoredFinding) -> dict[str, str]:
"""One checkbox option per finding: ``id`` is the value, label is human."""
emoji = _SEVERITY_EMOJI.get(finding.severity, "•")
loc = finding.file_path or "(no file)"
if finding.line_start and finding.line_start > 0:
loc = f"{loc}:{finding.line_start}"
return {
"value": finding.id,
"label": f"{emoji} [{finding.severity}] {loc} — {finding.title}",
}


def _build_description(
pr_intent: str,
findings: list[ScoredFinding],
revision_iter: int,
revision_history: list[str],
) -> str:
"""Markdown blurb shown above the form: PR intent + what's being asked."""
counts: dict[str, int] = {}
for f in findings:
counts[f.severity] = counts.get(f.severity, 0) + 1
count_str = ", ".join(f"{n} {sev}" for sev, n in counts.items()) or "no findings"
def clean_intent(text: str | None, max_chars: int = _MAX_INTENT_CHARS) -> str:
"""Turn a raw PR body into a short, legible intent blurb.

lines = []
if pr_intent:
lines.append("**PR intent:** " + pr_intent.strip())
lines.append("")
lines.append(
f"PR-AF found **{len(findings)}** finding(s) ({count_str}). "
"Check the ones to post, or request a re-review with instructions."
)
if revision_iter > 0:
lines.append("")
lines.append(f"_Revision round {revision_iter}._")
if revision_history:
lines.append("")
lines.append("Prior instructions:")
for idx, instr in enumerate(revision_history, start=1):
if instr:
lines.append(f"{idx}. {instr}")
return "\n".join(lines)


def build_review_form(
PR bodies (especially bot-authored ones like Dependabot) are often a wall of
HTML — ``<details>``, ``<a href>``, changelog tables. The hax template
renders the intent as markdown, where raw HTML shows up literally, so we
strip tags, collapse whitespace, and truncate.
"""
if not text:
return ""
stripped = _HTML_TAG_RE.sub(" ", text)
stripped = _WS_RE.sub(" ", stripped)
stripped = _BLANKLINES_RE.sub("\n\n", stripped)
stripped = "\n".join(line.strip() for line in stripped.splitlines())
stripped = stripped.strip()
if len(stripped) > max_chars:
stripped = stripped[:max_chars].rstrip() + "…"
return stripped


def _finding_payload(finding: ScoredFinding) -> dict[str, Any]:
"""One finding entry for the hax template (camelCase per its zod schema)."""
entry: dict[str, Any] = {
"id": finding.id,
"severity": finding.severity,
"title": finding.title,
# All findings start checked, matching the prior "submit posts all" default.
"defaultSelected": True,
}
if finding.file_path:
entry["filePath"] = finding.file_path
if finding.line_start and finding.line_start > 0:
entry["lineStart"] = finding.line_start
if finding.line_end and finding.line_end > 0:
entry["lineEnd"] = finding.line_end
if finding.body:
entry["body"] = finding.body
if finding.suggestion:
entry["suggestion"] = finding.suggestion
if finding.dimension_name:
entry["dimension"] = finding.dimension_name
if finding.confidence is not None:
entry["confidence"] = finding.confidence
return entry


def build_review_payload(
*,
pr_intent: str,
findings: list[ScoredFinding],
title: str,
pr_meta: dict[str, Any] | None = None,
revision_iter: int = 0,
revision_history: list[str] | None = None,
) -> Any:
"""Translate the review into a ``hax.FormBuilder`` (imported lazily)."""
from hax import FormBuilder

all_ids = [f.id for f in findings]
options = [_finding_option(f) for f in findings]

form = (
FormBuilder()
.title(title)
.description(
_build_description(pr_intent, findings, revision_iter, revision_history or [])
)
.submit_label("Submit decision")
)
) -> dict[str, Any]:
"""Build the ``pr-af-review-v1`` request payload (validated server-side).

# checkbox_group requires at least one option; skip it when there are no
# findings (an empty review still lets the reviewer approve/reject).
if options:
form.checkbox_group(
"findings_to_post",
label="Findings to post",
description="Only the checked findings are posted to the PR.",
options=options,
default_value=all_ids,
)
Shape matches ``prAfReviewPayloadSchema`` in the hax-sdk template.
"""
counts: dict[str, int] = {}
for f in findings:
counts[f.severity] = counts.get(f.severity, 0) + 1
count_str = ", ".join(f"{n} {sev}" for sev, n in counts.items()) or "no findings"

form.radio_group(
"action",
label="Action",
options=[
{"value": ACTION_POST, "label": "Post selected findings"},
{"value": ACTION_RERUN, "label": "Re-review with instructions below"},
{"value": ACTION_REJECT, "label": "Reject — post nothing"},
],
default_value=ACTION_POST,
)
form.textarea(
"instructions",
label="Re-review instructions (optional)",
description="Used when action is 'Re-review'. E.g. 'too aggressive, tone it down'.",
required=False,
)
return form
payload: dict[str, Any] = {
"title": title,
"intent": clean_intent(pr_intent),
"reviewSummary": f"PR-AF found {len(findings)} finding(s) ({count_str}).",
"findings": [_finding_payload(f) for f in findings],
"postLabel": "Post selected",
"rerunLabel": "Re-review with instructions",
"rejectLabel": "Reject",
"instructionsPlaceholder": "e.g. too aggressive, tone it down and drop the nitpicks",
}
if pr_meta:
# Drop empties so optional zod fields stay absent rather than "".
cleaned = {k: v for k, v in pr_meta.items() if v not in (None, "")}
if cleaned:
payload["pr"] = cleaned
if revision_iter > 0 or revision_history:
payload["revision"] = {
"iteration": revision_iter,
"priorInstructions": [i for i in (revision_history or []) if i and i.strip()],
}
return payload


def _coerce_str(value: Any) -> str:
Expand Down Expand Up @@ -229,11 +228,12 @@ async def request_review_approval(
webhook_url: str | None,
user_id: str | None,
expires_in_hours: int,
pr_meta: dict[str, Any] | None = None,
revision_iter: int = 0,
revision_history: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> ReviewDecision:
"""Build the form, create the hax request, pause, return the decision.
"""Build the payload, create the hax request, pause, return the decision.

Any failure to create the request or pause is surfaced as a reject so the
pipeline never posts an unreviewed review when the gate is enabled.
Expand All @@ -245,25 +245,27 @@ async def request_review_approval(
title = f"{title} — {pr_label}"

try:
form = build_review_form(
payload = build_review_payload(
pr_intent=pr_intent,
findings=findings,
title=title,
pr_meta=pr_meta,
revision_iter=revision_iter,
revision_history=revision_history,
)
except Exception as exc:
app.note(
f"hitl: failed to build review form: {exc}",
tags=["hitl", "form", "error"],
f"hitl: failed to build review payload: {exc}",
tags=["hitl", "payload", "error"],
)
return ReviewDecision(action=ACTION_REJECT, instructions=f"form build failed: {exc}")
return ReviewDecision(action=ACTION_REJECT, instructions=f"payload build failed: {exc}")

try:
created = await create_hax_form_request_with_timeout(
app=app,
hax_client=hax_client,
form=form,
payload=payload,
request_type=HAX_REVIEW_TEMPLATE,
title=title,
description=None,
expires_in_seconds=expires_in_hours * 3600,
Expand Down
33 changes: 33 additions & 0 deletions src/pr_af/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,23 @@ async def run(self) -> ReviewResult:
print("[PR-AF] Phase 8: OUTPUT (direct post)", flush=True)
return await self._finish(scored_findings, intake, anatomy, plan, post=True)

# Nothing to triage: don't bother a human (and don't auto-post an
# empty "approved" review to a public repo). Complete silently.
if not scored_findings:
self.app.note(
"hitl: no findings — skipping review gate, posting nothing",
tags=["hitl", "no-post", "no-findings"],
)
print("[PR-AF] HITL: no findings — skipping gate, not posting", flush=True)
return await self._finish(scored_findings, intake, anatomy, plan, post=False)

decision = await request_review_approval(
app=self.app,
hax_client=hax_client,
pr_intent=intake.pr_summary,
findings=scored_findings,
pr_label=self._pr_label(),
pr_meta=self._pr_meta(),
webhook_url=approval_webhook_url(self.app),
user_id=self.config.hitl.approval_user_id,
expires_in_hours=self.config.hitl.approval_expires_in_hours,
Expand Down Expand Up @@ -273,6 +284,28 @@ def _pr_label(self) -> str:
return f"{self.pr_data.owner}/{self.pr_data.repo}#{self.pr_data.number}"
return ""

def _pr_meta(self) -> dict[str, Any]:
"""PR metadata block for the hax review template (camelCase keys)."""
pr = self.pr_data
if not pr:
return {}
url = self.input.pr_url or ""
if not url and pr.owner and pr.repo and pr.number:
url = f"https://github.com/{pr.owner}/{pr.repo}/pull/{pr.number}"
repo = f"{pr.owner}/{pr.repo}" if pr.owner and pr.repo else (pr.repo or "")
meta: dict[str, Any] = {
"title": pr.title or "",
"number": pr.number or None,
"url": url,
"repo": repo,
"author": pr.author or "",
}
if pr.changed_files:
meta["filesChangedCount"] = len(pr.changed_files)
meta["additionsCount"] = sum(f.additions for f in pr.changed_files)
meta["deletionsCount"] = sum(f.deletions for f in pr.changed_files)
return meta

def _merge_feedback(self, revision_history: list[str]) -> str:
"""Collapse accumulated reviewer instructions into one guidance string."""
items = [instr.strip() for instr in revision_history if instr and instr.strip()]
Expand Down
Loading
Loading