11package  plan_fragments
22
3- import  "github.com/thanos-io/promql-engine/logicalplan" 
3+ import  (
4+ 	"encoding/binary" 
5+ 
6+ 	"github.com/google/uuid" 
7+ 	"github.com/thanos-io/promql-engine/logicalplan" 
8+ 
9+ 	"github.com/cortexproject/cortex/pkg/distributed_execution" 
10+ )
411
512// Fragmenter interface 
613type  Fragmenter  interface  {
714	// Fragment function fragments the logical query plan and will always return the fragment in the order of child-to-root 
815	// in other words, the order of the fragment in the array will be the order they are being scheduled 
9- 	Fragment (node  logicalplan.Node ) ([]Fragment , error )
16+ 	Fragment (queryID   uint64 ,  node  logicalplan.Node ) ([]Fragment , error )
1017}
1118
12- type  DummyFragmenter  struct  {
19+ func  getNewID () uint64  {
20+ 	id  :=  uuid .New ()
21+ 	return  binary .BigEndian .Uint64 (id [:8 ])
1322}
1423
15- func  (f  * DummyFragmenter ) Fragment (node  logicalplan.Node ) ([]Fragment , error ) {
16- 	// simple logic without distributed optimizer 
17- 	return  []Fragment {
18- 		{
24+ type  PlanFragmenter  struct  {
25+ }
26+ 
27+ func  (f  * PlanFragmenter ) Fragment (queryID  uint64 , node  logicalplan.Node ) ([]Fragment , error ) {
28+ 	fragments  :=  []Fragment {}
29+ 
30+ 	nodeToFragmentID  :=  make (map [* logicalplan.Node ]uint64 )
31+ 	nodeToSubtreeFragmentIDs  :=  make (map [* logicalplan.Node ][]uint64 )
32+ 
33+ 	logicalplan .TraverseBottomUp (nil , & node , func (parent , current  * logicalplan.Node ) bool  {
34+ 		childFragmentIDs  :=  make (map [uint64 ]bool )
35+ 		children  :=  (* current ).Children ()
36+ 
37+ 		for  _ , child  :=  range  children  {
38+ 			if  subtreeIDs , exists  :=  nodeToSubtreeFragmentIDs [child ]; exists  {
39+ 				for  _ , fragmentID  :=  range  subtreeIDs  {
40+ 					childFragmentIDs [fragmentID ] =  true 
41+ 				}
42+ 			}
43+ 		}
44+ 
45+ 		childIDs  :=  make ([]uint64 , 0 , len (childFragmentIDs ))
46+ 		for  fragmentID  :=  range  childFragmentIDs  {
47+ 			childIDs  =  append (childIDs , fragmentID )
48+ 		}
49+ 
50+ 		if  parent  ==  nil  { // root fragment 
51+ 			newFragment  :=  Fragment {
52+ 				Node :       * current ,
53+ 				FragmentID : getNewID (),
54+ 				ChildIDs :   childIDs ,
55+ 				IsRoot :     true ,
56+ 			}
57+ 			fragments  =  append (fragments , newFragment )
58+ 
59+ 			// cache subtree fragment IDs for this node 
60+ 			nodeToSubtreeFragmentIDs [current ] =  childIDs 
61+ 
62+ 		} else  if  distributed_execution .RemoteNode  ==  (* current ).Type () {
63+ 			remoteNode  :=  (* current ).(* distributed_execution.Remote )
64+ 			fragmentID  :=  getNewID ()
65+ 			nodeToFragmentID [current ] =  fragmentID 
66+ 
67+ 			// Set the fragment key for the remote node 
68+ 			key  :=  distributed_execution .MakeFragmentKey (queryID , fragmentID )
69+ 			remoteNode .FragmentKey  =  key 
70+ 
71+ 			newFragment  :=  Fragment {
72+ 				Node :       remoteNode .Expr ,
73+ 				FragmentID : fragmentID ,
74+ 				ChildIDs :   childIDs ,
75+ 				IsRoot :     false ,
76+ 			}
77+ 
78+ 			fragments  =  append (fragments , newFragment )
79+ 
80+ 			subtreeIDs  :=  append ([]uint64 {fragmentID }, childIDs ... )
81+ 			nodeToSubtreeFragmentIDs [current ] =  subtreeIDs 
82+ 		} else  {
83+ 			nodeToSubtreeFragmentIDs [current ] =  childIDs 
84+ 		}
85+ 
86+ 		return  false 
87+ 	})
88+ 
89+ 	if  len (fragments ) >  0  {
90+ 		return  fragments , nil 
91+ 	} else  {
92+ 		// for non-query API calls 
93+ 		// --> treat as root fragment and immediately return the result 
94+ 		return  []Fragment {{
1995			Node :       node ,
20- 			FragmentID : uint64 (1 ),
96+ 			FragmentID : uint64 (0 ),
2197			ChildIDs :   []uint64 {},
2298			IsRoot :     true ,
23- 		}, 
24- 	},  nil 
99+ 		}},  nil 
100+ 	}
25101}
26102
27103type  Fragment  struct  {
@@ -47,6 +123,6 @@ func (s *Fragment) IsEmpty() bool {
47123	return  true 
48124}
49125
50- func  NewDummyFragmenter () Fragmenter  {
51- 	return  & DummyFragmenter {}
126+ func  NewPlanFragmenter () Fragmenter  {
127+ 	return  & PlanFragmenter {}
52128}
0 commit comments