Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/guide.fa.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ HTTP / HTTPS مثل قبل از Apps Script می‌رود (تغییری نمی
| ۶ | ۱۸۰ | توصیه‌شده برای استفادهٔ سنگین |
| ۱۲ | ۳۶۰ | چند حساب — حداکثر توان |

بیشتر Deployment = همزمانی بیشتر = تأخیر کمتر هر سشن. هر بَچ بین IDها چرخش می‌کند و بار به‌طور یکنواخت توزیع می‌شود، احتمال رسیدن به سقف سهمیهٔ یک Deployment کاهش می‌یابد.
بیشتر Deployment = همزمانی بیشتر = تأخیر کمتر هر سشن. انتخاب هر بَچ از بین IDهای تنظیم‌شده با یک ledger محلی rolling 24-hour انجام می‌شود؛ بار پخش می‌شود و کلاینت از Deploymentهایی که همین دستگاه نزدیک سقف request سهمیهٔ رایگان برده دوری می‌کند.

**محافظ‌های منابع:**
- **حداکثر ۵۰ op** در هر بَچ — اگر سشن‌های فعال بیشتر باشند، مالتی‌پلکسر چند بَچ می‌فرستد
Expand Down
2 changes: 1 addition & 1 deletion docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ max_concurrent = 30 × number_of_deployment_ids
| 6 | 180 | Recommended for heavy use |
| 12 | 360 | Multi-account power setup |

More deployments = more total concurrency = lower per-session latency. Each batch round-robins across your IDs, spreading load and reducing the chance of hitting any single deployment's quota ceiling.
More deployments = more total concurrency = lower per-session latency. Each batch is selected from the configured IDs with a local rolling 24-hour ledger, spreading load and steering away from deployments this client has already driven near the free-tier request budget.

**Resource guards:**
- **50 ops max** per batch — if more sessions are active, the mux splits into multiple batches
Expand Down
192 changes: 187 additions & 5 deletions src/domain_fronter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! buffered `relay_parallel_range` compatibility wrapper for callers that
//! want a `Vec<u8>` back.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
// AtomicU64 via portable-atomic: native on 64-bit / armv7, spinlock-
// backed on mipsel (MIPS32 has no 64-bit atomic instructions). API
// is identical to std::sync::atomic::AtomicU64 so call sites need
Expand Down Expand Up @@ -147,6 +147,12 @@ const H1_OPEN_TIMEOUT_SECS: u64 = 8;
/// request to wake back up — most painful on YouTube / streaming where
/// the first chunk after a quiet pause stalls the player.
const H1_KEEPALIVE_INTERVAL_SECS: u64 = 240;
/// Conservative local estimate of the Apps Script UrlFetchApp free-tier
/// request budget per deployment account. This is not an authoritative Google
/// quota read; it is a client-side selector guard that avoids concentrating
/// traffic on a deployment this process has already used heavily.
const SCRIPT_QUOTA_FREE_TIER_CALLS: usize = 20_000;
const SCRIPT_QUOTA_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
/// Largest response body Apps Script's `UrlFetchApp` will deliver before
/// the script gets killed mid-execution. The hard wire ceiling is ~50 MiB;
/// after base64 / envelope overhead and edge variance, the practical raw
Expand Down Expand Up @@ -357,6 +363,11 @@ pub struct DomainFronter {
inflight: Arc<Mutex<HashMap<String, broadcast::Sender<Vec<u8>>>>>,
coalesced: AtomicU64,
blacklist: Arc<std::sync::Mutex<HashMap<String, Instant>>>,
/// Per-deployment local call ledger used by `next_script_id` /
/// `next_script_ids` to avoid selecting an already saturated deployment
/// while another configured script still has locally-observed capacity.
/// Entries are pruned on selection against a rolling 24-hour window.
script_quota_ledger: Arc<std::sync::Mutex<HashMap<String, VecDeque<Instant>>>>,
/// Per-deployment rolling timeout counter. Maps `script_id` →
/// `(window_start, strike_count)`. Reset when the window expires
/// or when a batch succeeds. Triggers a short-cooldown blacklist
Expand Down Expand Up @@ -620,6 +631,7 @@ impl DomainFronter {
inflight: Arc::new(Mutex::new(HashMap::new())),
coalesced: AtomicU64::new(0),
blacklist: Arc::new(std::sync::Mutex::new(HashMap::new())),
script_quota_ledger: Arc::new(std::sync::Mutex::new(HashMap::new())),
script_timeouts: Arc::new(std::sync::Mutex::new(HashMap::new())),
relay_calls: AtomicU64::new(0),
relay_failures: AtomicU64::new(0),
Expand Down Expand Up @@ -802,21 +814,39 @@ impl DomainFronter {
let mut bl = self.blacklist.lock().unwrap();
let now = Instant::now();
bl.retain(|_, until| *until > now);
let mut quota = self.script_quota_ledger.lock().unwrap();
prune_script_quota_ledger(&mut quota, now);

let mut saturated_fallback: Option<String> = None;
for _ in 0..n {
let idx = self.script_idx.fetch_add(1, Ordering::Relaxed);
let sid = &self.script_ids[idx % n];
if !bl.contains_key(sid) {
return sid.clone();
if script_has_local_quota_capacity(&quota, sid) {
record_script_quota_call_locked(&mut quota, sid, now);
return sid.clone();
}
saturated_fallback.get_or_insert_with(|| sid.clone());
}
}
// If every non-blacklisted deployment is locally saturated, preserve
// connectivity instead of hard-failing. Paid Workspace quotas and
// traffic from other clients are not visible to this process, so this
// ledger is a steering signal, not an authoritative quota gate.
if let Some(sid) = saturated_fallback {
record_script_quota_call_locked(&mut quota, &sid, now);
return sid;
}
// All blacklisted: pick whichever comes off cooldown soonest.
if let Some((sid, _)) = bl.iter().min_by_key(|(_, t)| **t) {
let sid = sid.clone();
bl.remove(&sid);
record_script_quota_call_locked(&mut quota, &sid, now);
return sid;
}
self.script_ids[0].clone()
let sid = self.script_ids[0].clone();
record_script_quota_call_locked(&mut quota, &sid, now);
sid
}

/// Pick `want` distinct non-blacklisted script IDs for a parallel fan-out
Expand All @@ -831,20 +861,34 @@ impl DomainFronter {
let mut bl = self.blacklist.lock().unwrap();
let now = Instant::now();
bl.retain(|_, until| *until > now);
let mut quota = self.script_quota_ledger.lock().unwrap();
prune_script_quota_ledger(&mut quota, now);

let mut picked: Vec<String> = Vec::with_capacity(want);
let mut saturated_fallback: Vec<String> = Vec::with_capacity(want);
for _ in 0..n {
if picked.len() >= want {
break;
}
let idx = self.script_idx.fetch_add(1, Ordering::Relaxed);
let sid = &self.script_ids[idx % n];
if !bl.contains_key(sid) && !picked.iter().any(|p| p == sid) {
picked.push(sid.clone());
if script_has_local_quota_capacity(&quota, sid) {
picked.push(sid.clone());
} else if !saturated_fallback.iter().any(|p| p == sid) {
saturated_fallback.push(sid.clone());
}
}
}
if picked.is_empty() {
picked.push(self.script_ids[0].clone());
if let Some(sid) = saturated_fallback.into_iter().next() {
picked.push(sid);
} else {
picked.push(self.script_ids[0].clone());
}
}
for sid in &picked {
record_script_quota_call_locked(&mut quota, sid, now);
}
picked
}
Expand Down Expand Up @@ -3879,6 +3923,40 @@ fn add_random_pad(map: &mut serde_json::Map<String, Value>) {
map.insert("_pad".into(), Value::String(B64.encode(&buf)));
}

fn prune_script_quota_ledger(
ledger: &mut HashMap<String, VecDeque<Instant>>,
now: Instant,
) {
let cutoff = now.checked_sub(SCRIPT_QUOTA_WINDOW).unwrap_or(now);
ledger.retain(|_, calls| {
while calls.front().map(|ts| *ts <= cutoff).unwrap_or(false) {
calls.pop_front();
}
!calls.is_empty()
});
}

fn script_has_local_quota_capacity(
ledger: &HashMap<String, VecDeque<Instant>>,
script_id: &str,
) -> bool {
ledger
.get(script_id)
.map(|calls| calls.len() < SCRIPT_QUOTA_FREE_TIER_CALLS)
.unwrap_or(true)
}

fn record_script_quota_call_locked(
ledger: &mut HashMap<String, VecDeque<Instant>>,
script_id: &str,
now: Instant,
) {
ledger
.entry(script_id.to_string())
.or_default()
.push_back(now);
}

/// "YYYY-MM-DD" of the current Pacific Time date. Used as the daily-reset
/// boundary for `today_calls` / `today_bytes` because **Apps Script's
/// quota counter resets at midnight Pacific Time, not UTC** — that's
Expand Down Expand Up @@ -6606,6 +6684,110 @@ hello";
DomainFronter::new(&cfg).expect("test fronter must construct")
}

fn fronter_for_script_ids(script_ids: &[&str]) -> DomainFronter {
let script_ids_json = serde_json::to_string(script_ids).unwrap();
let json = format!(
r#"{{
"mode": "apps_script",
"google_ip": "127.0.0.1",
"front_domain": "www.google.com",
"script_id": {},
"auth_key": "test_auth_key",
"listen_host": "127.0.0.1",
"listen_port": 8085,
"log_level": "info",
"verify_ssl": true
}}"#,
script_ids_json
);
let cfg: Config = serde_json::from_str(&json).unwrap();
DomainFronter::new(&cfg).expect("test fronter must construct")
}

