diff --git a/docs/guide.fa.md b/docs/guide.fa.md index d0247453..362c4b61 100644 --- a/docs/guide.fa.md +++ b/docs/guide.fa.md @@ -225,7 +225,7 @@ HTTP / HTTPS مثل قبل از Apps Script می‌رود (تغییری نمی | ۶ | ۱۸۰ | توصیه‌شده برای استفادهٔ سنگین | | ۱۲ | ۳۶۰ | چند حساب — حداکثر توان | -بیشتر Deployment = همزمانی بیشتر = تأخیر کمتر هر سشن. هر بَچ بین IDها چرخش می‌کند و بار به‌طور یکنواخت توزیع می‌شود، احتمال رسیدن به سقف سهمیهٔ یک Deployment کاهش می‌یابد. +بیشتر Deployment = همزمانی بیشتر = تأخیر کمتر هر سشن. انتخاب هر بَچ از بین IDهای تنظیم‌شده با یک ledger محلی rolling 24-hour انجام می‌شود؛ بار پخش می‌شود و کلاینت از Deploymentهایی که همین دستگاه نزدیک سقف request سهمیهٔ رایگان برده دوری می‌کند. **محافظ‌های منابع:** - **حداکثر ۵۰ op** در هر بَچ — اگر سشن‌های فعال بیشتر باشند، مالتی‌پلکسر چند بَچ می‌فرستد diff --git a/docs/guide.md b/docs/guide.md index 679a35d0..dbecaf9e 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -225,7 +225,7 @@ max_concurrent = 30 × number_of_deployment_ids | 6 | 180 | Recommended for heavy use | | 12 | 360 | Multi-account power setup | -More deployments = more total concurrency = lower per-session latency. Each batch round-robins across your IDs, spreading load and reducing the chance of hitting any single deployment's quota ceiling. +More deployments = more total concurrency = lower per-session latency. Each batch is selected from the configured IDs with a local rolling 24-hour ledger, spreading load and steering away from deployments this client has already driven near the free-tier request budget. **Resource guards:** - **50 ops max** per batch — if more sessions are active, the mux splits into multiple batches diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs index 0e11e764..75563afc 100644 --- a/src/domain_fronter.rs +++ b/src/domain_fronter.rs @@ -13,7 +13,7 @@ //! buffered `relay_parallel_range` compatibility wrapper for callers that //! want a `Vec` back. -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; // AtomicU64 via portable-atomic: native on 64-bit / armv7, spinlock- // backed on mipsel (MIPS32 has no 64-bit atomic instructions). API // is identical to std::sync::atomic::AtomicU64 so call sites need @@ -147,6 +147,12 @@ const H1_OPEN_TIMEOUT_SECS: u64 = 8; /// request to wake back up — most painful on YouTube / streaming where /// the first chunk after a quiet pause stalls the player. const H1_KEEPALIVE_INTERVAL_SECS: u64 = 240; +/// Conservative local estimate of the Apps Script UrlFetchApp free-tier +/// request budget per deployment account. This is not an authoritative Google +/// quota read; it is a client-side selector guard that avoids concentrating +/// traffic on a deployment this process has already used heavily. +const SCRIPT_QUOTA_FREE_TIER_CALLS: usize = 20_000; +const SCRIPT_QUOTA_WINDOW: Duration = Duration::from_secs(24 * 60 * 60); /// Largest response body Apps Script's `UrlFetchApp` will deliver before /// the script gets killed mid-execution. The hard wire ceiling is ~50 MiB; /// after base64 / envelope overhead and edge variance, the practical raw @@ -357,6 +363,11 @@ pub struct DomainFronter { inflight: Arc>>>>, coalesced: AtomicU64, blacklist: Arc>>, + /// Per-deployment local call ledger used by `next_script_id` / + /// `next_script_ids` to avoid selecting an already saturated deployment + /// while another configured script still has locally-observed capacity. + /// Entries are pruned on selection against a rolling 24-hour window. + script_quota_ledger: Arc>>>, /// Per-deployment rolling timeout counter. Maps `script_id` → /// `(window_start, strike_count)`. Reset when the window expires /// or when a batch succeeds. Triggers a short-cooldown blacklist @@ -620,6 +631,7 @@ impl DomainFronter { inflight: Arc::new(Mutex::new(HashMap::new())), coalesced: AtomicU64::new(0), blacklist: Arc::new(std::sync::Mutex::new(HashMap::new())), + script_quota_ledger: Arc::new(std::sync::Mutex::new(HashMap::new())), script_timeouts: Arc::new(std::sync::Mutex::new(HashMap::new())), relay_calls: AtomicU64::new(0), relay_failures: AtomicU64::new(0), @@ -802,21 +814,39 @@ impl DomainFronter { let mut bl = self.blacklist.lock().unwrap(); let now = Instant::now(); bl.retain(|_, until| *until > now); + let mut quota = self.script_quota_ledger.lock().unwrap(); + prune_script_quota_ledger(&mut quota, now); + let mut saturated_fallback: Option = None; for _ in 0..n { let idx = self.script_idx.fetch_add(1, Ordering::Relaxed); let sid = &self.script_ids[idx % n]; if !bl.contains_key(sid) { - return sid.clone(); + if script_has_local_quota_capacity("a, sid) { + record_script_quota_call_locked(&mut quota, sid, now); + return sid.clone(); + } + saturated_fallback.get_or_insert_with(|| sid.clone()); } } + // If every non-blacklisted deployment is locally saturated, preserve + // connectivity instead of hard-failing. Paid Workspace quotas and + // traffic from other clients are not visible to this process, so this + // ledger is a steering signal, not an authoritative quota gate. + if let Some(sid) = saturated_fallback { + record_script_quota_call_locked(&mut quota, &sid, now); + return sid; + } // All blacklisted: pick whichever comes off cooldown soonest. if let Some((sid, _)) = bl.iter().min_by_key(|(_, t)| **t) { let sid = sid.clone(); bl.remove(&sid); + record_script_quota_call_locked(&mut quota, &sid, now); return sid; } - self.script_ids[0].clone() + let sid = self.script_ids[0].clone(); + record_script_quota_call_locked(&mut quota, &sid, now); + sid } /// Pick `want` distinct non-blacklisted script IDs for a parallel fan-out @@ -831,8 +861,11 @@ impl DomainFronter { let mut bl = self.blacklist.lock().unwrap(); let now = Instant::now(); bl.retain(|_, until| *until > now); + let mut quota = self.script_quota_ledger.lock().unwrap(); + prune_script_quota_ledger(&mut quota, now); let mut picked: Vec = Vec::with_capacity(want); + let mut saturated_fallback: Vec = Vec::with_capacity(want); for _ in 0..n { if picked.len() >= want { break; @@ -840,11 +873,22 @@ impl DomainFronter { let idx = self.script_idx.fetch_add(1, Ordering::Relaxed); let sid = &self.script_ids[idx % n]; if !bl.contains_key(sid) && !picked.iter().any(|p| p == sid) { - picked.push(sid.clone()); + if script_has_local_quota_capacity("a, sid) { + picked.push(sid.clone()); + } else if !saturated_fallback.iter().any(|p| p == sid) { + saturated_fallback.push(sid.clone()); + } } } if picked.is_empty() { - picked.push(self.script_ids[0].clone()); + if let Some(sid) = saturated_fallback.into_iter().next() { + picked.push(sid); + } else { + picked.push(self.script_ids[0].clone()); + } + } + for sid in &picked { + record_script_quota_call_locked(&mut quota, sid, now); } picked } @@ -3879,6 +3923,40 @@ fn add_random_pad(map: &mut serde_json::Map) { map.insert("_pad".into(), Value::String(B64.encode(&buf))); } +fn prune_script_quota_ledger( + ledger: &mut HashMap>, + now: Instant, +) { + let cutoff = now.checked_sub(SCRIPT_QUOTA_WINDOW).unwrap_or(now); + ledger.retain(|_, calls| { + while calls.front().map(|ts| *ts <= cutoff).unwrap_or(false) { + calls.pop_front(); + } + !calls.is_empty() + }); +} + +fn script_has_local_quota_capacity( + ledger: &HashMap>, + script_id: &str, +) -> bool { + ledger + .get(script_id) + .map(|calls| calls.len() < SCRIPT_QUOTA_FREE_TIER_CALLS) + .unwrap_or(true) +} + +fn record_script_quota_call_locked( + ledger: &mut HashMap>, + script_id: &str, + now: Instant, +) { + ledger + .entry(script_id.to_string()) + .or_default() + .push_back(now); +} + /// "YYYY-MM-DD" of the current Pacific Time date. Used as the daily-reset /// boundary for `today_calls` / `today_bytes` because **Apps Script's /// quota counter resets at midnight Pacific Time, not UTC** — that's @@ -6606,6 +6684,110 @@ hello"; DomainFronter::new(&cfg).expect("test fronter must construct") } + fn fronter_for_script_ids(script_ids: &[&str]) -> DomainFronter { + let script_ids_json = serde_json::to_string(script_ids).unwrap(); + let json = format!( + r#"{{ + "mode": "apps_script", + "google_ip": "127.0.0.1", + "front_domain": "www.google.com", + "script_id": {}, + "auth_key": "test_auth_key", + "listen_host": "127.0.0.1", + "listen_port": 8085, + "log_level": "info", + "verify_ssl": true + }}"#, + script_ids_json + ); + let cfg: Config = serde_json::from_str(&json).unwrap(); + DomainFronter::new(&cfg).expect("test fronter must construct") + } + + fn seed_script_quota(fronter: &DomainFronter, script_id: &str, count: usize, at: Instant) { + let mut ledger = fronter.script_quota_ledger.lock().unwrap(); + ledger + .entry(script_id.to_string()) + .or_default() + .extend(std::iter::repeat(at).take(count)); + } + + #[test] + fn next_script_id_skips_locally_saturated_deployment() { + let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B"]); + seed_script_quota( + &fronter, + "SCRIPT_A", + SCRIPT_QUOTA_FREE_TIER_CALLS, + Instant::now(), + ); + + let selected = fronter.next_script_id(); + + assert_eq!(selected, "SCRIPT_B"); + let ledger = fronter.script_quota_ledger.lock().unwrap(); + assert_eq!( + ledger.get("SCRIPT_B").map(|calls| calls.len()), + Some(1), + "selection must be recorded in the local rolling ledger" + ); + } + + #[test] + fn script_quota_prune_removes_expired_observations() { + let recorded_at = Instant::now(); + let prune_at = recorded_at + .checked_add(SCRIPT_QUOTA_WINDOW + Duration::from_secs(1)) + .expect("test clock must support a 24h monotonic addition"); + let mut ledger = HashMap::new(); + ledger.insert( + "SCRIPT_A".to_string(), + std::iter::repeat(recorded_at) + .take(SCRIPT_QUOTA_FREE_TIER_CALLS) + .collect::>(), + ); + + prune_script_quota_ledger(&mut ledger, prune_at); + + assert!( + ledger.is_empty(), + "rolling quota ledger should discard observations outside the 24h window" + ); + } + + #[test] + fn next_script_id_preserves_connectivity_when_all_scripts_are_locally_saturated() { + let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B"]); + let now = Instant::now(); + seed_script_quota(&fronter, "SCRIPT_A", SCRIPT_QUOTA_FREE_TIER_CALLS, now); + seed_script_quota(&fronter, "SCRIPT_B", SCRIPT_QUOTA_FREE_TIER_CALLS, now); + + let selected = fronter.next_script_id(); + + assert_eq!(selected, "SCRIPT_A"); + let ledger = fronter.script_quota_ledger.lock().unwrap(); + assert_eq!( + ledger.get("SCRIPT_A").map(|calls| calls.len()), + Some(SCRIPT_QUOTA_FREE_TIER_CALLS + 1), + "local saturation is a steering signal, not a hard outage trigger" + ); + } + + #[test] + fn parallel_script_selection_prefers_unsaturated_deployments() { + let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B", "SCRIPT_C"]); + seed_script_quota( + &fronter, + "SCRIPT_A", + SCRIPT_QUOTA_FREE_TIER_CALLS, + Instant::now(), + ); + + let selected = fronter.next_script_ids(2); + + assert_eq!(selected, vec!["SCRIPT_B".to_string(), "SCRIPT_C".to_string()]); + } + #[tokio::test(flavor = "current_thread")] async fn force_http1_disables_h2_at_construction() { // The kill switch: force_http1=true must mark the fronter as