From f09dba772cdcc5cb7705a816db8d83917a65d50e Mon Sep 17 00:00:00 2001
From: Jeromy <jeromyj@gmail.com>
Date: Mon, 11 Aug 2014 20:11:23 -0700
Subject: [PATCH] more tests and add in table filtering by peer latency

---
 peer/peer.go                  |  2 +
 routing/dht/dht.go            | 49 +++++++++++++++++---
 routing/dht/ext_test.go       | 39 ++++++++++++++--
 routing/dht/routing.go        |  8 ++--
 routing/kbucket/bucket.go     | 54 ++++++++++++++--------
 routing/kbucket/table.go      | 30 +++++++++---
 routing/kbucket/table_test.go | 87 ++++++++++++++++++++++++++++++++---
 7 files changed, 223 insertions(+), 46 deletions(-)

diff --git a/peer/peer.go b/peer/peer.go
index a54179b3b..144189f58 100644
--- a/peer/peer.go
+++ b/peer/peer.go
@@ -71,6 +71,8 @@ func (p *Peer) GetLatency() (out time.Duration) {
 	return
 }
 
+// TODO: Instead of just keeping a single number,
+//		 keep a running average over the last hour or so
 func (p *Peer) SetLatency(laten time.Duration) {
 	p.latenLock.Lock()
 	p.latency = laten
diff --git a/routing/dht/dht.go b/routing/dht/dht.go
index e62b1fda8..018c22a01 100644
--- a/routing/dht/dht.go
+++ b/routing/dht/dht.go
@@ -74,8 +74,12 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
 	dht.listeners = make(map[uint64]*listenInfo)
 	dht.providers = make(map[u.Key][]*providerInfo)
 	dht.shutdown = make(chan struct{})
-	dht.routes = make([]*kb.RoutingTable, 1)
-	dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID))
+
+	dht.routes = make([]*kb.RoutingTable, 3)
+	dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
+	dht.routes[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
+	dht.routes[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
+
 	dht.birth = time.Now()
 	return dht
 }
@@ -253,7 +257,13 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
 			// No providers?
 			// Find closest peer on given cluster to desired key and reply with that info
 
-			level := pmes.GetValue()[0] // Using value field to specify cluster level
+			level := 0
+			if len(pmes.GetValue()) < 1 {
+				// TODO: maybe return an error? Defaulting isnt a good idea IMO
+				u.PErr("handleGetValue: no routing level specified, assuming 0")
+			} else {
+				level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
+			}
 
 			closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
 
@@ -477,6 +487,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
 	response_chan := dht.ListenFor(pmes.Id, 1, time.Minute)
 
 	mes := swarm.NewMessage(p, pmes.ToProtobuf())
+	t := time.Now()
 	dht.network.Send(mes)
 
 	// Wait for either the response or a timeout
@@ -490,6 +501,8 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
 			u.PErr("response channel closed before timeout, please investigate.")
 			return nil, u.ErrTimeout
 		}
+		roundtrip := time.Since(t)
+		resp.Peer.SetLatency(roundtrip)
 		pmes_out := new(PBDHTMessage)
 		err := proto.Unmarshal(resp.Data, pmes_out)
 		if err != nil {
@@ -513,7 +526,8 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
 				u.PErr("getValue error: %s", err)
 				continue
 			}