fn seed_script_quota(fronter: &DomainFronter, script_id: &str, count: usize, at: Instant) {
let mut ledger = fronter.script_quota_ledger.lock().unwrap();
ledger
.entry(script_id.to_string())
.or_default()
.extend(std::iter::repeat(at).take(count));
}

#[test]
fn next_script_id_skips_locally_saturated_deployment() {
let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B"]);
seed_script_quota(
&fronter,
"SCRIPT_A",
SCRIPT_QUOTA_FREE_TIER_CALLS,
Instant::now(),
);

let selected = fronter.next_script_id();

assert_eq!(selected, "SCRIPT_B");
let ledger = fronter.script_quota_ledger.lock().unwrap();
assert_eq!(
ledger.get("SCRIPT_B").map(|calls| calls.len()),
Some(1),
"selection must be recorded in the local rolling ledger"
);
}

#[test]
fn script_quota_prune_removes_expired_observations() {
let recorded_at = Instant::now();
let prune_at = recorded_at
.checked_add(SCRIPT_QUOTA_WINDOW + Duration::from_secs(1))
.expect("test clock must support a 24h monotonic addition");
let mut ledger = HashMap::new();
ledger.insert(
"SCRIPT_A".to_string(),
std::iter::repeat(recorded_at)
.take(SCRIPT_QUOTA_FREE_TIER_CALLS)
.collect::<VecDeque<_>>(),
);

prune_script_quota_ledger(&mut ledger, prune_at);

assert!(
ledger.is_empty(),
"rolling quota ledger should discard observations outside the 24h window"
);
}

