Skip to content

Commit d749cb2

Browse files
feat: add black8 linter
1 parent 637a1a8 commit d749cb2

File tree

17 files changed

+639
-0
lines changed

17 files changed

+639
-0
lines changed

flake8-durable-execution/README.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# flake8-durable-execution
2+
3+
Flake8 plugin for AWS Durable Execution SDK best practices.
4+
5+
## Installation
6+
7+
```bash
8+
pip install flake8-durable-execution
9+
```
10+
11+
## Usage
12+
13+
```bash
14+
flake8 your_code.py
15+
```
16+
17+
The plugin is automatically detected by Flake8.
18+
19+
## Rules
20+
21+
| Code | Description |
22+
|------|-------------|
23+
| DAR001 | Avoid random calls in durable functions - results differ on replay |
24+
| DAR002 | Avoid datetime.now()/today() in durable functions - time differs on replay |
25+
| DAR003 | Avoid uuid1()/uuid4() in durable functions - generates different values on replay |
26+
| DAR004 | Avoid time.time()/monotonic() in durable functions - time differs on replay |
27+
| DAR005 | Avoid os.environ access in durable functions - environment may differ on replay |
28+
| DAR006 | Wrap network calls in ctx.step() - responses may differ on replay |
29+
| DAR007 | Durable operations cannot be called inside @durable_step |
30+
| DAR008 | Avoid mutating closure variables in steps - mutations are lost on replay |
31+
32+
## Why?
33+
34+
Durable functions may be replayed multiple times. Non-deterministic operations like `random.random()`, `datetime.now()`, or `uuid.uuid4()` will produce different values on each replay, causing unexpected behavior.
35+
36+
Instead, use durable steps to checkpoint these values so they remain consistent across replays.
37+
38+
## Examples
39+
40+
### Bad
41+
42+
```python
43+
@durable_execution
44+
def handler(event, ctx):
45+
# DAR001: random value changes on replay
46+
value = random.randint(1, 100)
47+
48+
# DAR002: time changes on replay
49+
now = datetime.now()
50+
51+
# DAR006: network response may differ
52+
response = requests.get("https://api.example.com")
53+
```
54+
55+
### Good
56+
57+
```python
58+
@durable_execution
59+
def handler(event, ctx):
60+
# Checkpoint non-deterministic values
61+
value = ctx.step(lambda: random.randint(1, 100))
62+
now = ctx.step(lambda: datetime.now().isoformat())
63+
response = ctx.step(lambda: requests.get("https://api.example.com").json())
64+
```
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"""Example file with bad patterns inside @durable_execution for testing."""
2+
3+
import os
4+
import random
5+
import time
6+
import uuid
7+
from datetime import datetime
8+
9+
import requests
10+
11+
from aws_durable_execution_sdk_python import (
12+
DurableContext,
13+
StepContext,
14+
durable_execution,
15+
durable_step,
16+
)
17+
18+
19+
# This function is NOT decorated - should NOT trigger any warnings
20+
def normal_function():
21+
value = random.random() # OK - not in durable context
22+
now = datetime.now() # OK
23+
return value
24+
25+
26+
@durable_execution
27+
def handler(event: dict, context: DurableContext) -> dict:
28+
# GOOD - inside ctx.step(), result is checkpointed
29+
safe_random = context.step(lambda: random.random())
30+
safe_time = context.step(lambda: datetime.now().isoformat())
31+
safe_uuid = context.step(lambda: str(uuid.uuid4()))
32+
33+
# DAR001: random calls OUTSIDE step
34+
value = random.random()
35+
num = random.randint(1, 100)
36+
37+
# DAR002: datetime calls OUTSIDE step
38+
now = datetime.now()
39+
today = datetime.today()
40+
41+
# DAR003: uuid generation OUTSIDE step
42+
id1 = uuid.uuid4()
43+
id2 = uuid.uuid1()
44+
45+
# DAR004: time calls OUTSIDE step
46+
ts = time.time()
47+
mono = time.monotonic()
48+
49+
# DAR005: os.environ access OUTSIDE step
50+
api_key = os.environ["API_KEY"]
51+
52+
# DAR006: network calls OUTSIDE step
53+
response = requests.get("https://api.example.com")
54+
data = requests.post("https://api.example.com", json={})
55+
56+
# DAR008: closure mutation
57+
count = 0
58+
count += 1
59+
items = {}
60+
items["key"] = "value"
61+
62+
return {"value": value}
63+
64+
65+
# DAR007: durable ops inside @durable_step
66+
@durable_step
67+
def bad_step(step_ctx: StepContext) -> dict:
68+
# This is wrong - can't call durable ops inside a step
69+
result = step_ctx.step(lambda: 1)
70+
step_ctx.wait(duration=5)
71+
return {"result": result}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[build-system]
2+
requires = ["hatchling"]
3+
build-backend = "hatchling.build"
4+
5+
[project]
6+
name = "flake8-durable-execution"
7+
version = "0.1.0"
8+
description = "Flake8 plugin for AWS Durable Execution SDK best practices"
9+
readme = "README.md"
10+
requires-python = ">=3.11"
11+
license = "Apache-2.0"
12+
keywords = ["flake8", "linter", "durable-execution", "aws", "lambda"]
13+
authors = [{ name = "AWS" }]
14+
classifiers = [
15+
"Development Status :: 3 - Alpha",
16+
"Framework :: Flake8",
17+
"Intended Audience :: Developers",
18+
"Programming Language :: Python :: 3.11",
19+
"Programming Language :: Python :: 3.12",
20+
"Programming Language :: Python :: 3.13",
21+
]
22+
dependencies = ["flake8>=6.0.0"]
23+
24+
[project.entry-points."flake8.extension"]
25+
DAR = "flake8_durable_execution:DurableExecutionChecker"
26+
27+
[project.urls]
28+
Source = "https://github.com/aws/aws-durable-execution-sdk-python"
29+
30+
[tool.hatch.build.targets.wheel]
31+
packages = ["src/flake8_durable_execution"]
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from flake8_durable_execution.checker import DurableExecutionChecker
2+
3+
__version__ = "0.1.0"
4+
__all__ = ["DurableExecutionChecker"]
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""Flake8 checker for AWS Durable Execution SDK best practices."""
2+
3+
import ast
4+
from typing import Iterator
5+
6+
from flake8_durable_execution.rules import ALL_RULES
7+
from flake8_durable_execution.rules.nested_durable_op import NestedDurableOpRule
8+
9+
DURABLE_EXECUTION_DECORATORS = {"durable_execution"}
10+
DURABLE_STEP_DECORATORS = {"durable_step"}
11+
DURABLE_OPS = {"step", "invoke", "parallel", "map"}
12+
13+
14+
class DurableExecutionChecker:
15+
name = "flake8-durable-execution"
16+
version = "0.1.0"
17+
18+
def __init__(self, tree: ast.AST) -> None:
19+
self.tree = tree
20+
self.rules = [rule() for rule in ALL_RULES if rule != NestedDurableOpRule]
21+
self.nested_rule = NestedDurableOpRule()
22+
23+
def run(self) -> Iterator[tuple[int, int, str, type]]:
24+
for node in ast.walk(self.tree):
25+
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
26+
# Check @durable_execution functions for non-determinism
27+
if self._has_decorator(node, DURABLE_EXECUTION_DECORATORS):
28+
yield from self._check_durable_function(node)
29+
30+
# Check @durable_step functions for nested durable ops
31+
if self._has_decorator(node, DURABLE_STEP_DECORATORS):
32+
yield from self.nested_rule.check_function(node)
33+
34+
def _has_decorator(self, node: ast.FunctionDef | ast.AsyncFunctionDef, decorators: set[str]) -> bool:
35+
for decorator in node.decorator_list:
36+
if isinstance(decorator, ast.Name) and decorator.id in decorators:
37+
return True
38+
if isinstance(decorator, ast.Attribute) and decorator.attr in decorators:
39+
return True
40+
return False
41+
42+
def _check_durable_function(self, func: ast.FunctionDef | ast.AsyncFunctionDef) -> Iterator[tuple[int, int, str, type]]:
43+
# Collect all nodes that are inside ctx.step() calls - these are safe
44+
safe_nodes: set[int] = set()
45+
self._collect_safe_nodes(func, safe_nodes)
46+
47+
for node in ast.walk(func):
48+
if id(node) in safe_nodes:
49+
continue
50+
for rule in self.rules:
51+
yield from rule.check(node)
52+
53+
def _collect_safe_nodes(self, node: ast.AST, safe_nodes: set[int]) -> None:
54+
"""Collect all nodes inside ctx.step(lambda: ...) calls."""
55+
for child in ast.walk(node):
56+
if not isinstance(child, ast.Call):
57+
continue
58+
59+
# Check if this is a ctx.step(...) call
60+
if isinstance(child.func, ast.Attribute) and child.func.attr in DURABLE_OPS:
61+
# Mark all nodes inside the step arguments as safe
62+
for arg in child.args:
63+
for inner in ast.walk(arg):
64+
safe_nodes.add(id(inner))
65+
for kw in child.keywords:
66+
for inner in ast.walk(kw.value):
67+
safe_nodes.add(id(inner))
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from flake8_durable_execution.rules.random_call import RandomCallRule
2+
from flake8_durable_execution.rules.datetime_now import DatetimeNowRule
3+
from flake8_durable_execution.rules.uuid_generation import UuidGenerationRule
4+
from flake8_durable_execution.rules.time_time import TimeTimeRule
5+
from flake8_durable_execution.rules.os_environ import OsEnvironRule
6+
from flake8_durable_execution.rules.network_call import NetworkCallRule
7+
from flake8_durable_execution.rules.nested_durable_op import NestedDurableOpRule
8+
from flake8_durable_execution.rules.closure_mutation import ClosureMutationRule
9+
10+
ALL_RULES = [
11+
RandomCallRule,
12+
DatetimeNowRule,
13+
UuidGenerationRule,
14+
TimeTimeRule,
15+
OsEnvironRule,
16+
NetworkCallRule,
17+
NestedDurableOpRule,
18+
ClosureMutationRule,
19+
]
20+
21+
__all__ = [
22+
"RandomCallRule",
23+
"DatetimeNowRule",
24+
"UuidGenerationRule",
25+
"TimeTimeRule",
26+
"OsEnvironRule",
27+
"NetworkCallRule",
28+
"NestedDurableOpRule",
29+
"ClosureMutationRule",
30+
"ALL_RULES",
31+
]
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Base class for all rules."""
2+
3+
import ast
4+
from abc import ABC, abstractmethod
5+
from typing import Iterator
6+
7+
Error = tuple[int, int, str, type]
8+
9+
10+
class BaseRule(ABC):
11+
"""Base class for lint rules."""
12+
13+
code: str
14+
message: str
15+
16+
@abstractmethod
17+
def check(self, node: ast.AST) -> Iterator[Error]:
18+
"""Check a node and yield errors."""
19+
pass
20+
21+
def _error(self, node: ast.AST) -> Error:
22+
"""Create an error tuple for the given node."""
23+
return (node.lineno, node.col_offset, f"{self.code} {self.message}", type(self))
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""DAR008: Detect closure mutations that get lost on replay."""
2+
3+
import ast
4+
from typing import Iterator
5+
6+
from flake8_durable_execution.rules.base import BaseRule, Error
7+
8+
9+
class ClosureMutationRule(BaseRule):
10+
code = "DAR008"
11+
message = "Avoid mutating closure variables in steps - mutations are lost on replay"
12+
13+
def __init__(self) -> None:
14+
self._outer_vars: set[str] = set()
15+
self._in_step_function = False
16+
17+
def check(self, node: ast.AST) -> Iterator[Error]:
18+
# Look for augmented assignments (+=, -=, etc.) to outer scope vars
19+
if isinstance(node, ast.AugAssign):
20+
if isinstance(node.target, ast.Name):
21+
# This is a heuristic - we flag any augmented assignment
22+
# that looks like it could be mutating an outer variable
23+
yield self._error(node)
24+
25+
# Look for attribute mutations on outer variables
26+
if isinstance(node, ast.Assign):
27+
for target in node.targets:
28+
if isinstance(target, ast.Subscript):
29+
# list[0] = x or dict["key"] = x
30+
yield self._error(node)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""DEX002: Detect non-deterministic datetime calls."""
2+
3+
import ast
4+
from typing import Iterator
5+
6+
from flake8_durable_execution.rules.base import BaseRule, Error
7+
8+
DATETIME_METHODS = {"now", "utcnow", "today"}
9+
10+
11+
class DatetimeNowRule(BaseRule):
12+
code = "DAR002"
13+
message = "Avoid datetime.now()/today() in durable functions - time differs on replay"
14+
15+
def check(self, node: ast.AST) -> Iterator[Error]:
16+
if not isinstance(node, ast.Call):
17+
return
18+
19+
if isinstance(node.func, ast.Attribute):
20+
if node.func.attr in DATETIME_METHODS:
21+
yield self._error(node)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""DAR007: Detect durable operations inside @durable_step functions."""
2+
3+
import ast
4+
from typing import Iterator
5+
6+
from flake8_durable_execution.rules.base import BaseRule, Error
7+
8+
DURABLE_OPS = {"step", "wait", "invoke", "parallel", "map", "wait_for_callback", "wait_for_condition"}
9+
10+
11+
class NestedDurableOpRule(BaseRule):
12+
code = "DAR007"
13+
message = "Durable operations cannot be called inside @durable_step"
14+
15+
def __init__(self) -> None:
16+
self.in_durable_step = False
17+
18+
def check_function(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> Iterator[Error]:
19+
"""Check a @durable_step function for nested durable ops."""
20+
for child in ast.walk(node):
21+
if isinstance(child, ast.Call) and isinstance(child.func, ast.Attribute):
22+
if child.func.attr in DURABLE_OPS:
23+
yield (child.lineno, child.col_offset, f"{self.code} {self.message}", type(self))
24+
25+
def check(self, node: ast.AST) -> Iterator[Error]:
26+
# This rule is handled specially by the checker
27+
return
28+
yield # Make this a generator

0 commit comments

Comments
 (0)