Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ e2e/langfuse/.env.langfuse-keys
logs/
reports/

# Scalability test output
pprof-dumps/
scalability-runs/

# Security scan artifacts
.security-scan/
.security-scan.zip
Expand Down
26 changes: 25 additions & 1 deletion components/operator/internal/handlers/inactivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func init() {

// --- Project-level timeout cache ---

// maxTimeoutCacheEntries caps the namespace timeout cache size to prevent
// unbounded growth in clusters with many namespaces. When an insert would
// exceed this cap, getProjectInactivityTimeout evicts expired entries first
// and then, if still over capacity, clears half of the map.
const maxTimeoutCacheEntries = 500

// projectTimeoutCache caches inactivityTimeoutSeconds from ProjectSettings per namespace.
type projectTimeoutCache struct {
mu sync.Mutex
Expand Down Expand Up @@ -93,8 +97,28 @@ func getProjectInactivityTimeout(namespace string) int64 {
}
}

// Update cache under lock
// Update cache under lock, evicting stale entries if over capacity
psTimeoutCache.mu.Lock()
if len(psTimeoutCache.entries) >= maxTimeoutCacheEntries {
// Evict expired entries first
now := time.Now()
for k, v := range psTimeoutCache.entries {
if now.Sub(v.fetchedAt) >= inactivityTimeoutCacheTTL {
delete(psTimeoutCache.entries, k)
}
}
// If still over capacity, evict an arbitrary half (Go map iteration
// order is random, so these are not necessarily the oldest entries)
if len(psTimeoutCache.entries) >= maxTimeoutCacheEntries {
count := 0
for k := range psTimeoutCache.entries {
delete(psTimeoutCache.entries, k)
count++
if count >= maxTimeoutCacheEntries/2 {
break
}
}
}
}
psTimeoutCache.entries[namespace] = projectTimeoutEntry{timeout: result, fetchedAt: time.Now()}
psTimeoutCache.mu.Unlock()

Expand Down
28 changes: 19 additions & 9 deletions components/operator/internal/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ import (
"k8s.io/client-go/util/retry"
)

// Shared HTTP client for runner API calls. Reusing a single client enables
// connection pooling and avoids TIME_WAIT socket accumulation.
//
// Timeout bounds the entire request — connection, headers, and body read.
// The Transport keeps up to MaxIdleConnsPerHost idle connections per runner
// and closes idle connections after IdleConnTimeout.
var runnerHTTPClient = &http.Client{
	Timeout: 10 * time.Second,
	Transport: &http.Transport{
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     30 * time.Second,
	},
}

// maxErrorBodyBytes limits how much of an error response body we read into
// memory. Prevents unbounded allocation if a runner returns a large error page.
const maxErrorBodyBytes = 64 * 1024 // 64 KB

// handleAgenticSessionEvent is the legacy reconciliation function containing all session
// lifecycle logic (~2,300 lines). It's called by ReconcilePendingSession() wrapper.
//
Expand Down Expand Up @@ -1615,8 +1629,7 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
resp, err := runnerHTTPClient.Do(req)
if err != nil {
log.Printf("[Reconcile] Failed to add repo via runner: %v", err)
continue
Expand Down Expand Up @@ -1647,8 +1660,7 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
resp, err := runnerHTTPClient.Do(req)
if err != nil {
log.Printf("[Reconcile] Failed to change branch via runner: %v", err)
continue
Expand Down Expand Up @@ -1676,8 +1688,7 @@ func reconcileSpecReposWithPatch(sessionNamespace, sessionName string, spec map[
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
resp, err := runnerHTTPClient.Do(req)
if err != nil {
log.Printf("[Reconcile] Failed to remove repo via runner: %v", err)
continue
Expand Down Expand Up @@ -1768,8 +1779,7 @@ func reconcileActiveWorkflowWithPatch(sessionNamespace, sessionName string, spec
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
resp, err := runnerHTTPClient.Do(req)
if err != nil {
log.Printf("[Reconcile] Failed to send workflow change to runner: %v", err)
statusPatch.AddCondition(conditionUpdate{
Expand All @@ -1783,7 +1793,7 @@ func reconcileActiveWorkflowWithPatch(sessionNamespace, sessionName string, spec
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
log.Printf("[Reconcile] Runner returned non-200 for workflow change: %d - %s", resp.StatusCode, string(body))
statusPatch.AddCondition(conditionUpdate{
Type: conditionWorkflowReconciled,
Expand Down
75 changes: 75 additions & 0 deletions components/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,22 @@ import (
"context"
"flag"
"log"
"net/http"
_ "net/http/pprof"
"os"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
toolscache "k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -39,6 +48,37 @@ func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
}

// stripTerminalSessions is a cache transform that strips heavy fields from
// AgenticSession objects in terminal phases (Completed, Failed, Stopped).
// Terminal sessions dominate the cache at scale (e.g., 80%+ of 4,319 sessions
// on vteam-uat). Stripping spec and status details reduces their memory
// footprint from ~3.5KB to ~500 bytes while preserving metadata for restart
// detection (the desired-phase annotation lives in metadata.annotations).
// stripTerminalSessions is a cache transform that strips heavy fields from
// AgenticSession objects in terminal phases (Completed, Failed, Stopped).
// Terminal sessions dominate the cache at scale (e.g., 80%+ of 4,319 sessions
// on vteam-uat). Stripping spec and status details reduces their memory
// footprint from ~3.5KB to ~500 bytes while preserving metadata for restart
// detection (the desired-phase annotation lives in metadata.annotations).
//
// Non-terminal objects (and non-Unstructured values) pass through unchanged.
var stripTerminalSessions toolscache.TransformFunc = func(obj interface{}) (interface{}, error) {
	u, ok := obj.(*unstructured.Unstructured)
	if !ok {
		// Not an unstructured object (e.g. a DeletedFinalStateUnknown
		// tombstone) — pass it through untouched.
		return obj, nil
	}

	// Read only status.phase. NestedString avoids NestedMap's deep copy of
	// the whole status map, which would allocate on every cache sync for
	// every object; a missing status or non-string phase yields "".
	phase, _, _ := unstructured.NestedString(u.Object, "status", "phase")

	switch phase {
	case "Completed", "Failed", "Stopped":
		// Keep metadata intact (name, namespace, annotations, labels, ownerRefs,
		// resourceVersion, generation) — needed for restart detection and predicates.
		// Drop spec and heavy status fields — not needed until session restarts,
		// at which point the phase changes to Pending and the full object is re-fetched.
		u.Object["spec"] = map[string]interface{}{}
		u.Object["status"] = map[string]interface{}{"phase": phase}
	}

	return obj, nil
}

func main() {
// Handle subcommands before flag parsing
if len(os.Args) > 1 && os.Args[1] == "session-trigger" {
Expand Down Expand Up @@ -120,12 +160,37 @@ func main() {
restConfig.QPS = 100
restConfig.Burst = 200

// AgenticSession unstructured object for cache configuration
sessionCacheObj := &unstructured.Unstructured{}
sessionCacheObj.SetGroupVersionKind(schema.GroupVersionKind{
Group: "vteam.ambient-code",
Version: "v1alpha1",
Kind: "AgenticSession",
})

mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: metricsAddr},
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "ambient-code-operator.ambient-code.io",
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
// Only cache runner pods (app=ambient-runner), not every pod in the cluster.
// On vteam-uat, 397 of 477 pods were non-runner system pods.
&corev1.Pod{}: {
Label: labels.SelectorFromSet(labels.Set{"app": "ambient-runner"}),
},
// Strip heavy fields from terminal sessions in the cache.
// Keeps metadata (for restart detection via desired-phase annotation)
// and phase, but drops spec and status details. On vteam-uat, most of
// 4,319 sessions are terminal — this reduces their per-object memory
// from ~3.5KB to ~500 bytes (metadata + phase only).
sessionCacheObj: {
Transform: stripTerminalSessions,
},
},
},
})
if err != nil {
logger.Error(err, "Unable to create manager")
Expand Down Expand Up @@ -155,6 +220,16 @@ func main() {
os.Exit(1)
}

// Optional pprof server for memory profiling (enable via ENABLE_PPROF=true)
if os.Getenv("ENABLE_PPROF") == "true" {
go func() {
logger.Info("pprof server listening on :6060")
if err := http.ListenAndServe(":6060", nil); err != nil {
logger.Error(err, "pprof server failed")
}
}()
Comment on lines +223 to +230
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Bind pprof to loopback instead of all pod interfaces.

With ENABLE_PPROF=true, this listens on :6060 without auth, so any in-cluster caller that can reach the pod can pull heap/profile data. kubectl port-forward still works if you bind to 127.0.0.1:6060.

Minimal fix
-			logger.Info("pprof server listening on :6060")
-			if err := http.ListenAndServe(":6060", nil); err != nil {
+			logger.Info("pprof server listening on 127.0.0.1:6060")
+			if err := http.ListenAndServe("127.0.0.1:6060", nil); err != nil {
 				logger.Error(err, "pprof server failed")
 			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Optional pprof server for memory profiling (enable via ENABLE_PPROF=true)
if os.Getenv("ENABLE_PPROF") == "true" {
go func() {
logger.Info("pprof server listening on :6060")
if err := http.ListenAndServe(":6060", nil); err != nil {
logger.Error(err, "pprof server failed")
}
}()
// Optional pprof server for memory profiling (enable via ENABLE_PPROF=true)
if os.Getenv("ENABLE_PPROF") == "true" {
go func() {
logger.Info("pprof server listening on 127.0.0.1:6060")
if err := http.ListenAndServe("127.0.0.1:6060", nil); err != nil {
logger.Error(err, "pprof server failed")
}
}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/operator/main.go` around lines 160 - 167, The pprof server is
currently bound to all interfaces in the anonymous goroutine (when ENABLE_PPROF
is true) via http.ListenAndServe(":6060", nil); change the listen address to the
loopback interface so it only accepts local connections (use "127.0.0.1:6060"
instead of ":6060") in that goroutine and keep the same logger.Error handling
for http.ListenAndServe failing.

}

// Start namespace and project settings watchers (these remain as watch loops for now)
// Note: These could be migrated to controller-runtime controllers in the future
go handlers.WatchNamespaces()
Expand Down
Loading