engine_test.go 10.4 KB
Newer Older
1
package decision
2 3

import (
Jan Winkelmann's avatar
Jan Winkelmann committed
4
	"context"
5 6
	"errors"
	"fmt"
7
	"strings"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8
	"sync"
9
	"testing"
10
	"time"
11

Jeromy's avatar
Jeromy committed
12 13 14 15 16 17
	message "github.com/ipfs/go-bitswap/message"

	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
19
	testutil "github.com/libp2p/go-libp2p-core/test"
20 21
)

22 23 24 25 26
// peerTag is the bookkeeping fakePeerTagger keeps for a single tag name.
type peerTag struct {
	// done is closed by UntagPeer once the last peer carrying the tag has
	// been removed; fakePeerTagger.wait blocks on it.
	done  chan struct{}
	// peers maps each peer currently carrying the tag to the value that was
	// passed to TagPeer for it.
	peers map[peer.ID]int
}

27
type fakePeerTagger struct {
28 29
	lk   sync.Mutex
	tags map[string]*peerTag
30 31
}

32 33 34
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
35 36 37 38 39 40 41 42 43
	if fpt.tags == nil {
		fpt.tags = make(map[string]*peerTag, 1)
	}
	pt, ok := fpt.tags[tag]
	if !ok {
		pt = &peerTag{peers: make(map[peer.ID]int, 1), done: make(chan struct{})}
		fpt.tags[tag] = pt
	}
	pt.peers[p] = n
44 45 46 47 48
}

func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
49 50 51 52 53 54 55 56
	pt := fpt.tags[tag]
	if pt == nil {
		return
	}
	delete(pt.peers, p)
	if len(pt.peers) == 0 {
		close(pt.done)
		delete(fpt.tags, tag)
57 58 59
	}
}

60
func (fpt *fakePeerTagger) count(tag string) int {
61 62
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
	if pt, ok := fpt.tags[tag]; ok {
		return len(pt.peers)
	}
	return 0
}

func (fpt *fakePeerTagger) wait(tag string) {
	fpt.lk.Lock()
	pt := fpt.tags[tag]
	if pt == nil {
		fpt.lk.Unlock()
		return
	}
	doneCh := pt.done
	fpt.lk.Unlock()
	<-doneCh
79 80 81 82 83 84 85 86 87 88 89 90 91
}

// engineSet bundles an Engine with the collaborators newEngine created for
// it, so tests can reach each of them directly.
type engineSet struct {
	// PeerTagger is the fake tagger that was passed to NewEngine.
	PeerTagger *fakePeerTagger
	// Peer is the identity assigned to this engine by the test.
	Peer       peer.ID
	// Engine is the decision engine under test.
	Engine     *Engine
	// Blockstore is the store that was passed to NewEngine.
	Blockstore blockstore.Blockstore
}

func newEngine(ctx context.Context, idStr string) engineSet {
	fpt := &fakePeerTagger{}
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	return engineSet{
92
		Peer: peer.ID(idStr),
93
		//Strategy: New(true),
94 95
		PeerTagger: fpt,
		Blockstore: bs,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96
		Engine: NewEngine(ctx,
97
			bs, fpt),
98 99 100
	}
}

101
func TestConsistentAccounting(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103 104 105
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sender := newEngine(ctx, "Ernie")
	receiver := newEngine(ctx, "Bert")
106 107 108 109

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

110
		m := message.New(false)
111
		content := []string{"this", "is", "message", "i"}
Jeromy's avatar
Jeromy committed
112
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
113

114 115
		sender.Engine.MessageSent(receiver.Peer, m)
		receiver.Engine.MessageReceived(sender.Peer, m)
116 117 118
	}

	// Ensure sender records the change
Brian Tiger Chow's avatar
Brian Tiger Chow committed
119
	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
120 121 122 123
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
Brian Tiger Chow's avatar
Brian Tiger Chow committed
124
	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
125 126 127 128 129
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
Brian Tiger Chow's avatar
Brian Tiger Chow committed
130
	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
131 132 133 134
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

135 136
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
137 138 139 140
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")
141

142
	m := message.New(true)
143

144 145
	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
146

147
	if seattle.Peer == sanfrancisco.Peer {
148 149 150
		t.Fatal("Sanity Check: Peers have same Key!")
	}

151
	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
152 153 154
		t.Fatal("Peer wasn't added as a Partner")
	}

155
	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
