Commit f09dba77 authored by Jeromy

more tests and add routing table filtering by peer latency

parent 4cb2e1e0
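
At a high level, this commit gives the DHT several routing tables ("clusters"), each with a maximum acceptable peer latency, and table updates now skip peers whose measured round trip exceeds that cap. The sketch below condenses that idea for readers skimming the diff; routingTier and admits are made-up names for illustration only, and the authoritative change is in the hunks that follow.

package main

import (
	"fmt"
	"time"
)

// routingTier stands in for one latency cluster: a peer is only tracked here
// if its measured round-trip time is within the tier's cap.
type routingTier struct {
	maxLatency time.Duration
}

func (t routingTier) admits(rtt time.Duration) bool {
	return rtt <= t.maxLatency
}

func main() {
	// The same three tiers NewDHT wires up below: 30ms, 100ms, and one hour.
	tiers := []routingTier{{30 * time.Millisecond}, {100 * time.Millisecond}, {time.Hour}}
	rtt := 45 * time.Millisecond
	for i, tier := range tiers {
		fmt.Printf("cluster %d admits %v: %v\n", i, rtt, tier.admits(rtt))
	}
}
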
@@ -71,6 +71,8 @@ func (p *Peer) GetLatency() (out time.Duration) {
return
}
// TODO: Instead of just keeping a single number,
// keep a running average over the last hour or so
func (p *Peer) SetLatency(laten time.Duration) {
p.latenLock.Lock()
p.latency = laten
......
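
The TODO above suggests keeping a running average rather than only the last sample. A common way to do that without storing history is an exponentially weighted moving average; the sketch below is one possible shape for it (not part of this commit, all names hypothetical).

package latency

import (
	"sync"
	"time"
)

// ewma keeps a smoothed latency estimate: each new sample moves the stored
// value a fraction alpha of the way toward the sample, so older measurements
// decay gradually instead of being overwritten outright.
type ewma struct {
	mu    sync.Mutex
	value time.Duration
	alpha float64 // e.g. 0.1 to 0.3; higher reacts faster to new samples
}

func (e *ewma) Observe(sample time.Duration) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.value == 0 {
		e.value = sample // first observation seeds the average
		return
	}
	e.value = time.Duration(e.alpha*float64(sample) + (1-e.alpha)*float64(e.value))
}

func (e *ewma) Get() time.Duration {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.value
}
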
@@ -74,8 +74,12 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
dht.listeners = make(map[uint64]*listenInfo)
dht.providers = make(map[u.Key][]*providerInfo)
dht.shutdown = make(chan struct{})
dht.routes = make([]*kb.RoutingTable, 1)
dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID))
dht.routes = make([]*kb.RoutingTable, 3)
dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
dht.routes[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
dht.routes[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.birth = time.Now()
return dht
}
@@ -253,7 +257,13 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
// No providers?
// Find closest peer on given cluster to desired key and reply with that info
level := pmes.GetValue()[0] // Using value field to specify cluster level
level := 0
if len(pmes.GetValue()) < 1 {
// TODO: maybe return an error? Defaulting isn't a good idea IMO
u.PErr("handleGetValue: no routing level specified, assuming 0")
} else {
level = int(pmes.GetValue()[0]) // Using value field to specify cluster level
}
closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
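
For context, the convention this handler now relies on is that the first byte of a GET_VALUE message's Value field selects the routing cluster to search; the new test at the bottom of this commit sends Value: []byte{0} for exactly that reason. A self-contained sketch of the convention, with hypothetical helper names:

package main

import "fmt"

// encodeLevel puts the desired cluster level into the Value field's first byte.
func encodeLevel(level int) []byte {
	return []byte{byte(level)}
}

// decodeLevel mirrors handleGetValue's fallback: default to cluster 0 when the
// requester sent nothing (the TODO above notes an error may be better).
func decodeLevel(value []byte) int {
	if len(value) < 1 {
		return 0
	}
	return int(value[0])
}

func main() {
	fmt.Println(decodeLevel(encodeLevel(2))) // 2
	fmt.Println(decodeLevel(nil))            // 0
}
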
@@ -477,6 +487,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
response_chan := dht.ListenFor(pmes.Id, 1, time.Minute)
mes := swarm.NewMessage(p, pmes.ToProtobuf())
t := time.Now()
dht.network.Send(mes)
// Wait for either the response or a timeout
@@ -490,6 +501,8 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
u.PErr("response channel closed before timeout, please investigate.")
return nil, u.ErrTimeout
}
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
@@ -513,7 +526,8 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
u.PErr("getValue error: %s", err)
continue
}
p, err = dht.Connect(maddr)
p, err = dht.network.Connect(maddr)
if err != nil {
u.PErr("getValue error: %s", err)
continue
@@ -547,9 +561,21 @@ func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
}
func (dht *IpfsDHT) Update(p *peer.Peer) {
removed := dht.routes[0].Update(p)
if removed != nil {
dht.network.Drop(removed)
for _, route := range dht.routes {
removed := route.Update(p)
// Only drop the connection if no tables refer to this peer
if removed != nil {
found := false
for _, r := range dht.routes {
if r.Find(removed.ID) != nil {
found = true
break
}
}
if !found {
dht.network.Drop(removed)
}
}
}
}
@@ -574,6 +600,7 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.ListenFor(pmes.Id, 1, time.Minute)
t := time.Now()
dht.network.Send(mes)
after := time.After(timeout)
select {
@@ -581,6 +608,8 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
dht.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-listenChan:
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
@@ -590,3 +619,9 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
return pmes_out, nil
}
}
func (dht *IpfsDHT) PrintTables() {
for _, route := range dht.routes {
route.Print()
}
}
@@ -16,13 +16,16 @@ import (
// fauxNet is a standin for a swarm.Network in order to more easily recreate
// different testing scenarios
type fauxNet struct {
Chan *swarm.Chan
Chan *swarm.Chan
handlers []mesHandleFunc
swarm.Network
handlers []mesHandleFunc
}
// mesHandleFunc is a function that takes in outgoing messages
// and can respond to them, simulating other peers on the network.
// Returning nil will choose not to respond and pass the message on to the
// next registered handler
type mesHandleFunc func(*swarm.Message) *swarm.Message
func newFauxNet() *fauxNet {
@@ -32,6 +35,9 @@ func newFauxNet() *fauxNet {
return fn
}
// Instead of 'Listening', start up a goroutine that will check
// all outgoing messages against registered message handlers,
// and reply if needed
func (f *fauxNet) Listen() error {
go func() {
for {
@@ -95,6 +101,7 @@ func TestGetFailures(t *testing.T) {
t.Fatal("Did not get expected error!")
}
// Reply with failures to every message
fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
pmes := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, pmes)
@@ -120,4 +127,30 @@ func TestGetFailures(t *testing.T) {
} else {
t.Fatal("expected error, got none.")
}
success := make(chan struct{})
fn.handlers = nil
fn.AddHandler(func(mes *swarm.Message) *swarm.Message {
resp := new(PBDHTMessage)
err := proto.Unmarshal(mes.Data, resp)
if err != nil {
t.Fatal(err)
}
if resp.GetSuccess() {
t.Fatal("Get returned success when it shouldnt have.")
}
success <- struct{}{}
return nil
})
// Now we test this DHT's handleGetValue failure
req := DHTMessage{
Type: PBDHTMessage_GET_VALUE,
Key: "hello",
Id: GenerateMessageID(),
Value: []byte{0},
}
fn.Chan.Incoming <- swarm.NewMessage(other, req.ToProtobuf())
<-success
}
@@ -89,9 +89,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
panic("not yet implemented")
}
// TODO: dht.Connect has overhead due to an internal
// ping to the target. Use something else
p, err = s.Connect(maddr)
p, err = s.network.Connect(maddr)
if err != nil {
// Move up route level
panic("not yet implemented.")
@@ -167,7 +165,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
u.PErr("error connecting to new peer: %s", err)
continue
}
p, err = s.Connect(maddr)
p, err = s.network.Connect(maddr)
if err != nil {
u.PErr("error connecting to new peer: %s", err)
continue
@@ -204,7 +202,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
return nil, u.WrapError(err, "FindPeer received bad info")
}
nxtPeer, err := s.Connect(addr)
nxtPeer, err := s.network.Connect(addr)
if err != nil {
return nil, u.WrapError(err, "FindPeer failed to connect to new peer.")
}
......
@@ -2,16 +2,27 @@ package dht
import (
"container/list"
"sync"
peer "github.com/jbenet/go-ipfs/peer"
)
// Bucket holds a list of peers.
type Bucket list.List
type Bucket struct {
lk sync.RWMutex
list *list.List
}
func NewBucket() *Bucket {
b := new(Bucket)
b.list = list.New()
return b
}
func (b *Bucket) Find(id peer.ID) *list.Element {
bucket_list := (*list.List)(b)
for e := bucket_list.Front(); e != nil; e = e.Next() {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peer.Peer).ID.Equal(id) {
return e
}
@@ -20,34 +31,42 @@ func (b *Bucket) Find(id peer.ID) *list.Element {
}
func (b *Bucket) MoveToFront(e *list.Element) {
bucket_list := (*list.List)(b)
bucket_list.MoveToFront(e)
b.lk.Lock()
b.list.MoveToFront(e)
b.lk.Unlock()
}
func (b *Bucket) PushFront(p *peer.Peer) {
bucket_list := (*list.List)(b)
bucket_list.PushFront(p)
b.lk.Lock()
b.list.PushFront(p)
b.lk.Unlock()
}
func (b *Bucket) PopBack() *peer.Peer {
bucket_list := (*list.List)(b)
last := bucket_list.Back()
bucket_list.Remove(last)
b.lk.Lock()
defer b.lk.Unlock()
last := b.list.Back()
b.list.Remove(last)
return last.Value.(*peer.Peer)
}
func (b *Bucket) Len() int {
bucket_list := (*list.List)(b)
return bucket_list.Len()
b.lk.RLock()
defer b.lk.RUnlock()
return b.list.Len()
}
// Splits a bucket's peers into two buckets: the method's receiver keeps peers
// with CPL equal to cpl, and the returned bucket holds peers with CPL greater
// than cpl (the returned bucket has the closer peers)
func (b *Bucket) Split(cpl int, target ID) *Bucket {
bucket_list := (*list.List)(b)
b.lk.Lock()
defer b.lk.Unlock()
out := list.New()
e := bucket_list.Front()
newbuck := NewBucket()
newbuck.list = out
e := b.list.Front()
for e != nil {
peer_id := ConvertPeerID(e.Value.(*peer.Peer).ID)
peer_cpl := prefLen(peer_id, target)
@@ -55,15 +74,14 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
cur := e
out.PushBack(e.Value)
e = e.Next()
bucket_list.Remove(cur)
b.list.Remove(cur)
continue
}
e = e.Next()
}
return (*Bucket)(out)
return newbuck
}
func (b *Bucket) getIter() *list.Element {
bucket_list := (*list.List)(b)
return bucket_list.Front()
return b.list.Front()
}
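
One thing the new wrapper still leaves to callers is iteration: getIter hands back a raw list element, and table.go below reaches into .list directly, so those walks happen outside the bucket's lock. A hedged sketch of a snapshot-style accessor that would keep iteration under the read lock (not part of this commit; it assumes the same package and the Bucket fields defined above):

// Peers returns a copy of the bucket's contents taken under the read lock,
// so callers can range over it without racing concurrent updates.
func (b *Bucket) Peers() []*peer.Peer {
	b.lk.RLock()
	defer b.lk.RUnlock()
	out := make([]*peer.Peer, 0, b.list.Len())
	for e := b.list.Front(); e != nil; e = e.Next() {
		out = append(out, e.Value.(*peer.Peer))
	}
	return out
}
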
@@ -2,8 +2,10 @@ package dht
import (
"container/list"
"fmt"
"sort"
"sync"
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
@@ -18,16 +20,20 @@ type RoutingTable struct {
// Blanket lock, refine later for better performance
tabLock sync.RWMutex
// Maximum acceptable latency for peers in this cluster
maxLatency time.Duration
// kBuckets define all the fingers to other nodes.
Buckets []*Bucket
bucketsize int
}
func NewRoutingTable(bucketsize int, local_id ID) *RoutingTable {
func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *RoutingTable {
rt := new(RoutingTable)
rt.Buckets = []*Bucket{new(Bucket)}
rt.Buckets = []*Bucket{NewBucket()}
rt.bucketsize = bucketsize
rt.local = local_id
rt.maxLatency = latency
return rt
}
@@ -48,6 +54,10 @@ func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
e := bucket.Find(p.ID)
if e == nil {
// New peer, add to bucket
if p.GetLatency() > rt.maxLatency {
// Connection doesn't meet requirements, skip!
return nil
}
bucket.PushFront(p)
// Are we past the max bucket size?
@@ -150,17 +160,16 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
// In the case of an unusual split, one bucket may be empty.
// If this happens, search both surrounding buckets for the nearest peer
if cpl > 0 {
plist := (*list.List)(rt.Buckets[cpl-1])
plist := rt.Buckets[cpl-1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
if cpl < len(rt.Buckets)-1 {
plist := (*list.List)(rt.Buckets[cpl+1])
plist := rt.Buckets[cpl+1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
} else {
plist := (*list.List)(bucket)
peerArr = copyPeersFromList(id, peerArr, plist)
peerArr = copyPeersFromList(id, peerArr, bucket.list)
}
// Sort by distance to local peer
@@ -193,3 +202,12 @@ func (rt *RoutingTable) Listpeers() []*peer.Peer {
}
return peers
}
func (rt *RoutingTable) Print() {
fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
rt.tabLock.RLock()
// Release the read lock once the listing below is printed
defer rt.tabLock.RUnlock()
peers := rt.Listpeers()
for i, p := range peers {
fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String())
}
}
package dht
import (
"container/list"
crand "crypto/rand"
"crypto/sha256"
"math/rand"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/peer"
)
@@ -27,7 +27,7 @@ func _randID() ID {
// Test basic features of the bucket struct
func TestBucket(t *testing.T) {
b := new(Bucket)
b := NewBucket()
peers := make([]*peer.Peer, 100)
for i := 0; i < 100; i++ {
@@ -45,7 +45,7 @@ func TestBucket(t *testing.T) {
}
spl := b.Split(0, ConvertPeerID(local.ID))
llist := (*list.List)(b)
llist := b.list
for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen()
@@ -54,7 +54,7 @@ func TestBucket(t *testing.T) {
}
}
rlist := (*list.List)(spl)
rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen()
@@ -67,7 +67,7 @@ func TestBucket(t *testing.T) {
// Right now, this just makes sure that it doesn't hang or crash
func TestTableUpdate(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID))
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 100; i++ {
@@ -93,7 +93,7 @@ func TestTableUpdate(t *testing.T) {
func TestTableFind(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID))
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 5; i++ {
@@ -110,7 +110,7 @@ func TestTableFind(t *testing.T) {
func TestTableFindMultiple(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(20, ConvertPeerID(local.ID))
rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 18; i++ {
@@ -124,3 +124,76 @@ func TestTableFindMultiple(t *testing.T) {
t.Fatalf("Got back different number of peers than we expected.")
}
}
// Looks for race conditions in table operations. For a more 'certain'
// test, increase the loop counter from 1000 to a much higher number
// and set GOMAXPROCS above 1
func TestTableMultithreaded(t *testing.T) {
local := peer.ID("localPeer")
tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour)
var peers []*peer.Peer
for i := 0; i < 500; i++ {
peers = append(peers, _randPeer())
}
done := make(chan struct{})
go func() {
for i := 0; i < 1000; i++ {
n := rand.Intn(len(peers))
tab.Update(peers[n])
}
done <- struct{}{}
}()
go func() {
for i := 0; i < 1000; i++ {
n := rand.Intn(len(peers))
tab.Update(peers[n])
}
done <- struct{}{}
}()
go func() {
for i := 0; i < 1000; i++ {
n := rand.Intn(len(peers))
tab.Find(peers[n].ID)
}
done <- struct{}{}
}()
<-done
<-done
<-done
}
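
The comment above suggests raising the iteration count and GOMAXPROCS for a stronger check. Running the suite under the race detector (go test -race) is the most direct tool, but the variant below sketches how the same test could force real parallelism from within Go; it is not part of this commit and assumes "runtime" is added to the file's imports.

// TestTableMultithreadedParallel is an illustrative, harder variant of the
// test above: GOMAXPROCS is raised so the goroutines genuinely run in
// parallel, and the iteration count is much larger.
func TestTableMultithreadedParallel(t *testing.T) {
	old := runtime.GOMAXPROCS(4)
	defer runtime.GOMAXPROCS(old)

	local := peer.ID("localPeer")
	tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour)
	var peers []*peer.Peer
	for i := 0; i < 500; i++ {
		peers = append(peers, _randPeer())
	}

	done := make(chan struct{})
	work := func(f func(int)) {
		for i := 0; i < 100000; i++ {
			f(rand.Intn(len(peers)))
		}
		done <- struct{}{}
	}
	go work(func(n int) { tab.Update(peers[n]) })
	go work(func(n int) { tab.Update(peers[n]) })
	go work(func(n int) { tab.Find(peers[n].ID) })
	for i := 0; i < 3; i++ {
		<-done
	}
}
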
func BenchmarkUpdates(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
for i := 0; i < b.N; i++ {
peers = append(peers, _randPeer())
}
b.StartTimer()
for i := 0; i < b.N; i++ {
tab.Update(peers[i])
}
}
func BenchmarkFinds(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
for i := 0; i < b.N; i++ {
peers = append(peers, _randPeer())
tab.Update(peers[i])
}
b.StartTimer()
for i := 0; i < b.N; i++ {
tab.Find(peers[i].ID)
}
}