sessionmanager_test.go 6.26 KB
Newer Older
1 2 3 4 5 6 7
package sessionmanager

import (
	"context"
	"testing"
	"time"

8
	delay "github.com/ipfs/go-ipfs-delay"
9

10
	bssession "github.com/ipfs/go-bitswap/session"
11
	bssd "github.com/ipfs/go-bitswap/sessiondata"
12 13 14

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	peer "github.com/libp2p/go-libp2p-core/peer"
16 17 18
)

type fakeSession struct {
19 20 21 22 23
	interested            bool
	receivedBlock         bool
	updateReceiveCounters bool
	id                    uint64
	pm                    *fakePeerManager
24
	srs                   *fakeRequestSplitter
25 26 27 28 29 30 31 32
}

// GetBlock is a stub: it ignores its arguments and always returns a nil block
// together with a nil error.
func (*fakeSession) GetBlock(_ context.Context, _ cid.Cid) (blocks.Block, error) {
	var (
		blk blocks.Block
		err error
	)
	return blk, err
}
// GetBlocks is a stub: it ignores its arguments and always returns a nil
// channel together with a nil error.
func (*fakeSession) GetBlocks(_ context.Context, _ []cid.Cid) (<-chan blocks.Block, error) {
	var out <-chan blocks.Block
	return out, nil
}
33 34 35
// InterestedIn reports the fixed interest flag this fake was created with,
// regardless of which CID is asked about.
func (fs *fakeSession) InterestedIn(_ cid.Cid) bool {
	return fs.interested
}
// ReceiveBlockFrom records that a block was delivered to this session. The
// sender and the block itself are ignored.
func (fs *fakeSession) ReceiveBlockFrom(_ peer.ID, _ blocks.Block) {
	fs.receivedBlock = true
}
// UpdateReceiveCounters records that the session was asked to update its
// receive counters. The sender and the block itself are ignored.
func (fs *fakeSession) UpdateReceiveCounters(_ peer.ID, _ blocks.Block) {
	fs.updateReceiveCounters = true
}
36 37 38 39 40 41

// fakePeerManager is the peer manager test double returned by
// peerManagerFactory. All of its methods are no-ops.
type fakePeerManager struct {
	// id is the identifier passed to peerManagerFactory; the tests compare it
	// with the id of the session that owns this manager.
	id uint64
}

// FindMorePeers is a no-op in this fake.
func (*fakePeerManager) FindMorePeers(_ context.Context, _ cid.Cid) {
	// Intentionally empty: the tests never look for peers.
}
42
// GetOptimizedPeers always returns a nil slice in this fake.
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
	var none []bssd.OptimizedPeer
	return none
}
43 44
// RecordPeerRequests is a no-op in this fake.
func (*fakePeerManager) RecordPeerRequests(_ []peer.ID, _ []cid.Cid) {
	// Intentionally empty: requests are not tracked.
}
// RecordPeerResponse is a no-op in this fake.
func (*fakePeerManager) RecordPeerResponse(_ peer.ID, _ cid.Cid) {
	// Intentionally empty: responses are not tracked.
}
45
// RecordCancel is a no-op in this fake.
func (*fakePeerManager) RecordCancel(_ cid.Cid) {
	// Intentionally empty: cancels are not tracked.
}
46

47 48 49
// fakeRequestSplitter is the request splitter test double returned by
// requestSplitterFactory. It carries no state.
type fakeRequestSplitter struct{}

50
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
51 52 53 54 55
	return nil
}
// RecordDuplicateBlock is a no-op in this fake.
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {
	// Intentionally empty: duplicate blocks are not counted.
}
// RecordUniqueBlock is a no-op in this fake.
func (frs *fakeRequestSplitter) RecordUniqueBlock() {
	// Intentionally empty: unique blocks are not counted.
}

56 57
// nextInterestedIn is copied by sessionFactory into the interested flag of
// each fakeSession it creates; tests set it before calling NewSession.
var (
	nextInterestedIn bool
)

58 59 60 61 62 63
func sessionFactory(ctx context.Context,
	id uint64,
	pm bssession.PeerManager,
	srs bssession.RequestSplitter,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) Session {
64 65 66 67 68
	return &fakeSession{
		interested:    nextInterestedIn,
		receivedBlock: false,
		id:            id,
		pm:            pm.(*fakePeerManager),
69
		srs:           srs.(*fakeRequestSplitter),
70 71 72 73 74 75 76
	}
}

// peerManagerFactory is the peer manager constructor the tests pass to New.
// It returns a fakePeerManager tagged with the given id; ctx is ignored.
func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
	manager := &fakePeerManager{id: id}
	return manager
}

77 78 79 80
// requestSplitterFactory is the request splitter constructor the tests pass
// to New. It returns a fresh fakeRequestSplitter; ctx is ignored.
func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
	return new(fakeRequestSplitter)
}

81 82 83 84
func TestAddingSessions(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
85
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
86 87 88 89 90 91 92

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = true

	currentID := sm.GetNextSessionID()
93
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
94 95 96 97
	if firstSession.id != firstSession.pm.id ||
		firstSession.id != currentID+1 {
		t.Fatal("session does not have correct id set")
	}
98
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
99 100 101 102 103
	if secondSession.id != secondSession.pm.id ||
		secondSession.id != firstSession.id+1 {
		t.Fatal("session does not have correct id set")
	}
	sm.GetNextSessionID()
104
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
	if thirdSession.id != thirdSession.pm.id ||
		thirdSession.id != secondSession.id+2 {
		t.Fatal("session does not have correct id set")
	}
	sm.ReceiveBlockFrom(p, block)
	if !firstSession.receivedBlock ||
		!secondSession.receivedBlock ||
		!thirdSession.receivedBlock {
		t.Fatal("should have received blocks but didn't")
	}
}

func TestReceivingBlocksWhenNotInterested(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
121
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
122 123 124 125 126

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = false
127
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
128
	nextInterestedIn = true
129
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
130
	nextInterestedIn = false
131
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
132 133 134 135 136 137 138 139 140 141 142 143

	sm.ReceiveBlockFrom(p, block)
	if firstSession.receivedBlock ||
		!secondSession.receivedBlock ||
		thirdSession.receivedBlock {
		t.Fatal("did not receive blocks only for interested sessions")
	}
}

func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
144
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
145 146 147 148 149

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = true
150 151 152
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

	cancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
	sm.ReceiveBlockFrom(p, block)
	if firstSession.receivedBlock ||
		secondSession.receivedBlock ||
		thirdSession.receivedBlock {
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
169
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
170 171 172 173 174

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
	nextInterestedIn = true
175
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
176
	sessionCtx, sessionCancel := context.WithCancel(ctx)
177 178
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
179 180 181 182 183 184 185 186 187 188 189

	sessionCancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
	sm.ReceiveBlockFrom(p, block)
	if !firstSession.receivedBlock ||
		secondSession.receivedBlock ||
		!thirdSession.receivedBlock {
		t.Fatal("received blocks for sessions that are canceled")
	}
}