sessionmanager_test.go 6.44 KB
Newer Older
1 2 3 4 5 6 7
package sessionmanager

import (
	"context"
	"testing"
	"time"

8
	delay "github.com/ipfs/go-ipfs-delay"
9

10
	notifications "github.com/ipfs/go-bitswap/notifications"
11
	bssession "github.com/ipfs/go-bitswap/session"
12
	bssd "github.com/ipfs/go-bitswap/sessiondata"
13
	"github.com/ipfs/go-bitswap/testutil"
14 15 16

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
17
	peer "github.com/libp2p/go-libp2p-core/peer"
18 19 20
)

type fakeSession struct {
21 22 23 24 25 26
	wanted []cid.Cid
	ks     []cid.Cid
	id     uint64
	pm     *fakePeerManager
	srs    *fakeRequestSplitter
	notif  notifications.PubSub
27 28 29 30 31 32 33 34
}

func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
	return nil, nil
}
func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
	return nil, nil
}
35 36
func (fs *fakeSession) IsWanted(c cid.Cid) bool {
	for _, ic := range fs.wanted {
37 38 39 40 41 42
		if c == ic {
			return true
		}
	}
	return false
}
43
func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid) {
44
	fs.ks = append(fs.ks, ks...)
45
}
46 47 48 49 50 51

type fakePeerManager struct {
	id uint64
}

func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid)  {}
52
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
53
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
54 55
func (*fakePeerManager) RecordPeerResponse(peer.ID, []cid.Cid)   {}
func (*fakePeerManager) RecordCancels(c []cid.Cid)               {}
56

57 58 59
type fakeRequestSplitter struct {
}

60
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
61 62 63 64 65
	return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock()    {}

66
var nextWanted []cid.Cid
67

68 69 70 71
func sessionFactory(ctx context.Context,
	id uint64,
	pm bssession.PeerManager,
	srs bssession.RequestSplitter,
72
	notif notifications.PubSub,
73 74
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) Session {
75
	return &fakeSession{
76 77 78 79 80
		wanted: nextWanted,
		id:     id,
		pm:     pm.(*fakePeerManager),
		srs:    srs.(*fakeRequestSplitter),
		notif:  notif,
81 82 83 84 85 86 87
	}
}

func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
	return &fakePeerManager{id}
}

88 89 90 91
func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
	return &fakeRequestSplitter{}
}

92 93 94 95
func TestAddingSessions(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
96 97 98
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
99 100 101 102

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
103
	nextWanted = []cid.Cid{block.Cid()}
104 105

	currentID := sm.GetNextSessionID()
106
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
107 108 109 110
	if firstSession.id != firstSession.pm.id ||
		firstSession.id != currentID+1 {
		t.Fatal("session does not have correct id set")
	}
111
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
112 113 114 115 116
	if secondSession.id != secondSession.pm.id ||
		secondSession.id != firstSession.id+1 {
		t.Fatal("session does not have correct id set")
	}
	sm.GetNextSessionID()
117
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
118 119 120 121
	if thirdSession.id != thirdSession.pm.id ||
		thirdSession.id != secondSession.id+2 {
		t.Fatal("session does not have correct id set")
	}
122
	sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
123 124 125
	if len(firstSession.ks) == 0 ||
		len(secondSession.ks) == 0 ||
		len(thirdSession.ks) == 0 {
126 127 128 129
		t.Fatal("should have received blocks but didn't")
	}
}

130
func TestIsWanted(t *testing.T) {
131 132 133
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
134 135 136
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
137 138 139 140 141 142 143

	blks := testutil.GenerateBlocksOfSize(4, 1024)
	var cids []cid.Cid
	for _, b := range blks {
		cids = append(cids, b.Cid())
	}

144
	nextWanted = []cid.Cid{cids[0], cids[1]}
145
	_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
146
	nextWanted = []cid.Cid{cids[0], cids[2]}
147 148
	_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)

149 150 151
	if !sm.IsWanted(cids[0]) ||
		!sm.IsWanted(cids[1]) ||
		!sm.IsWanted(cids[2]) {
152
		t.Fatal("expected unwanted but session manager did want cid")
153
	}
154
	if sm.IsWanted(cids[3]) {
155
		t.Fatal("expected wanted but session manager did not want cid")
156 157 158
	}
}

159 160 161
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
162 163 164
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
165 166 167 168

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
169
	nextWanted = []cid.Cid{block.Cid()}
170 171 172
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
173 174 175 176

	cancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
177
	sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
178 179 180
	if len(firstSession.ks) > 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) > 0 {
181 182 183 184 185 186 187 188
		t.Fatal("received blocks for sessions after manager is shutdown")
	}
}

func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
189 190 191
	notif := notifications.New()
	defer notif.Shutdown()
	sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
192 193 194 195

	p := peer.ID(123)
	block := blocks.NewBlock([]byte("block"))
	// we'll be interested in all blocks for this test
196
	nextWanted = []cid.Cid{block.Cid()}
197
	firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
198
	sessionCtx, sessionCancel := context.WithCancel(ctx)
199 200
	secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
	thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
201 202 203 204

	sessionCancel()
	// wait for sessions to get removed
	time.Sleep(10 * time.Millisecond)
205
	sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
206 207 208
	if len(firstSession.ks) == 0 ||
		len(secondSession.ks) > 0 ||
		len(thirdSession.ks) == 0 {
209 210 211
		t.Fatal("received blocks for sessions that are canceled")
	}
}