Commit 04e9ae33 authored by Jeromy

rewrite of provides to better select peers to send RPCs to

refactor test peer creation to be deterministic and reliable

a bit of cleanup trying to figure out TestGetFailure

add test to verify deterministic peer creation

switch put RPC over to use getClosestPeers

rm 0xDEADC0DE

fix queries not searching peer if it's not actually closer
parent 9b3a1679
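The heart of the change is in routing.go: PutValue and Provide no longer push RPCs to whatever the local routing table returns; they drain a channel of progressively closer peers from a new getClosestPeers lookup and send to each peer as it arrives. Below is a condensed sketch of that flow, written as if it lived in the dht package (illustrative only, with error handling trimmed and a hypothetical name provideSketch; the exact code is in the routing.go hunks further down).

// Condensed, illustrative sketch of the new provide flow. getClosestPeers
// streams peer IDs over a channel as the lookup converges, and a provider
// record is sent to each peer as it arrives.
func provideSketch(ctx context.Context, dht *IpfsDHT, key u.Key) error {
    dht.providers.AddProvider(key, dht.self) // record that we provide it locally too

    peers, err := dht.getClosestPeers(ctx, key, KValue) // returns <-chan peer.ID
    if err != nil {
        return err
    }
    for p := range peers {
        if err := dht.putProvider(ctx, p, string(key)); err != nil {
            log.Error(err) // log and keep going, as the new Provide does
        }
    }
    return nil
}

PutValue consumes the same channel, but fans the sends out with a sync.WaitGroup so the puts run concurrently.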
@@ -103,9 +103,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
 }
 
 // putValueToNetwork stores the given key/value pair at the peer 'p'
-// meaning: it sends a PUT_VALUE message to p
 func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID,
-    key string, rec *pb.Record) error {
+    key u.Key, rec *pb.Record) error {
     pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
     pmes.Record = rec
@@ -285,7 +284,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
 }
 
 // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
+func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
     closer := dht.nearestPeersToQuery(pmes, count)
 
     // no node? nil
@@ -302,11 +301,16 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
     }
 
     var filtered []peer.ID
-    for _, p := range closer {
+    for _, clp := range closer {
+        // Dont send a peer back themselves
+        if p == clp {
+            continue
+        }
+
         // must all be closer than self
         key := u.Key(pmes.GetKey())
-        if !kb.Closer(dht.self, p, key) {
-            filtered = append(filtered, p)
+        if !kb.Closer(dht.self, clp, key) {
+            filtered = append(filtered, clp)
         }
     }
@@ -323,23 +327,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
     return dht.network.DialPeer(ctx, p)
 }
 
-//TODO: this should be smarter about which keys it selects.
-func (dht *IpfsDHT) loadProvidableKeys() error {
-    kl, err := dht.datastore.KeyList()
-    if err != nil {
-        return err
-    }
-    for _, dsk := range kl {
-        k := u.KeyFromDsKey(dsk)
-        if len(k) == 0 {
-            log.Errorf("loadProvidableKeys error: %v", dsk)
-        }
-
-        dht.providers.AddProvider(k, dht.self)
-    }
-    return nil
-}
-
 // PingRoutine periodically pings nearest neighbors.
 func (dht *IpfsDHT) PingRoutine(t time.Duration) {
     defer dht.Children().Done()
......
@@ -14,7 +14,6 @@ import (
     dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
     ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 
-    // ci "github.com/jbenet/go-ipfs/crypto"
     inet "github.com/jbenet/go-ipfs/net"
     peer "github.com/jbenet/go-ipfs/peer"
     routing "github.com/jbenet/go-ipfs/routing"
@@ -33,9 +32,9 @@ func init() {
     }
 }
 
-func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {
-    sk, pk, err := testutil.RandKeyPair(512)
+func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr, seed int64) *IpfsDHT {
+    sk, pk, err := testutil.SeededKeyPair(512, seed)
     if err != nil {
         t.Fatal(err)
     }
@@ -71,7 +70,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
     for i := 0; i < n; i++ {
         addrs[i] = testutil.RandLocalTCPAddress()
-        dhts[i] = setupDHT(ctx, t, addrs[i])
+        dhts[i] = setupDHT(ctx, t, addrs[i], int64(i))
         peers[i] = dhts[i].self
     }
@@ -120,8 +119,8 @@ func TestPing(t *testing.T) {
     addrA := testutil.RandLocalTCPAddress()
     addrB := testutil.RandLocalTCPAddress()
 
-    dhtA := setupDHT(ctx, t, addrA)
-    dhtB := setupDHT(ctx, t, addrB)
+    dhtA := setupDHT(ctx, t, addrA, 1)
+    dhtB := setupDHT(ctx, t, addrB, 2)
 
     peerA := dhtA.self
     peerB := dhtB.self
@@ -153,8 +152,8 @@ func TestValueGetSet(t *testing.T) {
     addrA := testutil.RandLocalTCPAddress()
     addrB := testutil.RandLocalTCPAddress()
 
-    dhtA := setupDHT(ctx, t, addrA)
-    dhtB := setupDHT(ctx, t, addrB)
+    dhtA := setupDHT(ctx, t, addrA, 1)
+    dhtB := setupDHT(ctx, t, addrB, 2)
 
     defer dhtA.Close()
     defer dhtB.Close()
@@ -642,8 +641,8 @@ func TestConnectCollision(t *testing.T) {
     addrA := testutil.RandLocalTCPAddress()
     addrB := testutil.RandLocalTCPAddress()
 
-    dhtA := setupDHT(ctx, t, addrA)
-    dhtB := setupDHT(ctx, t, addrB)
+    dhtA := setupDHT(ctx, t, addrA, int64((rtime*2)+1))
+    dhtB := setupDHT(ctx, t, addrB, int64((rtime*2)+2))
 
     peerA := dhtA.self
     peerB := dhtB.self
......
@@ -47,9 +47,8 @@ func TestGetFailures(t *testing.T) {
         t.Fatal("Did not get expected error!")
     }
 
-    msgs := make(chan *pb.Message, 100)
+    t.Log("Timeout test passed.")
 
-    // u.POut("NotFound Test\n")
     // Reply with failures to every message
     nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
         defer s.Close()
@@ -68,8 +67,6 @@ func TestGetFailures(t *testing.T) {
         if err := pbw.WriteMsg(resp); err != nil {
             panic(err)
         }
-
-        msgs <- resp
     })
 
     // This one should fail with NotFound
@@ -83,6 +80,8 @@ func TestGetFailures(t *testing.T) {
         t.Fatal("expected error, got none.")
     }
 
+    t.Log("ErrNotFound check passed!")
+
     // Now we test this DHT's handleGetValue failure
     {
         typ := pb.Message_GET_VALUE
......
@@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
     }
 
     // Find closest peer on given cluster to desired key and reply with that info
-    closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+    closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
     closerinfos := peer.PeerInfos(dht.peerstore, closer)
     if closer != nil {
         for _, pi := range closerinfos {
@@ -137,6 +137,9 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (
 }
 
 func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
+    log.Errorf("handle find peer %s start", p)
+    defer log.Errorf("handle find peer %s end", p)
+
     resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
     var closest []peer.ID
@@ -144,11 +147,11 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
     if peer.ID(pmes.GetKey()) == dht.self {
         closest = []peer.ID{dht.self}
     } else {
-        closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
+        closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
     }
 
     if closest == nil {
-        log.Debugf("handleFindPeer: could not find anything.")
+        log.Warningf("handleFindPeer: could not find anything.")
         return resp, nil
     }
@@ -189,7 +192,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
     }
 
     // Also send closer peers.
-    closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
+    closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
     if closer != nil {
         infos := peer.PeerInfos(dht.peerstore, providers)
         resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos)
......
@@ -7,8 +7,8 @@ import (
     peer "github.com/jbenet/go-ipfs/peer"
     queue "github.com/jbenet/go-ipfs/peer/queue"
     "github.com/jbenet/go-ipfs/routing"
-    kb "github.com/jbenet/go-ipfs/routing/kbucket"
     u "github.com/jbenet/go-ipfs/util"
+    pset "github.com/jbenet/go-ipfs/util/peerset"
     todoctr "github.com/jbenet/go-ipfs/util/todocounter"
 
     context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@@ -71,7 +71,7 @@ type dhtQueryRunner struct {
     peersToQuery *queue.ChanQueue
 
     // peersSeen are all the peers queried. used to prevent querying same peer 2x
-    peersSeen peer.Set
+    peersSeen *pset.PeerSet
 
     // rateLimit is a channel used to rate limit our processing (semaphore)
     rateLimit chan struct{}
@@ -97,7 +97,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
         query:          q,
         peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
         peersRemaining: todoctr.NewSyncCounter(),
-        peersSeen:      peer.Set{},
+        peersSeen:      pset.New(),
         rateLimit:      make(chan struct{}, q.concurrency),
         cg:             ctxgroup.WithContext(ctx),
     }
@@ -117,7 +117,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
     // add all the peers we got first.
     for _, p := range peers {
-        r.addPeerToQuery(r.cg.Context(), p, "") // don't have access to self here...
+        r.addPeerToQuery(r.cg.Context(), p)
     }
 
     // go do this thing.
@@ -153,32 +153,17 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
     return nil, err
 }
 
-func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID, benchmark peer.ID) {
+func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
     // if new peer is ourselves...
     if next == r.query.dialer.LocalPeer() {
         return
     }
 
-    // if new peer further away than whom we got it from, don't bother (loops)
-    // TODO----------- this benchmark should be replaced by a heap:
-    // we should be doing the s/kademlia "continue to search"
-    // (i.e. put all of them in a heap sorted by dht distance and then just
-    // pull from the the top until a) you exhaust all peers you get,
-    // b) you succeed, c) your context expires.
-    if benchmark != "" && kb.Closer(benchmark, next, r.query.key) {
-        return
-    }
-
-    // if already seen, no need.
-    r.Lock()
-    _, found := r.peersSeen[next]
-    if found {
-        r.Unlock()
+    if !r.peersSeen.TryAdd(next) {
+        log.Debug("query peer was already seen")
         return
     }
-    r.peersSeen[next] = struct{}{}
-    r.Unlock()
 
     log.Debugf("adding peer to query: %v", next)
     // do this after unlocking to prevent possible deadlocks.
@@ -278,7 +263,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
             }
 
             r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs)
-            r.addPeerToQuery(cg.Context(), next.ID, p)
+            r.addPeerToQuery(cg.Context(), next.ID)
             log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
         }
     } else {
......
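Both the query runner above and the getClosestPeers code below lean on go-ipfs's util/peerset for duplicate suppression instead of a hand-rolled, mutex-guarded map. The package itself is not part of this diff; the sketch below is an assumed reconstruction of the surface it exposes, inferred from how it is called here (New, NewLimited, Add, TryAdd), and the real implementation may differ.

// Assumed shape of util/peerset, reconstructed from its use in this commit.
package peerset

import (
    "sync"

    peer "github.com/jbenet/go-ipfs/peer"
)

// PeerSet is a threadsafe set of peer IDs with an optional size limit.
type PeerSet struct {
    lk   sync.Mutex
    ps   map[peer.ID]struct{}
    size int // -1 means no limit
}

// New creates an unbounded PeerSet.
func New() *PeerSet {
    return &PeerSet{ps: make(map[peer.ID]struct{}), size: -1}
}

// NewLimited creates a PeerSet that accepts at most size peers.
func NewLimited(size int) *PeerSet {
    return &PeerSet{ps: make(map[peer.ID]struct{}), size: size}
}

// Add inserts p unconditionally.
func (s *PeerSet) Add(p peer.ID) {
    s.lk.Lock()
    s.ps[p] = struct{}{}
    s.lk.Unlock()
}

// TryAdd inserts p only if it is new and the set is not full,
// and reports whether the insert actually happened.
func (s *PeerSet) TryAdd(p peer.ID) bool {
    s.lk.Lock()
    defer s.lk.Unlock()
    if _, seen := s.ps[p]; seen {
        return false
    }
    if s.size > -1 && len(s.ps) >= s.size {
        return false
    }
    s.ps[p] = struct{}{}
    return true
}

The size limit is presumably why getClosestPeers builds its set with NewLimited(count): once count candidates have been admitted, TryAdd starts refusing and the recursive crawl stops expanding.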
@@ -40,19 +40,24 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
        return err
    }
 
-    peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
-
-    query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
-        log.Debugf("%s PutValue qry part %v", dht.self, p)
-        err := dht.putValueToNetwork(ctx, p, string(key), rec)
-        if err != nil {
-            return nil, err
-        }
-        return &dhtQueryResult{success: true}, nil
-    })
-
-    _, err = query.Run(ctx, peers)
-    return err
+    pchan, err := dht.getClosestPeers(ctx, key, KValue)
+    if err != nil {
+        return err
+    }
+
+    wg := sync.WaitGroup{}
+    for p := range pchan {
+        wg.Add(1)
+        go func(p peer.ID) {
+            defer wg.Done()
+            err := dht.putValueToNetwork(ctx, p, key, rec)
+            if err != nil {
+                log.Errorf("failed putting value to peer: %s", err)
+            }
+        }(p)
+    }
+    wg.Wait()
+    return nil
 }
 
 // GetValue searches for the value corresponding to given Key.
@@ -111,18 +116,19 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
 
 // Provide makes this node announce that it can provide a value for the given key
 func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
+    log.Event(ctx, "Provide Value start", &key)
+    defer log.Event(ctx, "Provide Value end", &key)
     dht.providers.AddProvider(key, dht.self)
-    peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
-    if len(peers) == 0 {
-        return nil
+
+    peers, err := dht.getClosestPeers(ctx, key, KValue)
+    if err != nil {
+        return err
     }
 
-    //TODO FIX: this doesn't work! it needs to be sent to the actual nearest peers.
-    // `peers` are the closest peers we have, not the ones that should get the value.
-    for _, p := range peers {
+    for p := range peers {
        err := dht.putProvider(ctx, p, string(key))
        if err != nil {
-            return err
+            log.Error(err)
        }
    }
    return nil
@@ -137,6 +143,87 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
    return providers, nil
 }
 
+func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key, count int) (<-chan peer.ID, error) {
+    log.Error("Get Closest Peers")
+    tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
+    if len(tablepeers) == 0 {
+        return nil, kb.ErrLookupFailure
+    }
+
+    out := make(chan peer.ID, count)
+    peerset := pset.NewLimited(count)
+
+    for _, p := range tablepeers {
+        out <- p
+        peerset.Add(p)
+    }
+
+    wg := sync.WaitGroup{}
+    for _, p := range tablepeers {
+        wg.Add(1)
+        go func(p peer.ID) {
+            dht.getClosestPeersRecurse(ctx, key, p, peerset, out)
+            wg.Done()
+        }(p)
+    }
+
+    go func() {
+        wg.Wait()
+        close(out)
+        log.Error("Closing closest peer chan")
+    }()
+
+    return out, nil
+}
+
+func (dht *IpfsDHT) getClosestPeersRecurse(ctx context.Context, key u.Key, p peer.ID, peers *pset.PeerSet, peerOut chan<- peer.ID) {
+    log.Error("closest peers recurse")
+    defer log.Error("closest peers recurse end")
+    closer, err := dht.closerPeersSingle(ctx, key, p)
+    if err != nil {
+        log.Errorf("error getting closer peers: %s", err)
+        return
+    }
+
+    wg := sync.WaitGroup{}
+    for _, p := range closer {
+        if kb.Closer(p, dht.self, key) && peers.TryAdd(p) {
+            select {
+            case peerOut <- p:
+            case <-ctx.Done():
+                return
+            }
+            wg.Add(1)
+            go func(p peer.ID) {
+                dht.getClosestPeersRecurse(ctx, key, p, peers, peerOut)
+                wg.Done()
+            }(p)
+        }
+    }
+    wg.Wait()
+}
+
+func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) ([]peer.ID, error) {
+    log.Errorf("closest peers single %s %s", p, key)
+    defer log.Errorf("closest peers single end %s %s", p, key)
+    pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
+    if err != nil {
+        return nil, err
+    }
+
+    var out []peer.ID
+    for _, pbp := range pmes.GetCloserPeers() {
+        pid := peer.ID(pbp.GetId())
+        dht.peerstore.AddAddresses(pid, pbp.Addresses())
+        err := dht.ensureConnectedToPeer(ctx, pid)
+        if err != nil {
+            return nil, err
+        }
+        out = append(out, pid)
+    }
+    return out, nil
+}
+
 // FindProvidersAsync is the same thing as FindProviders, but returns a channel.
 // Peers will be returned on the channel as soon as they are found, even before
 // the search query completes.
@@ -182,6 +269,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
            // Add unique providers from request, up to 'count'
            for _, prov := range provs {
                if ps.TryAdd(prov.ID) {
+                    dht.peerstore.AddAddresses(prov.ID, prov.Addrs)
                    select {
                    case peerOut <- prov:
                    case <-ctx.Done():
......
package kbucket

import (
    "container/list"
    peer "github.com/jbenet/go-ipfs/peer"
    "sort"
)

// A helper struct to sort peers by their distance to the local node
type peerDistance struct {
    p        peer.ID
    distance ID
}

// peerSorterArr implements sort.Interface to sort peers by xor distance
type peerSorterArr []*peerDistance

func (p peerSorterArr) Len() int      { return len(p) }
func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
func (p peerSorterArr) Less(a, b int) bool {
    return p[a].distance.less(p[b].distance)
}

//
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
    for e := peerList.Front(); e != nil; e = e.Next() {
        p := e.Value.(peer.ID)
        pID := ConvertPeerID(p)
        pd := peerDistance{
            p:        p,
            distance: xor(target, pID),
        }
        peerArr = append(peerArr, &pd)
        if e == nil {
            log.Debug("list element was nil")
            return peerArr
        }
    }
    return peerArr
}

func SortClosestPeers(peers []peer.ID, target ID) []peer.ID {
    var psarr peerSorterArr
    for _, p := range peers {
        pID := ConvertPeerID(p)
        pd := &peerDistance{
            p:        p,
            distance: xor(target, pID),
        }
        psarr = append(psarr, pd)
    }
    sort.Sort(psarr)
    var out []peer.ID
    for _, p := range psarr {
        out = append(out, p.p)
    }
    return out
}
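SortClosestPeers is exported, so code outside the routing table internals can order an arbitrary set of peers by XOR distance to a target. A hypothetical usage sketch follows, reusing the kb, u, and peer aliases seen elsewhere in this diff; the closestOf helper and its arguments are illustrative and not part of the commit.

// Hypothetical caller-side use of kbucket.SortClosestPeers: order candidate
// peers by XOR distance to a key and keep at most count of them.
func closestOf(candidates []peer.ID, key u.Key, count int) []peer.ID {
    target := kb.ConvertKey(key) // u.Key -> kbucket ID, as the DHT does elsewhere
    sorted := kb.SortClosestPeers(candidates, target)
    if len(sorted) > count {
        sorted = sorted[:count]
    }
    return sorted
}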
@@ -2,7 +2,6 @@
 package kbucket
 
 import (
-    "container/list"
     "fmt"
     "sort"
     "sync"
@@ -103,40 +102,6 @@ func (rt *RoutingTable) nextBucket() peer.ID {
    return ""
 }
 
-// A helper struct to sort peers by their distance to the local node
-type peerDistance struct {
-    p        peer.ID
-    distance ID
-}
-
-// peerSorterArr implements sort.Interface to sort peers by xor distance
-type peerSorterArr []*peerDistance
-
-func (p peerSorterArr) Len() int      { return len(p) }
-func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
-func (p peerSorterArr) Less(a, b int) bool {
-    return p[a].distance.less(p[b].distance)
-}
-
-//
-func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
-    for e := peerList.Front(); e != nil; e = e.Next() {
-        p := e.Value.(peer.ID)
-        pID := ConvertPeerID(p)
-        pd := peerDistance{
-            p:        p,
-            distance: xor(target, pID),
-        }
-        peerArr = append(peerArr, &pd)
-        if e == nil {
-            log.Debug("list element was nil")
-            return peerArr
-        }
-    }
-    return peerArr
-}
-
 // Find a specific peer by ID or return nil
 func (rt *RoutingTable) Find(id peer.ID) peer.ID {
    srch := rt.NearestPeers(ConvertPeerID(id), 1)
......