From 55708a2386e212ae2ad98dc7383bd2515202b88c Mon Sep 17 00:00:00 2001 From: Michael Meier Date: Tue, 26 Jan 2010 22:39:58 +0100 Subject: [PATCH] lots of improvement in find --- lib/find.go | 129 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 120 insertions(+), 9 deletions(-) diff --git a/lib/find.go b/lib/find.go index a016ddc..94f0f94 100644 --- a/lib/find.go +++ b/lib/find.go @@ -1,6 +1,10 @@ package malus +import ( + "fmt" + "net" + ) type robotReturn struct { rthost *RTHost @@ -9,13 +13,90 @@ type robotReturn struct { } -func robot(rh *RTHost, retchan chan *robotReturn) { +func robotParse(oid string, t string, retis []interface{}) (closest *RTHostList, retvals []interface{}) { + + closest = nil + retvals = nil + + if len(retis) != 1 { + fmt.Printf("find: len(retis) != 1 ?\n") + return + } + + + // TODO: disambiguate between direct findnode return values + // and findvalue style mapped retval/closest return values + wireclosest, ok := retis[0].([]interface{}) + if !ok { + fmt.Printf("wire closest is not []interface{}") + } + + closest = NewRTHostList() + + for _, wci := range wireclosest { + wci, ok := wci.([]interface{}) + if !ok { + continue + } + + if len(wci) != 3 { + continue + } + + host, ok := wci[0].(string) + if !ok { + continue + } + + port64, ok := wci[0].(int64) + if !ok { + continue + } + var port int = int(uint16(port64)) + + id, ok := wci[0].(string) + if !ok { + continue + } + if len(id) != HASHLEN { + continue + } + + addrstring := fmt.Sprintf("%s:%d", host, port) + addr, err := net.ResolveUDPAddr(addrstring) + if err != nil { + continue + } + + rth := new(RTHost) + rth.Host.Addr = addr + rth.Host.Id = id + rth.Distance = XOR(oid, id) + closest.Push(rth) + } + + return +} + + +func robot(oid string, t string, rh *RTHost, retchan chan *robotReturn, cm *CallManager) { ret := new(robotReturn) + args := []interface{}{t} + retis, err := cm.Call(rh.Host.Addr, "findnode", args) + ret.rthost = rh - ret.closest = nil - ret.retvals = nil + if err != nil { + ret.closest = nil + ret.retvals = nil + } else { + fmt.Printf("robot parsing\n") + ret.closest, ret.retvals = robotParse(oid, t, retis) + fmt.Printf("robot parsing done\n") + } + fmt.Printf("robot done <=\n") + retchan <- ret } @@ -34,30 +115,53 @@ func find(t string, cm *CallManager, rt RoutingTable, bootstrap *RTHostList) { } kclosest.Sort() + visited := make(map[string]*RTHost) closestd := kclosest.At(0).Distance //MaxDistance alpha := Alpha - retchan := make(chan *robotReturn) // dummy + retchan := make(chan *robotReturn) nrunning := 0 nqueried := 0 converging := true for converging { + fmt.Printf("find: convering round\n") for (nrunning < alpha) && (kclosest.Len() > 0) { + fmt.Printf("=> robot w/ %d left\n", kclosest.Len()) rh := kclosest.PopFront() - go robot(rh, retchan) + straddr := rh.Host.Addr.String() + if _, ok := visited[straddr]; ok { + // host already visited + continue + } + go robot(cm.Id, t, rh, retchan, cm) + visited[straddr] = rh nrunning++ } + if nrunning == 0 { + break + } + ret := <-retchan nrunning-- nqueried++ if ret.closest == nil { - panicln("handle this more intelligently...") + //panicln("handle this more intelligently...") + continue } - kclosest.Append(ret.closest) + //fmt.Printf("find: append\n") + //kclosest.Append(ret.closest) + for i := 0; i < ret.closest.Len(); i++ { + el := ret.closest.At(i) + addrstr := el.Host.Addr.String() + if _, ok := visited[addrstr]; !ok { + kclosest.Push(el) + } + } + //fmt.Printf("find: sort\n") kclosest.Sort() l := kclosest.Len() if l >= K { @@ -76,12 +180,19 @@ func find(t string, cm *CallManager, rt RoutingTable, bootstrap *RTHostList) { } } + // TODO: drain channel + + fmt.Printf("find: not converging any more\n") // the search is not converging any more. now make sure all k // nodes are queried - for (nqueried < K) || (nrunning > 0) { + /*for (nqueried < K) || (nrunning > 0) { // if we can spawn more goroutines => spawn them // read results similar to code above - } + }*/ + + // TODO: as above, either drain channel or make channel + // buffered, so robots actually terminate and channel gets + // garbitsch collected }