Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions awsclilinter/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
include README.md
recursive-include awsclilinter *.py
recursive-exclude tests *
include LICENSE.txt
recursive-exclude examples *
3 changes: 2 additions & 1 deletion awsclilinter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ In interactive mode, you can:
- Press `y` to accept the current change
- Press `n` to skip the current change
- Press `u` to accept all remaining changes
- Press `s` to save and exit (applies all accepted changes so far)
- Press `s` to save and exit
- Press `q` to quit without saving

## Development

Expand Down
200 changes: 162 additions & 38 deletions awsclilinter/awsclilinter/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
import difflib
import sys
from pathlib import Path
from typing import List
from typing import Dict, List, Optional, Tuple

from awsclilinter.linter import ScriptLinter
from awsclilinter.rules import LintFinding
from ast_grep_py.ast_grep_py import SgRoot

from awsclilinter import linter
from awsclilinter.linter import parse
from awsclilinter.rules import LintFinding, LintRule
from awsclilinter.rules.base64_rule import Base64BinaryFormatRule
from awsclilinter.rules.pagination_rule import PaginationRule

# ANSI color codes
RED = "\033[31m"
Expand All @@ -18,13 +22,17 @@
CONTEXT_SIZE = 3


def get_user_choice(prompt: str) -> str:
def prompt_user_choice_interactive_mode() -> str:
"""Get user input for interactive mode."""
while True:
choice = input(prompt).lower().strip()
if choice in ["y", "n", "u", "s"]:
choice = (
input("\nApply this fix? [y]es, [n]o, [u]pdate all, [s]ave and exit, [q]uit: ")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For accessibility, we should make the letter to select separate from the word that describes. A screen reader would not be able to read this well. How about:

Suggested change
input("\nApply this fix? [y]es, [n]o, [u]pdate all, [s]ave and exit, [q]uit: ")
input("\nApply this fix? [y] yes, [n] no, [u] update all, [s] save and exit, [q] quit: ")

.lower()
.strip()
)
if choice in ["y", "n", "u", "s", "q"]:
return choice
print("Invalid choice. Please enter y, n, u, or s.")
print("Invalid choice. Please enter y, n, u, s, or q.")