156 157
		t.Fatal("Peer wasn't added as a Partner")
	}
Jeromy's avatar
Jeromy committed
158 159 160 161 162

	seattle.Engine.PeerDisconnected(sanfrancisco.Peer)
	if peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
		t.Fatal("expected peer to be removed")
	}
163 164
}

165
func peerIsPartner(p peer.ID, e *Engine) bool {
166
	for _, partner := range e.Peers() {
167
		if partner == p {
168 169 170 171 172
			return true
		}
	}
	return false
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
173 174 175

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
	t.SkipNow() // TODO implement *Engine.Close
176
	e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{})
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177 178 179
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
180 181
		for nextEnvelope := range e.Outbox() {
			<-nextEnvelope
Brian Tiger Chow's avatar
Brian Tiger Chow committed
182 183 184 185 186 187 188 189 190 191 192
		}
		wg.Done()
	}()
	// e.Close()
	wg.Wait()
	if _, ok := <-e.Outbox(); ok {
		t.Fatal("channel should be closed")
	}
}

func TestPartnerWantsThenCancels(t *testing.T) {
193 194 195 196
	numRounds := 10
	if testing.Short() {
		numRounds = 1
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
197 198 199 200 201
	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")

	type testCase [][]string
	testcases := []testCase{
rht's avatar
rht committed
202
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
203 204
			alphabet, vowels,
		},
rht's avatar
rht committed
205
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
206
			alphabet, stringsComplement(alphabet, vowels),
207 208 209 210 211 212 213 214 215 216 217 218 219
			alphabet[1:25], stringsComplement(alphabet[1:25], vowels), alphabet[2:25], stringsComplement(alphabet[2:25], vowels),
			alphabet[3:25], stringsComplement(alphabet[3:25], vowels), alphabet[4:25], stringsComplement(alphabet[4:25], vowels),
			alphabet[5:25], stringsComplement(alphabet[5:25], vowels), alphabet[6:25], stringsComplement(alphabet[6:25], vowels),
			alphabet[7:25], stringsComplement(alphabet[7:25], vowels), alphabet[8:25], stringsComplement(alphabet[8:25], vowels),
			alphabet[9:25], stringsComplement(alphabet[9:25], vowels), alphabet[10:25], stringsComplement(alphabet[10:25], vowels),
			alphabet[11:25], stringsComplement(alphabet[11:25], vowels), alphabet[12:25], stringsComplement(alphabet[12:25], vowels),
			alphabet[13:25], stringsComplement(alphabet[13:25], vowels), alphabet[14:25], stringsComplement(alphabet[14:25], vowels),
			alphabet[15:25], stringsComplement(alphabet[15:25], vowels), alphabet[16:25], stringsComplement(alphabet[16:25], vowels),
			alphabet[17:25], stringsComplement(alphabet[17:25], vowels), alphabet[18:25], stringsComplement(alphabet[18:25], vowels),
			alphabet[19:25], stringsComplement(alphabet[19:25], vowels), alphabet[20:25], stringsComplement(alphabet[20:25], vowels),
			alphabet[21:25], stringsComplement(alphabet[21:25], vowels), alphabet[22:25], stringsComplement(alphabet[22:25], vowels),
			alphabet[23:25], stringsComplement(alphabet[23:25], vowels), alphabet[24:25], stringsComplement(alphabet[24:25], vowels),
			alphabet[25:25], stringsComplement(alphabet[25:25], vowels),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
220 221 222
		},
	}

223 224 225 226 227
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	for _, letter := range alphabet {
		block := blocks.NewBlock([]byte(letter))
		if err := bs.Put(block); err != nil {
			t.Fatal(err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
228 229 230
		}
	}

231
	for i := 0; i < numRounds; i++ {
232
		expected := make([][]string, 0, len(testcases))
233
		e := NewEngine(context.Background(), bs, &fakePeerTagger{})
234 235 236 237
		for _, testcase := range testcases {
			set := testcase[0]
			cancels := testcase[1]
			keeps := stringsComplement(set, cancels)
238
			expected = append(expected, keeps)
239 240 241 242 243

			partner := testutil.RandPeerIDFatal(t)

			partnerWants(e, set, partner)
			partnerCancels(e, cancels, partner)
244 245 246 247
		}
		if err := checkHandledInOrder(t, e, expected); err != nil {
			t.Logf("run #%d of %d", i, numRounds)
			t.Fatal(err)
248 249
		}
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
250 251
}

252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
func TestTaggingPeers(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")

	keys := []string{"a", "b", "c", "d", "e"}
	for _, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
		if err := sanfrancisco.Blockstore.Put(block); err != nil {
			t.Fatal(err)
		}
	}
	partnerWants(sanfrancisco.Engine, keys, seattle.Peer)
	next := <-sanfrancisco.Engine.Outbox()
	envelope := <-next

269
	if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 1 {
270 271 272
		t.Fatal("Incorrect number of peers tagged")
	}
	envelope.Sent()
Steven Allen's avatar
Steven Allen committed
273
	<-sanfrancisco.Engine.Outbox()
274 275
	sanfrancisco.PeerTagger.wait(sanfrancisco.Engine.tagQueued)
	if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 0 {
276 277 278
		t.Fatal("Peers should be untagged but weren't")
	}
}
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318