#[test]
fn next_script_id_preserves_connectivity_when_all_scripts_are_locally_saturated() {
let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B"]);
let now = Instant::now();
seed_script_quota(&fronter, "SCRIPT_A", SCRIPT_QUOTA_FREE_TIER_CALLS, now);
seed_script_quota(&fronter, "SCRIPT_B", SCRIPT_QUOTA_FREE_TIER_CALLS, now);

let selected = fronter.next_script_id();

assert_eq!(selected, "SCRIPT_A");
let ledger = fronter.script_quota_ledger.lock().unwrap();
assert_eq!(
ledger.get("SCRIPT_A").map(|calls| calls.len()),
Some(SCRIPT_QUOTA_FREE_TIER_CALLS + 1),
"local saturation is a steering signal, not a hard outage trigger"
);
}

#[test]
fn parallel_script_selection_prefers_unsaturated_deployments() {
let fronter = fronter_for_script_ids(&["SCRIPT_A", "SCRIPT_B", "SCRIPT_C"]);
seed_script_quota(
&fronter,
"SCRIPT_A",
SCRIPT_QUOTA_FREE_TIER_CALLS,
Instant::now(),
);

let selected = fronter.next_script_ids(2);

assert_eq!(selected, vec!["SCRIPT_B".to_string(), "SCRIPT_C".to_string()]);
}

#[tokio::test(flavor = "current_thread")]
async fn force_http1_disables_h2_at_construction() {
// The kill switch: force_http1=true must mark the fronter as
Expand Down
Loading