Commit e0f11dff authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

dht: FindPeersConnectedToPeer

parent d06bb6d8
......@@ -2,6 +2,7 @@ package dht
import (
"bytes"
"sort"
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -414,6 +415,100 @@ func TestFindPeer(t *testing.T) {
}
}
func TestFindPeersConnectedToPeer(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
u.Debug = false
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].dialer.(inet.Network).Close()
}
}()
// topology:
// 0-1, 1-2, 1-3, 2-3
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
err = dhts[2].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
// fmt.Println("0 is", peers[0])
// fmt.Println("1 is", peers[1])
// fmt.Println("2 is", peers[2])
// fmt.Println("3 is", peers[3])
ctxT, _ := context.WithTimeout(ctx, time.Second)
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2].ID())
if err != nil {
t.Fatal(err)
}
// shouldFind := []peer.Peer{peers[1], peers[3]}
found := []peer.Peer{}
for nextp := range pchan {
found = append(found, nextp)
}
// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
// fmt.Println("should find 1, 3", shouldFind)
// fmt.Println("found", found)
// testPeerListsMatch(t, shouldFind, found)
log.Warning("TestFindPeersConnectedToPeer is not quite correct")
if len(found) == 0 {
t.Fatal("didn't find any peers.")
}
}
func testPeerListsMatch(t *testing.T, p1, p2 []peer.Peer) {
if len(p1) != len(p2) {
t.Fatal("did not find as many peers as should have", p1, p2)
}
ids1 := make([]string, len(p1))
ids2 := make([]string, len(p2))
for i, p := range p1 {
ids1[i] = p.ID().Pretty()
}
for i, p := range p2 {
ids2[i] = p.ID().Pretty()
}
sort.Sort(sort.StringSlice(ids1))
sort.Sort(sort.StringSlice(ids2))
for i := range ids1 {
if ids1[i] != ids2[i] {
t.Fatal("Didnt find expected peer", ids1[i], ids2)
}
}
}
func TestConnectCollision(t *testing.T) {
if testing.Short() {
t.SkipNow()
......
......@@ -159,6 +159,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Me
for _, p := range withAddresses {
log.Debugf("handleFindPeer: sending back '%s'", p)
}
resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, withAddresses)
return resp, nil
}
......
......@@ -5,6 +5,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
......@@ -268,6 +269,75 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
return result.peer, nil
}
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.Peer, error) {
peerchan := make(chan peer.Peer, 10)
peersSeen := map[string]peer.Peer{}
routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
}
// setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil {
return nil, err
}
var clpeers []peer.Peer
closer := pmes.GetCloserPeers()
for _, pbp := range closer {
// skip peers already seen
if _, found := peersSeen[string(pbp.GetId())]; found {
continue
}
// skip peers that fail to unmarshal
p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
if err != nil {
log.Warning(err)
continue
}
// if peer is connected, send it to our client.
if pb.Connectedness(*pbp.Connection) == inet.Connected {
select {
case <-ctx.Done():
return nil, ctx.Err()
case peerchan <- p:
}
}
peersSeen[string(p.ID())] = p
// if peer is the peer we're looking for, don't bother querying it.
if pb.Connectedness(*pbp.Connection) != inet.Connected {
clpeers = append(clpeers, p)
}
}
return &dhtQueryResult{closerPeers: clpeers}, nil
})
// run it! run it asynchronously to gen peers as results are found.
// this does no error checking
go func() {
if _, err := query.Run(ctx, closest); err != nil {
log.Error(err)
}
// close the peerchan channel when done.
close(peerchan)
}()
return peerchan, nil
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment