sessionmanager_test.go 6.18 KB
Newer Older
1 2 3 4 5 6 7
package sessionmanager

import (
	"context"
	"testing"
	"time"

8
	delay "github.com/ipfs/go-ipfs-delay"
9

10
	bssession "github.com/ipfs/go-bitswap/session"
11
	bssd "github.com/ipfs/go-bitswap/sessiondata"
12 13 14

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	peer "github.com/libp2p/go-libp2p-core/peer"
16 17 18
)

type fakeSession struct {
19 20 21 22 23
	interested            bool
	receivedBlock         bool
	updateReceiveCounters bool
	id                    uint64
	pm                    *fakePeerManager
24
	srs                   *fakeRequestSplitter
25 26 27 28 29 30 31 32 33 34
}

func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
	return nil, nil
}
func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
	return nil, nil
}
func (fs *fakeSession) InterestedIn(cid.Cid) bool              { return fs.interested }
func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
35
func (fs *fakeSession) UpdateReceiveCounters(blocks.Block)     { fs.updateReceiveCounters = true }
36 37 38 39 40 41

type fakePeerManager struct {
	id uint64
}

func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid)  {}
42
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
43 44 45
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid)     {}

46 47 48
type fakeRequestSplitter struct {
}

49
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
50 51 52 53 54
	return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock()    {}

55 56
var nextInterestedIn bool

57 58 59 60 61 62
func sessionFactory(ctx context.Context,
	id uint64,
	pm bssession.PeerManager,
	srs bssession.RequestSplitter,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) Session {
63 64 65 66 67
	return &fakeSession{
		interested:    nextInterestedIn,
		receivedBlock: false,
		id:            id,
		pm:            pm.(*fakePeerManager),
68
		srs:           srs.(*fakeRequestSplitter),
69 70 71 72 73 74 75
	}
}

func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
	return &fakePeerManager{id}
}

76 77 78 79
func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
	return &fakeRequestSplitter{}
}

80 81 82 83
func TestAddingSessions(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
84
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
85 86 87 88 89 90 91

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = true

	currentID := sm.GetNextSessionID()
92
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
93 94 95 96
	if firstSession.id != firstSession.pm.id ||
		firstSession.id != currentID+1 {
		t.Fatal("session does not have correct id set")
	}
97
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
98 99 100 101 102
	if secondSession.id != secondSession.pm.id ||
		secondSession.id != firstSession.id+1 {
		t.Fatal("session does not have correct id set")
	}
	sm.GetNextSessionID()
103
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
	if thirdSession.id != thirdSession.pm.id ||
		thirdSession.id != secondSession.id+2 {
		t.Fatal("session does not have correct id set")
	}
	sm.ReceiveBlockFrom(p, block)
	if !firstSession.receivedBlock ||
		!secondSession.receivedBlock ||
		!thirdSession.receivedBlock {
		t.Fatal("should have received blocks but didn't")
	}
}

func TestReceivingBlocksWhenNotInterested(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
120
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
121 122 123 124 125

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = false
126
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
127
	nextInterestedIn = true
128
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
129
	nextInterestedIn = false
130
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
131 132 133 134 135 136 137 138 139 140 141 142

	sm.ReceiveBlockFrom(p, block)
	if firstSession.receivedBlock ||
		!secondSession.receivedBlock ||
		thirdSession.receivedBlock {
		t.Fatal("did not receive blocks only for interested sessions")
	}
}

func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
143
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
144 145 146 147 148

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = true
149 150 151
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167

	cancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
	sm.ReceiveBlockFrom(p, block)
	if firstSession.receivedBlock ||
		secondSession.receivedBlock ||
		thirdSession.receivedBlock {
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
168
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
169 170 171 172 173

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = true
174
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
175
	sessionCtx, sessionCancel := context.WithCancel(ctx)
176 177
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
178 179 180 181 182 183 184 185 186 187 188

	sessionCancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
	sm.ReceiveBlockFrom(p, block)
	if !firstSession.receivedBlock ||
		secondSession.receivedBlock ||
		!thirdSession.receivedBlock {
		t.Fatal("received blocks for sessions that are canceled")
	}
}