From bd9fc2b782192ff37c7d9e15d3ecc90a55ba4c66 Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Wed, 6 Aug 2014 10:02:53 -0700 Subject: [PATCH] fix bug in routing table lookups --- routing/dht/dht.go | 14 +++++++++++++- routing/dht/dht_test.go | 4 ++-- routing/dht/messages.pb.go | 17 +++++++++-------- routing/dht/messages.proto | 1 + routing/dht/pDHTMessage.go | 11 ----------- routing/dht/routing.go | 13 ++++++++++++- routing/dht/table.go | 24 +++++++++++++++++++----- routing/dht/table_test.go | 17 +++++++++++++++++ swarm/swarm.go | 2 +- util/util.go | 9 +++++++-- 10 files changed, 81 insertions(+), 31 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index c2c1b63b..9b4854cf 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -106,6 +106,14 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { if removed != nil { panic("need to remove this peer.") } + + // Ping new peer to register in their routing table + // NOTE: this should be done better... + err = dht.Ping(peer, time.Second * 2) + if err != nil { + panic("Failed to ping new peer.") + } + return peer, nil } @@ -149,7 +157,7 @@ func (dht *IpfsDHT) handleMessages() { } // - u.DOut("Got message type: '%s' [id = %x]", mesNames[pmes.GetType()], pmes.GetId()) + u.DOut("Got message type: '%s' [id = %x]", DHTMessage_MessageType_name[int32(pmes.GetType())], pmes.GetId()) switch pmes.GetType() { case DHTMessage_GET_VALUE: dht.handleGetValue(mes.Peer, pmes) @@ -215,14 +223,18 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) { } func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) { + u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty()) closest := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey()))) if closest == nil { + panic("could not find anything.") } if len(closest.Addresses) == 0 { panic("no addresses for connected peer...") } + u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty()) + addr,err := closest.Addresses[0].String() if err != nil { panic(err) diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 0c4b7ee0..6217c29f 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -45,7 +45,7 @@ func TestPing(t *testing.T) { dht_a.Start() dht_b.Start() - err = dht_a.Connect(addr_b) + _,err = dht_a.Connect(addr_b) if err != nil { t.Fatal(err) } @@ -92,7 +92,7 @@ func TestValueGetSet(t *testing.T) { dht_a.Start() dht_b.Start() - err = dht_a.Connect(addr_b) + _,err = dht_a.Connect(addr_b) if err != nil { t.Fatal(err) } diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index 3283ef4e..e95f487c 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -29,6 +29,7 @@ const ( DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3 DHTMessage_FIND_NODE DHTMessage_MessageType = 4 DHTMessage_PING DHTMessage_MessageType = 5 + DHTMessage_DIAGNOSTIC DHTMessage_MessageType = 6 ) var DHTMessage_MessageType_name = map[int32]string{ @@ -38,6 +39,7 @@ var DHTMessage_MessageType_name = map[int32]string{ 3: "GET_PROVIDERS", 4: "FIND_NODE", 5: "PING", + 6: "DIAGNOSTIC", } var DHTMessage_MessageType_value = map[string]int32{ "PUT_VALUE": 0, @@ -46,6 +48,7 @@ var DHTMessage_MessageType_value = map[string]int32{ "GET_PROVIDERS": 3, "FIND_NODE": 4, "PING": 5, + "DIAGNOSTIC": 6, } func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType { @@ -66,14 +69,12 @@ func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error { } type DHTMessage struct { - Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` - Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` - // Unique ID of this message, used to match queries with responses - Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` - // Signals whether or not this message is a response to another message - Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` - XXX_unrecognized []byte `json:"-"` + Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"` + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` + Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *DHTMessage) Reset() { *m = DHTMessage{} } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index d873c755..a9a7fd3c 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -10,6 +10,7 @@ message DHTMessage { GET_PROVIDERS = 3; FIND_NODE = 4; PING = 5; + DIAGNOSTIC = 6; } required MessageType type = 1; diff --git a/routing/dht/pDHTMessage.go b/routing/dht/pDHTMessage.go index 8b862dbc..65c03b1f 100644 --- a/routing/dht/pDHTMessage.go +++ b/routing/dht/pDHTMessage.go @@ -9,17 +9,6 @@ type pDHTMessage struct { Id uint64 } -var mesNames [10]string - -func init() { - mesNames[DHTMessage_ADD_PROVIDER] = "add provider" - mesNames[DHTMessage_FIND_NODE] = "find node" - mesNames[DHTMessage_GET_PROVIDERS] = "get providers" - mesNames[DHTMessage_GET_VALUE] = "get value" - mesNames[DHTMessage_PUT_VALUE] = "put value" - mesNames[DHTMessage_PING] = "ping" -} - func (m *pDHTMessage) ToProtobuf() *DHTMessage { pmes := new(DHTMessage) if m.Value != nil { diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 138a0ee9..48949835 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -194,6 +194,17 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error return nil, err } - return s.Connect(maddr) + found_peer, err := s.Connect(maddr) + if err != nil { + u.POut("Found peer but couldnt connect.") + return nil, err + } + + if !found_peer.ID.Equal(id) { + u.POut("FindPeer: searching for '%s' but found '%s'", id.Pretty(), found_peer.ID.Pretty()) + return found_peer, u.ErrSearchIncomplete + } + + return found_peer, nil } } diff --git a/routing/dht/table.go b/routing/dht/table.go index 17af9575..ce8fdbc2 100644 --- a/routing/dht/table.go +++ b/routing/dht/table.go @@ -1,10 +1,12 @@ package dht import ( + "encoding/hex" "container/list" "sort" peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" ) // RoutingTable defines the routing table. @@ -87,13 +89,13 @@ func (p peerSorterArr) Less(a, b int) bool { } // -func (rt *RoutingTable) copyPeersFromList(peerArr peerSorterArr, peerList *list.List) peerSorterArr { +func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { for e := peerList.Front(); e != nil; e = e.Next() { p := e.Value.(*peer.Peer) p_id := convertPeerID(p.ID) pd := peerDistance{ p: p, - distance: xor(rt.local, p_id), + distance: xor(target, p_id), } peerArr = append(peerArr, &pd) } @@ -112,6 +114,7 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { // Returns a list of the 'count' closest peers to the given ID func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { + u.POut("Searching table, size = %d", rt.Size()) cpl := xor(id, rt.local).commonPrefixLen() // Get bucket at cpl index or last bucket @@ -127,16 +130,16 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { // if this happens, search both surrounding buckets for nearest peer if cpl > 0 { plist := (*list.List)(rt.Buckets[cpl - 1]) - peerArr = rt.copyPeersFromList(peerArr, plist) + peerArr = copyPeersFromList(id, peerArr, plist) } if cpl < len(rt.Buckets) - 1 { plist := (*list.List)(rt.Buckets[cpl + 1]) - peerArr = rt.copyPeersFromList(peerArr, plist) + peerArr = copyPeersFromList(id, peerArr, plist) } } else { plist := (*list.List)(bucket) - peerArr = rt.copyPeersFromList(peerArr, plist) + peerArr = copyPeersFromList(id, peerArr, plist) } // Sort by distance to local peer @@ -145,7 +148,18 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { var out []*peer.Peer for i := 0; i < count && i < peerArr.Len(); i++ { out = append(out, peerArr[i].p) + u.POut("peer out: %s - %s", peerArr[i].p.ID.Pretty(), + hex.EncodeToString(xor(id, convertPeerID(peerArr[i].p.ID)))) } return out } + +// Returns the total number of peers in the routing table +func (rt *RoutingTable) Size() int { + var tot int + for _,buck := range rt.Buckets { + tot += buck.Len() + } + return tot +} diff --git a/routing/dht/table_test.go b/routing/dht/table_test.go index cb52bd1a..debec5e1 100644 --- a/routing/dht/table_test.go +++ b/routing/dht/table_test.go @@ -90,3 +90,20 @@ func TestTableUpdate(t *testing.T) { } } } + +func TestTableFind(t *testing.T) { + local := _randPeer() + rt := NewRoutingTable(10, convertPeerID(local.ID)) + + peers := make([]*peer.Peer, 100) + for i := 0; i < 5; i++ { + peers[i] = _randPeer() + rt.Update(peers[i]) + } + + t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) + found := rt.NearestPeer(convertPeerID(peers[2].ID)) + if !found.ID.Equal(peers[2].ID) { + t.Fatalf("Failed to lookup known node...") + } +} diff --git a/swarm/swarm.go b/swarm/swarm.go index 882c0a05..b286c3c8 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -245,7 +245,7 @@ func (s *Swarm) fanOut() { if !ok { return } - u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key().Pretty()) + //u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key().Pretty()) s.connsLock.RLock() conn, found := s.conns[msg.Peer.Key()] diff --git a/util/util.go b/util/util.go index f5eec56b..ca9ab79d 100644 --- a/util/util.go +++ b/util/util.go @@ -2,6 +2,7 @@ package util import ( "fmt" + "errors" mh "github.com/jbenet/go-multihash" "os" "os/user" @@ -13,10 +14,14 @@ import ( var Debug bool // ErrNotImplemented signifies a function has not been implemented yet. -var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.") +var ErrNotImplemented = errors.New("Error: not implemented yet.") // ErrTimeout implies that a timeout has been triggered -var ErrTimeout = fmt.Errorf("Error: Call timed out.") +var ErrTimeout = errors.New("Error: Call timed out.") + +// ErrSeErrSearchIncomplete implies that a search type operation didnt +// find the expected node, but did find 'a' node. +var ErrSearchIncomplete = errors.New("Error: Search Incomplete.") // Key is a string representation of multihash for use with maps. type Key string -- GitLab