-			p, err = dht.Connect(maddr)
+
+			p, err = dht.network.Connect(maddr)
 			if err != nil {
 				u.PErr("getValue error: %s", err)
 				continue
@@ -547,9 +561,21 @@ func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
 }
 
 func (dht *IpfsDHT) Update(p *peer.Peer) {
-	removed := dht.routes[0].Update(p)
-	if removed != nil {
-		dht.network.Drop(removed)
+	for _, route := range dht.routes {
+		removed := route.Update(p)
+		// Only drop the connection if no tables refer to this peer
+		if removed != nil {
+			found := false
+			for _, r := range dht.routes {
+				if r.Find(removed.ID) != nil {
+					found = true
+					break
+				}
+			}
+			if !found {
+				dht.network.Drop(removed)
+			}
+		}
 	}
 }
 
@@ -574,6 +600,7 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
 
 	mes := swarm.NewMessage(p, pmes.ToProtobuf())
 	listenChan := dht.ListenFor(pmes.Id, 1, time.Minute)
+	t := time.Now()
 	dht.network.Send(mes)
 	after := time.After(timeout)
 	select {
@@ -581,6 +608,8 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
 		dht.Unlisten(pmes.Id)
 		return nil, u.ErrTimeout
 	case resp := <-listenChan:
+		roundtrip := time.Since(t)
+		resp.Peer.SetLatency(roundtrip)
 		pmes_out := new(PBDHTMessage)
 		err := proto.Unmarshal(resp.Data, pmes_out)
 		if err != nil {
@@ -590,3 +619,9 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
 		return pmes_out, nil
 	}
 }
+
+func (dht *IpfsDHT) PrintTables() {
+	for _, route := range dht.routes {
+		route.Print()
+	}
+}
diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go
index 4dff36886..fbf52a263 100644
--- a/routing/dht/ext_test.go
+++ b/routing/dht/ext_test.go
@@ -16,13 +16,16 @@ import (
 // fauxNet is a standin for a swarm.Network in order to more easily recreate
 // different testing scenarios
 type fauxNet struct {
-	Chan *swarm.Chan
+	Chan     *swarm.Chan
+	handlers []mesHandleFunc
 
 	swarm.Network
-
-	handlers []mesHandleFunc
 }
 
+// mesHandleFunc is a function that takes in outgoing messages
+// and can respond to them, simulating other peers on the network.
+// returning nil will chose not to respond and pass the message onto the
+// next registered handler
 type mesHandleFunc func(*swarm.Message) *swarm.Message
 
 func newFauxNet() *fauxNet {
@@ -32,6 +35,9 @@ func newFauxNet() *fauxNet {
 	return fn
 }
 
+// Instead of 'Listening' Start up a goroutine that will check
+// all outgoing messages against registered message handlers,
+// and reply if needed
 func (f *fauxNet) Listen() error {
 	go func() {
 		for {
@@ -95,6 +101,7 @@ func TestGetFailures(t *testing.T) {
 		t.Fatal("Did not get expected error!")
 	}
 
+	// Reply with failures to every message
 	fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
 		pmes := new(PBDHTMessage)
 		err := proto.Unmarshal(mes.Data, pmes)
@@ -120,4 +127,30 @@ func TestGetFailures(t *testing.T) {
 	} else {
 		t.Fatal("expected error, got none.")
 	}
+
+	success := make(chan struct{})
+	fn.handlers = nil
+	fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
+		resp := new(PBDHTMessage)
+		err := proto.Unmarshal(mes.Data, resp)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if resp.GetSuccess() {
+			t.Fatal("Get returned success when it shouldnt have.")
+		}
+		success <- struct{}{}
+		return nil
+	})
+
+	// Now we test this DHT's handleGetValue failure
+	req := DHTMessage{
+		Type:  PBDHTMessage_GET_VALUE,
+		Key:   "hello",
+		Id:    GenerateMessageID(),
+		Value: []byte{0},
+	}
+	fn.Chan.Incoming <- swarm.NewMessage(other, req.ToProtobuf())
+
+	<-success
 }
diff --git a/routing/dht/routing.go b/routing/dht/routing.go
index 711866f8e..71e950102 100644
--- a/routing/dht/routing.go
+++ b/routing/dht/routing.go
@@ -89,9 +89,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
 					panic("not yet implemented")
 				}
 
-				// TODO: dht.Connect has overhead due to an internal
-				//			ping to the target. Use something else
-				p, err = s.Connect(maddr)
+				p, err = s.network.Connect(maddr)
 				if err != nil {
 					// Move up route level
 					panic("not yet implemented.")
@@ -167,7 +165,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
 					u.PErr("error connecting to new peer: %s", err)
 					continue
 				}
-				p, err = s.Connect(maddr)
+				p, err = s.network.Connect(maddr)
 				if err != nil {
 					u.PErr("error connecting to new peer: %s", err)
 					continue
@@ -204,7 +202,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
 			return nil, u.WrapError(err, "FindPeer received bad info")
 		}
 
-		nxtPeer, err := s.Connect(addr)
+		nxtPeer, err := s.network.Connect(addr)
 		if err != nil {
 			return nil, u.WrapError(err, "FindPeer failed to connect to new peer.")
 		}
diff --git a/routing/kbucket/bucket.go b/routing/kbucket/bucket.go
index 5abd2c910..1a55a0f69 100644
--- a/routing/kbucket/bucket.go
+++ b/routing/kbucket/bucket.go
@@ -2,16 +2,27 @@ package dht
 
 import (
 	"container/list"
+	"sync"
 
 	peer "github.com/jbenet/go-ipfs/peer"
 )
 
 // Bucket holds a list of peers.
-type Bucket list.List
+type Bucket struct {
+	lk   sync.RWMutex
+	list *list.List
+}
+
+func NewBucket() *Bucket {
+	b := new(Bucket)
+	b.list = list.New()
+	return b
+}
 
 func (b *Bucket) Find(id peer.ID) *list.Element {
-	bucket_list := (*list.List)(b)
-	for e := bucket_list.Front(); e != nil; e = e.Next() {
+	b.lk.RLock()
+	defer b.lk.RUnlock()
+	for e := b.list.Front(); e != nil; e = e.Next() {
 		if e.Value.(*peer.Peer).ID.Equal(id) {
 			return e
 		}
@@ -20,34 +31,42 @@ func (b *Bucket) Find(id peer.ID) *list.Element {
 }
 
 func (b *Bucket) MoveToFront(e *list.Element) {
-	bucket_list := (*list.List)(b)
-	bucket_list.MoveToFront(e)
+	b.lk.Lock()
+	b.list.MoveToFront(e)
+	b.lk.Unlock()
 }
 
 func (b *Bucket) PushFront(p *peer.Peer) {
-	bucket_list := (*list.List)(b)
-	bucket_list.PushFront(p)
+	b.lk.Lock()
+	b.list.PushFront(p)
+	b.lk.Unlock()
 }
 
 func (b *Bucket) PopBack() *peer.Peer {
-	bucket_list := (*list.List)(b)
-	last := bucket_list.Back()
-	bucket_list.Remove(last)
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	last := b.list.Back()
+	b.list.Remove(last)
 	return last.Value.(*peer.Peer)
 }
 
 func (b *Bucket) Len() int {
-	bucket_list := (*list.List)(b)
-	return bucket_list.Len()
+	b.lk.RLock()
+	defer b.lk.RUnlock()
+	return b.list.Len()
 }
 
 // Splits a buckets peers into two buckets, the methods receiver will have
 // peers with CPL equal to cpl, the returned bucket will have peers with CPL
 // greater than cpl (returned bucket has closer peers)
 func (b *Bucket) Split(cpl int, target ID) *Bucket {
-	bucket_list := (*list.List)(b)
+	b.lk.Lock()
+	defer b.lk.Unlock()
+
 	out := list.New()
-	e := bucket_list.Front()
+	newbuck := NewBucket()
+	newbuck.list = out
+	e := b.list.Front()
 	for e != nil {
 		peer_id := ConvertPeerID(e.Value.(*peer.Peer).ID)
 		peer_cpl := prefLen(peer_id, target)
@@ -55,15 +74,14 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
 			cur := e
 			out.PushBack(e.Value)
 			e = e.Next()
-			bucket_list.Remove(cur)
+			b.list.Remove(cur)
 			continue
 		}
 		e = e.Next()
 	}
-	return (*Bucket)(out)
+	return newbuck
 }
 
 func (b *Bucket) getIter() *list.Element {
-	bucket_list := (*list.List)(b)
-	return bucket_list.Front()
+	return b.list.Front()
 }
diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go
index 788d12265..86a7031ce 100644
--- a/routing/kbucket/table.go
+++ b/routing/kbucket/table.go
@@ -2,8 +2,10 @@ package dht
 
 import (
 	"container/list"
+	"fmt"
 	"sort"
 	"sync"
+	"time"
 
 	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
@@ -18,16 +20,20 @@ type RoutingTable struct {
 	// Blanket lock, refine later for better performance
 	tabLock sync.RWMutex
 
+	// Maximum acceptable latency for peers in this cluster
+	maxLatency time.Duration
+
 	// kBuckets define all the fingers to other nodes.
 	Buckets    []*Bucket
 	bucketsize int
 }
 
-func NewRoutingTable(bucketsize int, local_id ID) *RoutingTable {
+func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *RoutingTable {
 	rt := new(RoutingTable)
-	rt.Buckets = []*Bucket{new(Bucket)}
+	rt.Buckets = []*Bucket{NewBucket()}
 	rt.bucketsize = bucketsize
 	rt.local = local_id
+	rt.maxLatency = latency
 	return rt
 }
 
@@ -48,6 +54,10 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
 	e := bucket.Find(p.ID)
 	if e == nil {
 		// New peer, add to bucket
+		if p.GetLatency() > rt.maxLatency {
+			// Connection doesnt meet requirements, skip!
+			return nil
+		}
 		bucket.PushFront(p)
 
 		// Are we past the max bucket size?
@@ -150,17 +160,16 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
 		// In the case of an unusual split, one bucket may be empty.
 		// if this happens, search both surrounding buckets for nearest peer
 		if cpl > 0 {
-			plist := (*list.List)(rt.Buckets[cpl-1])
+			plist := rt.Buckets[cpl-1].list
 			peerArr = copyPeersFromList(id, peerArr, plist)
 		}
 
 		if cpl < len(rt.Buckets)-1 {
-			plist := (*list.List)(rt.Buckets[cpl+1])
+			plist := rt.Buckets[cpl+1].list
 			peerArr = copyPeersFromList(id, peerArr, plist)
 		}
 	} else {
-		plist := (*list.List)(bucket)
-		peerArr = copyPeersFromList(id, peerArr, plist)
+		peerArr = copyPeersFromList(id, peerArr, bucket.list)
 	}
 
 	// Sort by distance to local peer
@@ -193,3 +202,12 @@ func (rt *RoutingTable) Listpeers() []*peer.Peer {
 	}
 	return peers
 }
+
+func (rt *RoutingTable) Print() {
+	fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
+	rt.tabLock.RLock()
+	peers := rt.Listpeers()
+	for i, p := range peers {
+		fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String())
+	}
+}
diff --git a/routing/kbucket/table_test.go b/routing/kbucket/table_test.go
index 842c92510..02d8f5e0e 100644
--- a/routing/kbucket/table_test.go
+++ b/routing/kbucket/table_test.go
@@ -1,11 +1,11 @@
 package dht
 
 import (
-	"container/list"
 	crand "crypto/rand"
 	"crypto/sha256"
 	"math/rand"
 	"testing"
+	"time"
 
 	peer "github.com/jbenet/go-ipfs/peer"
 )
@@ -27,7 +27,7 @@ func _randID() ID {
 
 // Test basic features of the bucket struct
 func TestBucket(t *testing.T) {
-	b := new(Bucket)
+	b := NewBucket()
 
 	peers := make([]*peer.Peer, 100)
 	for i := 0; i < 100; i++ {
@@ -45,7 +45,7 @@ func TestBucket(t *testing.T) {
 	}
 
 	spl := b.Split(0, ConvertPeerID(local.ID))
-	llist := (*list.List)(b)
+	llist := b.list
 	for e := llist.Front(); e != nil; e = e.Next() {
 		p := ConvertPeerID(e.Value.(*peer.Peer).ID)
 		cpl := xor(p, local_id).commonPrefixLen()
@@ -54,7 +54,7 @@ func TestBucket(t *testing.T) {
 		}
 	}
 
-	rlist := (*list.List)(spl)
+	rlist := spl.list
 	for e := rlist.Front(); e != nil; e = e.Next() {
 		p := ConvertPeerID(e.Value.(*peer.Peer).ID)
 		cpl := xor(p, local_id).commonPrefixLen()
@@ -67,7 +67,7 @@ func TestBucket(t *testing.T) {
 // Right now, this just makes sure that it doesnt hang or crash
 func TestTableUpdate(t *testing.T) {
 	local := _randPeer()
-	rt := NewRoutingTable(10, ConvertPeerID(local.ID))
+	rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
 
 	peers := make([]*peer.Peer, 100)
 	for i := 0; i < 100; i++ {
@@ -93,7 +93,7 @@ func TestTableUpdate(t *testing.T) {
 
 func TestTableFind(t *testing.T) {
 	local := _randPeer()
-	rt := NewRoutingTable(10, ConvertPeerID(local.ID))
+	rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
 
 	peers := make([]*peer.Peer, 100)
 	for i := 0; i < 5; i++ {
@@ -110,7 +110,7 @@ func TestTableFind(t *testing.T) {
 
 func TestTableFindMultiple(t *testing.T) {
 	local := _randPeer()
-	rt := NewRoutingTable(20, ConvertPeerID(local.ID))
+	rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour)
 
 	peers := make([]*peer.Peer, 100)
 	for i := 0; i < 18; i++ {
@@ -124,3 +124,76 @@ func TestTableFindMultiple(t *testing.T) {
 		t.Fatalf("Got back different number of peers than we expected.")
 	}
 }
+
+// Looks for race conditions in table operations. For a more 'certain'
+// test, increase the loop counter from 1000 to a much higher number
+// and set GOMAXPROCS above 1
+func TestTableMultithreaded(t *testing.T) {
+	local := peer.ID("localPeer")
+	tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour)
+	var peers []*peer.Peer
+	for i := 0; i < 500; i++ {
+		peers = append(peers, _randPeer())
+	}
+
+	done := make(chan struct{})
+	go func() {
+		for i := 0; i < 1000; i++ {
+			n := rand.Intn(len(peers))
+			tab.Update(peers[n])
+		}
+		done <- struct{}{}
+	}()
+
+	go func() {
+		for i := 0; i < 1000; i++ {
+			n := rand.Intn(len(peers))
+			tab.Update(peers[n])
+		}
+		done <- struct{}{}
+	}()
+
+	go func() {
+		for i := 0; i < 1000; i++ {
+			n := rand.Intn(len(peers))
+			tab.Find(peers[n].ID)
+		}
+		done <- struct{}{}
+	}()
+	<-done
+	<-done
+	<-done
+}
+
+func BenchmarkUpdates(b *testing.B) {
+	b.StopTimer()
+	local := ConvertKey("localKey")
+	tab := NewRoutingTable(20, local, time.Hour)
+
+	var peers []*peer.Peer
+	for i := 0; i < b.N; i++ {
+		peers = append(peers, _randPeer())
+	}
+
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		tab.Update(peers[i])
+	}
+}
+
+func BenchmarkFinds(b *testing.B) {
+	b.StopTimer()
+	local := ConvertKey("localKey")
+	tab := NewRoutingTable(20, local, time.Hour)
+
+	var peers []*peer.Peer
+	for i := 0; i < b.N; i++ {
+		peers = append(peers, _randPeer())
+		tab.Update(peers[i])
+	}
+
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		tab.Find(peers[i].ID)
+	}
+}
-- 
GitLab