def display_finding(finding: LintFinding, index: int, total: int, script_content: str):
Expand Down Expand Up @@ -75,22 +83,89 @@ def display_finding(finding: LintFinding, index: int, total: int, script_content
print(line, end="")


def interactive_mode(findings: List[LintFinding], script_content: str) -> List[LintFinding]:
"""Run interactive mode and return accepted findings."""
accepted = []
for i, finding in enumerate(findings, 1):
display_finding(finding, i, len(findings), script_content)
choice = get_user_choice("\nApply this fix? [y]es, [n]o, [u]pdate all, [s]ave and exit: ")
def apply_all_fixes(
findings_with_rules: List[Tuple[LintFinding, LintRule]],
ast: SgRoot,
) -> str:
"""Apply all fixes using rule-by-rule processing.

Since multiple rules can target the same command, we must process one rule
at a time and re-parse the updated script between rules to get fresh Edit objects.

Args:
findings_with_rules: List of findings and their rules.
ast: Current script represented as an AST.

Returns:
Modified script content
"""
current_ast = ast

# Group findings by rule
findings_by_rule: Dict[str, List[LintFinding]] = {}
for finding, rule in findings_with_rules:
if rule.name not in findings_by_rule:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could use collections.defaultdict to reduce checking here, not a strong preference.

findings_by_rule[rule.name] = []
findings_by_rule[rule.name].append(finding)

# Process one rule at a time, re-parsing between rules
for rule in findings_by_rule:
updated_script = linter.apply_fixes(current_ast, findings_by_rule[rule])
current_ast = parse(updated_script)
return current_ast.root().text()


if choice == "y":
accepted.append(finding)
elif choice == "u":
accepted.extend(findings[i - 1 :])
break
elif choice == "s":
break
def interactive_mode_for_rule(
findings: List[LintFinding],
ast: SgRoot,
finding_offset: int,
total_findings: int,
) -> Tuple[SgRoot, bool, Optional[str]]:
"""Run interactive mode for a single rule's findings.

return accepted
Args:
findings: List of findings for this rule.
ast: Current script content, represented as an AST.
finding_offset: Offset for display numbering.
total_findings: Total number of findings across all rules.

Returns:
Tuple of (ast, changes_made, last_choice)
ast is the resulting AST from this interactive mode execution.
changes_made whether the AST was updated based on user choice.
last_choice is the last choice entered by the user.
"""
accepted_findings: List[LintFinding] = []
last_choice: Optional[str] = None

for i, finding in enumerate(findings):
display_finding(finding, finding_offset + i + 1, total_findings, ast.root().text())
last_choice = prompt_user_choice_interactive_mode()

if last_choice == "y":
accepted_findings.append(finding)
elif last_choice == "n":
pass # Skip this finding
elif last_choice == "u":
# Accept this and all remaining findings for this rule.
accepted_findings.extend(findings[i:])
if accepted_findings:
ast = parse(linter.apply_fixes(ast, accepted_findings))
return ast, True, last_choice
elif last_choice == "s":
# Apply accepted findings and stop processing
if accepted_findings:
ast = parse(linter.apply_fixes(ast, accepted_findings))
return ast, len(accepted_findings) > 0, last_choice
elif last_choice == "q":
print("Quit without saving.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
print("Quit without saving.")
print("Quitting without saving.")

sys.exit(0)

if accepted_findings:
ast = parse(linter.apply_fixes(ast, accepted_findings))
return ast, True, last_choice

return ast, False, last_choice


def main():
Expand Down Expand Up @@ -127,31 +202,80 @@ def main():

script_content = script_path.read_text()

rules = [Base64BinaryFormatRule()]
linter = ScriptLinter(rules)
findings = linter.lint(script_content)

if not findings:
print("No issues found.")
return
rules = [Base64BinaryFormatRule(), PaginationRule()]

if args.interactive:
findings = interactive_mode(findings, script_content)
if not findings:
# Do an initial parse-and-lint with all the rules simultaneously to compute the total
# number of findings for displaying progress in interactive mode.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the value of displaying progress here? It seems a bit unnecessary to completely run through the entire set of possible changes only to do it again without just saving the diffs of what would have been changed and then accepting/denying them in the later loop. That would increase complexity of the solution, and may not matter for performance (scripts might not be that big).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also be fine with a summary at the end that said how many changes could have been applied and how many were.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I was second-guessing this myself. I figured users may want to know how many findings are left especially if the script is large. However, that may not be super valuable, can always add it back if users request it.

Will remove this in next revision.

current_ast = parse(script_content)
findings_with_rules = linter.lint(current_ast, rules)

if not findings_with_rules:
print("No issues found.")
return

# Process one rule at a time, re-parsing between rules
current_script = script_content
any_changes = False
finding_offset = 0

# Calculate total findings for display
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above, but this should probably go above the comment "# Process one rule at a time" for clarity of where the logic starts.

total_findings = len(findings_with_rules)

for rule_index, rule in enumerate(rules):
# Lint for this specific rule with current script state
rule_findings = linter.lint_for_rule(current_ast, rule)

if not rule_findings:
continue

current_ast, changes_made, last_choice = interactive_mode_for_rule(
rule_findings, current_ast, finding_offset, total_findings
)

if changes_made:
current_script = current_ast.root().text()
any_changes = True

finding_offset += len(rule_findings)

if last_choice == "s":
break

# If user chose 'u', auto-apply all remaining rules
if last_choice == "u":
for remaining_rule in rules[rule_index + 1 :]:
remaining_findings = linter.lint_for_rule(current_ast, remaining_rule)
if remaining_findings:
current_script = linter.apply_fixes(current_ast, remaining_findings)
any_changes = True
break

if not any_changes:
print("No changes accepted.")
return

if args.fix or args.output or args.interactive:
# Interactive mode is functionally equivalent to --fix, except the user
# can select a subset of the changes to apply.
fixed_content = linter.apply_fixes(script_content, findings)
output_path = Path(args.output) if args.output else script_path
output_path.write_text(fixed_content)
output_path.write_text(current_script)
print(f"Fixed script written to: {output_path}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
print(f"Fixed script written to: {output_path}")
print(f"Modified script written to: {output_path}")

elif args.fix or args.output:
current_ast = parse(script_content)
findings_with_rules = linter.lint(current_ast, rules)
updated_script = apply_all_fixes(findings_with_rules, current_ast)
output_path = Path(args.output) if args.output else script_path
output_path.write_text(updated_script)
print(f"Fixed script written to: {output_path}")
else:
print(f"\nFound {len(findings)} issue(s):\n")
for i, finding in enumerate(findings, 1):
display_finding(finding, i, len(findings), script_content)
current_ast = parse(script_content)
findings_with_rules = linter.lint(current_ast, rules)

if not findings_with_rules:
print("No issues found.")
return

print(f"\nFound {len(findings_with_rules)} issue(s):\n")
for i, (finding, _) in enumerate(findings_with_rules, 1):
display_finding(finding, i, len(findings_with_rules), script_content)
print("\n\nRun with --fix to apply changes or --interactive to review each change.")


Expand Down
63 changes: 45 additions & 18 deletions awsclilinter/awsclilinter/linter.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,53 @@
from typing import List
from typing import List, Tuple

from ast_grep_py import SgRoot
from ast_grep_py.ast_grep_py import SgRoot

from awsclilinter.rules import LintFinding, LintRule


class ScriptLinter:
"""Linter for bash scripts to detect AWS CLI v1 to v2 migration issues."""
def parse(script_content: str) -> SgRoot:
"""Parse the bash script content into an AST."""
return SgRoot(script_content, "bash")

def __init__(self, rules: List[LintRule]):
self.rules = rules

def lint(self, script_content: str) -> List[LintFinding]:
"""Lint the script and return all findings."""
root = SgRoot(script_content, "bash")
findings = []
for rule in self.rules:
findings.extend(rule.check(root))
return sorted(findings, key=lambda f: (f.line_start, f.line_end))
def lint(ast: SgRoot, rules: List[LintRule]) -> List[Tuple[LintFinding, LintRule]]:
"""Lint the AST and return all findings with their associated rules."""
findings_with_rules = []
for rule in rules:
findings = rule.check(ast)
for finding in findings:
findings_with_rules.append((finding, rule))
return sorted(findings_with_rules, key=lambda fr: (fr[0].edit.start_pos, fr[0].edit.end_pos))

def apply_fixes(self, script_content: str, findings: List[LintFinding]) -> str:
"""Apply fixes to the script content."""
root = SgRoot(script_content, "bash")
node = root.root()
return node.commit_edits([f.edit for f in findings])

def lint_for_rule(ast: SgRoot, rule: LintRule) -> List[LintFinding]:
"""Lint the script for a single rule.

Args:
ast: The AST to lint for the rule.
rule: The rule to check.

Returns:
List of findings for this rule, sorted by position (ascending)
"""
findings = rule.check(ast)
return sorted(findings, key=lambda f: (f.edit.start_pos, f.edit.end_pos))


def apply_fixes(ast: SgRoot, findings: List[LintFinding]) -> str:
"""Apply to the AST for a single rule.

Args:
ast: The AST representation of the script to apply fixes to.
findings: List of findings from a single rule to apply.

Returns:
Modified script content
"""
root = ast.root()
if not findings:
return root.text()

# Collect all edits - they should be non-overlapping within a single rule
edits = [f.edit for f in findings]
return root.commit_edits(edits)
6 changes: 5 additions & 1 deletion awsclilinter/awsclilinter/rules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,9 @@ def description(self) -> str:

@abstractmethod
def check(self, root: SgRoot) -> List[LintFinding]:
"""Check the AST root for violations and return findings."""
"""Check the AST root for violations and return findings.

Args:
root: The AST root to check
"""
pass
8 changes: 3 additions & 5 deletions awsclilinter/awsclilinter/rules/base64_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@


class Base64BinaryFormatRule(LintRule):
"""Detects AWS CLI commands with file:// that need --cli-binary-format. This is a best-effort
attempt at statically detecting the breaking change with how AWS CLI v2 treats binary
parameters."""
"""Detects any AWS CLI command that does not specify the --cli-binary-format. This mitigates
the breaking change with how AWS CLI v2 treats binary parameters."""

@property
def name(self) -> str:
Expand All @@ -23,13 +22,12 @@ def description(self) -> str:
)

def check(self, root: SgRoot) -> List[LintFinding]:
"""Check for AWS CLI commands with file:// missing --cli-binary-format."""
"""Check for AWS CLI commands missing --cli-binary-format."""
node = root.root()
base64_broken_nodes = node.find_all(
all=[
{"kind": "command"},
{"pattern": "aws $SERVICE $OPERATION $$$ARGS"},
{"has": {"kind": "word", "regex": r"\Afile://"}},
{"not": {"has": {"kind": "word", "pattern": "--cli-binary-format"}}},
]
)
Expand Down
Loading