Unverified Commit fa9aec89 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #56 from ipfs/feat/connect-providers-in-sessions

fix(sessions): explicitly connect found peers
parents eccfedf7 6f7a77e0
......@@ -152,6 +152,47 @@ func TestSessionSplitFetch(t *testing.T) {
}
}
func TestFetchNotConnected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
bssession.SetProviderSearchDelay(10 * time.Millisecond)
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
other := sesgen.Next()
blks := bgen.Blocks(10)
for _, block := range blks {
if err := other.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}
}
var cids []cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}
thisNode := sesgen.Next()
ses := thisNode.Exchange.NewSession(ctx).(*bssession.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)
ch, err := ses.GetBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks); err != nil {
t.Fatal(err)
}
}
func TestInterestCacheOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......
......@@ -222,7 +222,12 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
}
}
const provSearchDelay = time.Second * 10
var provSearchDelay = time.Second
// SetProviderSearchDelay overwrites the global provider search delay
func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
provSearchDelay = newProvSearchDelay
}
// Session run loop -- everything function below here should not be called
// of this loop
......
......@@ -5,11 +5,15 @@ import (
"fmt"
"math/rand"
logging "github.com/ipfs/go-log"
cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer"
)
var log = logging.Logger("bitswap")
const (
maxOptimizedPeers = 32
reservePeers = 2
......@@ -18,6 +22,7 @@ const (
// PeerNetwork is an interface for finding providers and managing connections
type PeerNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
ConnectTo(context.Context, peer.ID) error
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}
......@@ -102,7 +107,13 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
spm.peerMessages <- &peerFoundMessage{p}
go func(p peer.ID) {
err := spm.network.ConnectTo(ctx, p)
if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err)
}
spm.peerMessages <- &peerFoundMessage{p}
}(p)
}
}(c)
}
......
......@@ -2,8 +2,8 @@ package sessionpeermanager
import (
"context"
"sync"
"math/rand"
"sync"
"testing"
"time"
......@@ -24,6 +24,10 @@ func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager {
return fpn.connManager
}
func (fpn *fakePeerNetwork) ConnectTo(context.Context, peer.ID) error {
return nil
}
func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID {
peerCh := make(chan peer.ID)
go func() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment