diff --git a/Cargo.lock b/Cargo.lock index c1f12953..2846e5a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2020,6 +2020,7 @@ dependencies = [ "iceberg-catalog-memory", "once_cell", "ordered-float 5.0.0", + "rand 0.8.5", "tempfile", "tokio", "trait-variant", diff --git a/optd/Cargo.toml b/optd/Cargo.toml index 275a5ef4..b06faf6c 100644 --- a/optd/Cargo.toml +++ b/optd/Cargo.toml @@ -28,3 +28,4 @@ tokio = { version = "1.45.0", features = [ "rt-multi-thread", "test-util" ] } +rand = "0.8.5" diff --git a/optd/src/memo/fuzz.rs b/optd/src/memo/fuzz.rs new file mode 100644 index 00000000..4454959e --- /dev/null +++ b/optd/src/memo/fuzz.rs @@ -0,0 +1,259 @@ +use crate::cir::{Child, GroupId, LogicalExpression, LogicalProperties, OperatorData}; +use crate::memo::Memo; +use rand::distributions::WeightedIndex; +use rand::prelude::*; +use rand::rngs::StdRng; +use std::cmp::max; +use std::collections::HashSet; + +#[derive(Clone, Debug)] +pub(crate) struct FuzzExpression { + pub op: usize, + pub children: Vec, +} + +#[derive(Clone, Debug)] +pub(crate) struct FuzzGroup { + pub exprs: Vec, + pub id: usize, +} + +#[derive(Clone, Debug)] +pub(crate) struct FuzzData { + pub exprs: Vec, + pub groups: Vec, + pub entry: usize, +} + +impl FuzzData { + pub fn new(ngroups: usize, nexprs: usize, dag: bool, seed: u64) -> Self { + let mut rng = StdRng::seed_from_u64(seed); + + // FIXME: MAGIC NUMBERS + let weights = [10, 30, 30]; // distribution operator arity + let proximity = 4; // proximity factor (1 for no proximity preference) + + let mut memo = FuzzData { + exprs: Vec::new(), + groups: Vec::new(), + entry: 0, + }; + + let mut tot = 0; + let mut cnt = 0; + for (i, v) in weights.iter().enumerate() { + tot += i * v; + cnt += v; + } + let dist = WeightedIndex::new(&weights).unwrap(); + + // Generate groups + let mut gqueue: Vec = vec![]; + while memo.groups.len() < ngroups || gqueue.len() > 1 { + let mut exprs: Vec = vec![]; + + let ngen = rng.gen_range(0..nexprs * 2); + + // Generate expressions even if no groups to reference (will be a scan!) + while exprs.len() < ngen { + let arity = dist.sample(&mut rng); + let mut children: Vec = vec![]; + let mut cset: HashSet = HashSet::new(); + for i in 0..arity { + if gqueue.len() > 0 { + let idx = rng.gen_range(0..gqueue.len()); + // avoid using the same group twice + // as an operand to the same expression + let c = gqueue[idx]; + if !cset.contains(&c) { + cset.insert(c); + children.push(gqueue[idx]); + gqueue.remove(idx); + } + } + if children.len() <= i { + // failed to find a group, make one extra 'scan' node now + children.push(memo.groups.len()); + memo.groups.push(FuzzGroup { + exprs: vec![memo.exprs.len()], + id: memo.groups.len(), + }); + memo.exprs.push(FuzzExpression { + op: 0, + children: vec![], + }); + } + } + let expr_id = memo.exprs.len(); + exprs.push(expr_id); + + let op = children.len(); // FIXME: more ops? + memo.exprs.push(FuzzExpression { op, children }); + } + + if ngen > 0 { + let group_id = memo.groups.len(); + memo.groups.push(FuzzGroup { + exprs, + id: group_id, + }); + + // While we don't have enough groups, collect operands for future expressions + if group_id < ngroups && dag { + // replenish groups to be referenced + let ng = match nexprs * tot / cnt { + d if d > 0 => rng.gen_range(0..d * 2), + _ => 0, + }; + + let m = max(0, (group_id as i32) - (ngroups as i32) / proximity) as usize; + for _ in 0..ng { + gqueue.push(rng.gen_range(m..group_id + 1)); + } + } + + // Add at least this group, so that it is referenced or the last + gqueue.push(group_id); + } + } + + memo.entry = gqueue[0]; + + memo + } + + pub fn shuffle(&self, chunk: usize, merge: bool) -> FuzzData { + assert!(chunk > 1); + + let mut groups = self.groups.clone(); + let mut i = 0; + + while i < groups.len() { + let g = &groups[i]; + let gid = g.id; + if g.exprs.len() > chunk { + let upd = g.exprs[..chunk].to_vec(); + let rest = if merge { + // trigger merge 1,2,3,4,5,6,7 --> 1,2,3 + 4,5,6 + 7,1,4 + let mut v = g.exprs[chunk..].to_vec(); + v.push(g.exprs[0]); + v + } else { + // don't trigger merge 1,2,3,4,5,6,7 --> 1,2,3 - 3,4,5, - 5,6,7 + g.exprs[chunk - 1..].to_vec() + }; + groups[i].exprs = upd; + groups.push(FuzzGroup { + exprs: rest, + id: gid, + }) + } + i = i + 1; + } + + FuzzData { + exprs: self.exprs.clone(), + groups, + entry: self.entry, + } + } +} + +pub struct Fuzzer { + memo: M, + group_ids: Vec, + entry: GroupId, +} + +impl Fuzzer { + pub fn new(memo: M) -> Self { + Self { + memo, + group_ids: Vec::new(), + entry: GroupId(0), + } + } + + pub async fn add(&mut self, memo: &FuzzData) { + for g in memo.groups.iter() { + let mut group_id = None; + + for j in g.exprs.iter() { + let e = &memo.exprs[*j]; + + let expr = match e.op { + 0 => LogicalExpression { + tag: "Scan".to_string(), + data: vec![OperatorData::Int64(*j as i64)], + children: vec![], + }, + 1 => LogicalExpression { + tag: "Filter".to_string(), + data: vec![OperatorData::Int64(*j as i64)], + children: vec![Child::Singleton(self.group_ids[e.children[0]])], + }, + 2 => LogicalExpression { + tag: "Filter".to_string(), + data: vec![OperatorData::Int64(*j as i64)], + children: vec![ + Child::Singleton(self.group_ids[e.children[0]]), + Child::Singleton(self.group_ids[e.children[1]]), + ], + }, + _ => unreachable!(), + }; + + let eid = self.memo.get_logical_expr_id(&expr).await.unwrap(); + let gid = match self.memo.find_logical_expr_group(eid).await.unwrap() { + None => { + // new expression, create a (temporary) group + self.memo + .create_group(eid, &LogicalProperties(None)) + .await + .unwrap() + } + Some(id) => { + // expression already existed, just use its group + id + } + }; + + if let Some(id) = group_id { + // merge with known equivalent expressions + self.memo.merge_groups(gid, id).await.unwrap(); + } + group_id = Some(gid); + } + if g.id >= self.group_ids.len() { + self.group_ids.push(group_id.unwrap()); + } else { + self.group_ids[g.id] = group_id.unwrap(); + } + } + + self.entry = self.group_ids[memo.entry]; + } + + pub async fn check(&mut self, memo: &FuzzData) { + let mut _tot = 0; + for g in 0..memo.groups.len() { + let group_expressions = self + .memo + .get_all_logical_exprs(self.group_ids[g]) + .await + .unwrap(); + + // do something with it + let mut ids = vec![]; + for eid in group_expressions { + let expr = self.memo.materialize_logical_expr(eid).await.unwrap(); + if let OperatorData::Int64(v) = expr.data[0] { + ids.push(v as usize); + } + } + + ids.sort(); + assert_eq!(ids, memo.groups[g].exprs, "incorrect memo") + } + } +} diff --git a/optd/src/memo/memory/implementation.rs b/optd/src/memo/memory/implementation.rs index 3f79f2ac..9420bec4 100644 --- a/optd/src/memo/memory/implementation.rs +++ b/optd/src/memo/memory/implementation.rs @@ -302,10 +302,9 @@ impl Memo for MemoryMemo { #[cfg(test)] pub mod tests { use super::*; - use crate::{ - cir::{Child, OperatorData}, - memo::Materialize, - }; + use crate::cir::{Child, OperatorData}; + use crate::memo::Materialize; + use crate::memo::fuzz::{FuzzData, Fuzzer}; pub async fn lookup_or_insert( memo: &mut impl Memo, @@ -697,6 +696,17 @@ pub mod tests { ); } + #[tokio::test] + async fn test_logical_fuzzing() { + let data = FuzzData::new(100, 10, true, 12345); + let shuffled = data.shuffle(2, true); + + let memo = MemoryMemo::default(); + let mut fuzzer = Fuzzer::new(memo); + fuzzer.add(&shuffled).await; + fuzzer.check(&data).await; + } + #[tokio::test] async fn test_goal_merge() { let mut memo = MemoryMemo::default(); diff --git a/optd/src/memo/mod.rs b/optd/src/memo/mod.rs index fe7ba5ea..9a5fc752 100644 --- a/optd/src/memo/mod.rs +++ b/optd/src/memo/mod.rs @@ -237,3 +237,6 @@ pub trait Memo: Representative + Materialize + Sync + 'static { member_id: GoalMemberId, ) -> Result; } + +#[cfg(test)] +mod fuzz; \ No newline at end of file