// TestTaggingUseful checks the lifecycle of the engine's tagUseful tag: a peer
// is tagged shortly after a block message is sent to it, is untagged again
// after a quiet period, stays tagged for a while after repeated exchanges, and
// is finally untagged. The package-level shortTerm is shrunk to 1ms for the
// duration of the test and restored afterwards.
func TestTaggingUseful(t *testing.T) {
	defer func(previous time.Duration) { shortTerm = previous }(shortTerm)
	shortTerm = 1 * time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	me := newEngine(ctx, "engine")
	friend := peer.ID("friend")

	msg := message.New(false)
	msg.AddBlock(blocks.NewBlock([]byte("foobar")))

	// usefulPeers reports how many peers currently carry the "useful" tag.
	usefulPeers := func() int {
		return me.PeerTagger.count(me.Engine.tagUseful)
	}

	for attempt := 0; attempt < 3; attempt++ {
		if usefulPeers() != 0 {
			t.Fatal("Peers should be untagged but weren't")
		}
		me.Engine.MessageSent(friend, msg)
		time.Sleep(shortTerm * 2)
		if usefulPeers() != 1 {
			t.Fatal("Peers should be tagged but weren't")
		}
		time.Sleep(shortTerm * 8)
	}

	if usefulPeers() == 0 {
		t.Fatal("peers should still be tagged due to long-term usefulness")
	}
	time.Sleep(shortTerm * 2)
	if usefulPeers() == 0 {
		t.Fatal("peers should still be tagged due to long-term usefulness")
	}
	time.Sleep(shortTerm * 10)
	if usefulPeers() != 0 {
		t.Fatal("peers should finally be untagged")
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
319
func partnerWants(e *Engine, keys []string, partner peer.ID) {
320
	add := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
321 322
	for i, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
323
		add.AddEntry(block.Cid(), len(keys)-i)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
324 325 326 327 328
	}
	e.MessageReceived(partner, add)
}

func partnerCancels(e *Engine, keys []string, partner peer.ID) {
329
	cancels := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
330 331
	for _, k := range keys {
		block := blocks.NewBlock([]byte(k))
332
		cancels.Cancel(block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
333 334 335 336
	}
	e.MessageReceived(partner, cancels)
}

337 338
func checkHandledInOrder(t *testing.T, e *Engine, expected [][]string) error {
	for _, keys := range expected {
339 340
		next := <-e.Outbox()
		envelope := <-next
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
		received := envelope.Message.Blocks()
		// Verify payload message length
		if len(received) != len(keys) {
			return errors.New(fmt.Sprintln("# blocks received", len(received), "# blocks expected", len(keys)))
		}
		// Verify payload message contents
		for _, k := range keys {
			found := false
			expected := blocks.NewBlock([]byte(k))
			for _, block := range received {
				if block.Cid().Equals(expected.Cid()) {
					found = true
					break
				}
			}
			if !found {
				return errors.New(fmt.Sprintln("received", received, "expected", string(expected.RawData())))
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
359 360
		}
	}
361
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
}

// stringsComplement returns the elements of set that do not occur in subset,
// keeping set's order and any duplicates. The result is nil when nothing
// remains.
func stringsComplement(set, subset []string) []string {
	excluded := make(map[string]struct{})
	for i := range subset {
		excluded[subset[i]] = struct{}{}
	}

	var complement []string
	for _, candidate := range set {
		if _, skip := excluded[candidate]; skip {
			continue
		}
		complement = append(complement, candidate)
	}
	return complement
}