engine_test.go 9.3 KB
Newer Older
1
package decision
2 3

import (
Jan Winkelmann's avatar
Jan Winkelmann committed
4
	"context"
5 6
	"errors"
	"fmt"
7
	"strings"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8
	"sync"
9
	"testing"
10
	"time"
11

Jeromy's avatar
Jeromy committed
12 13 14 15 16 17
	message "github.com/ipfs/go-bitswap/message"

	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
19
	testutil "github.com/libp2p/go-libp2p-core/test"
20 21
)

22 23 24 25 26
// peerTag is the bookkeeping fakePeerTagger keeps for a single tag name.
type peerTag struct {
	// done is closed by UntagPeer once the last peer carrying this tag has
	// been removed; wait blocks on it.
	done  chan struct{}
	// peers maps each tagged peer to the value most recently passed to TagPeer.
	peers map[peer.ID]int
}

27
type fakePeerTagger struct {
28 29
	lk   sync.Mutex
	tags map[string]*peerTag
30 31
}

32 33 34
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
35 36 37 38 39 40 41 42 43
	if fpt.tags == nil {
		fpt.tags = make(map[string]*peerTag, 1)
	}
	pt, ok := fpt.tags[tag]
	if !ok {
		pt = &peerTag{peers: make(map[peer.ID]int, 1), done: make(chan struct{})}
		fpt.tags[tag] = pt
	}
	pt.peers[p] = n
44 45 46 47 48
}

func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
49 50 51 52 53 54 55 56
	pt := fpt.tags[tag]
	if pt == nil {
		return
	}
	delete(pt.peers, p)
	if len(pt.peers) == 0 {
		close(pt.done)
		delete(fpt.tags, tag)
57 58 59
	}
}

60
func (fpt *fakePeerTagger) count(tag string) int {
61 62
	fpt.lk.Lock()
	defer fpt.lk.Unlock()
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
	if pt, ok := fpt.tags[tag]; ok {
		return len(pt.peers)
	}
	return 0
}

func (fpt *fakePeerTagger) wait(tag string) {
	fpt.lk.Lock()
	pt := fpt.tags[tag]
	if pt == nil {
		fpt.lk.Unlock()
		return
	}
	doneCh := pt.done
	fpt.lk.Unlock()
	<-doneCh
79 80 81 82 83 84 85 86 87 88 89 90 91
}

// engineSet bundles an Engine with the collaborators newEngine wires into it,
// so tests can drive the engine and inspect its dependencies.
type engineSet struct {
	// PeerTagger is the fake tagger handed to the engine.
	PeerTagger *fakePeerTagger
	// Peer is the identity the tests use for this engine.
	Peer       peer.ID
	// Engine is the decision engine under test.
	Engine     *Engine
	// Blockstore is the store the engine was constructed with.
	Blockstore blockstore.Blockstore
}

func newEngine(ctx context.Context, idStr string) engineSet {
	fpt := &fakePeerTagger{}
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	return engineSet{
92
		Peer: peer.ID(idStr),
93
		//Strategy: New(true),
94 95
		PeerTagger: fpt,
		Blockstore: bs,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96
		Engine: NewEngine(ctx,
97
			bs, fpt),
98 99 100
	}
}

101
func TestConsistentAccounting(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103 104 105
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sender := newEngine(ctx, "Ernie")
	receiver := newEngine(ctx, "Bert")
106 107 108 109

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

110
		m := message.New(false)
111
		content := []string{"this", "is", "message", "i"}
Jeromy's avatar
Jeromy committed
112
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
113

114 115
		sender.Engine.MessageSent(receiver.Peer, m)
		receiver.Engine.MessageReceived(sender.Peer, m)
116 117 118
	}

	// Ensure sender records the change
Brian Tiger Chow's avatar
Brian Tiger Chow committed
119
	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
120 121 122 123
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
Brian Tiger Chow's avatar
Brian Tiger Chow committed
124
	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
125 126 127 128 129
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
Brian Tiger Chow's avatar
Brian Tiger Chow committed
130
	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
131 132 133 134
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

135 136
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
137 138 139 140
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")
141

142
	m := message.New(true)
143

144 145
	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
146

147
	if seattle.Peer == sanfrancisco.Peer {
148 149 150
		t.Fatal("Sanity Check: Peers have same Key!")
	}

151
	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
152 153 154
		t.Fatal("Peer wasn't added as a Partner")
	}

155
	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
156 157
		t.Fatal("Peer wasn't added as a Partner")
	}
Jeromy's avatar
Jeromy committed
158 159 160 161 162

	seattle.Engine.PeerDisconnected(sanfrancisco.Peer)
	if peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
		t.Fatal("expected peer to be removed")
	}
163 164
}

165
func peerIsPartner(p peer.ID, e *Engine) bool {
166
	for _, partner := range e.Peers() {
167
		if partner == p {
168 169 170 171 172
			return true
		}
	}
	return false
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
173 174 175

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
	t.SkipNow() // TODO implement *Engine.Close
176
	e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{})
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177 178 179
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
180 181
		for nextEnvelope := range e.Outbox() {
			<-nextEnvelope
Brian Tiger Chow's avatar
Brian Tiger Chow committed
182 183 184 185 186 187 188 189 190 191 192
		}
		wg.Done()
	}()
	// e.Close()
	wg.Wait()
	if _, ok := <-e.Outbox(); ok {
		t.Fatal("channel should be closed")
	}
}

func TestPartnerWantsThenCancels(t *testing.T) {
193 194 195 196
	numRounds := 10
	if testing.Short() {
		numRounds = 1
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
197 198 199 200 201
	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")

	type testCase [][]string
	testcases := []testCase{
rht's avatar
rht committed
202
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
203 204
			alphabet, vowels,
		},
rht's avatar
rht committed
205
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
206
			alphabet, stringsComplement(alphabet, vowels),
207 208 209 210 211 212 213 214 215 216 217 218 219
			alphabet[1:25], stringsComplement(alphabet[1:25], vowels), alphabet[2:25], stringsComplement(alphabet[2:25], vowels),
			alphabet[3:25], stringsComplement(alphabet[3:25], vowels), alphabet[4:25], stringsComplement(alphabet[4:25], vowels),
			alphabet[5:25], stringsComplement(alphabet[5:25], vowels), alphabet[6:25], stringsComplement(alphabet[6:25], vowels),
			alphabet[7:25], stringsComplement(alphabet[7:25], vowels), alphabet[8:25], stringsComplement(alphabet[8:25], vowels),
			alphabet[9:25], stringsComplement(alphabet[9:25], vowels), alphabet[10:25], stringsComplement(alphabet[10:25], vowels),
			alphabet[11:25], stringsComplement(alphabet[11:25], vowels), alphabet[12:25], stringsComplement(alphabet[12:25], vowels),
			alphabet[13:25], stringsComplement(alphabet[13:25], vowels), alphabet[14:25], stringsComplement(alphabet[14:25], vowels),
			alphabet[15:25], stringsComplement(alphabet[15:25], vowels), alphabet[16:25], stringsComplement(alphabet[16:25], vowels),
			alphabet[17:25], stringsComplement(alphabet[17:25], vowels), alphabet[18:25], stringsComplement(alphabet[18:25], vowels),
			alphabet[19:25], stringsComplement(alphabet[19:25], vowels), alphabet[20:25], stringsComplement(alphabet[20:25], vowels),
			alphabet[21:25], stringsComplement(alphabet[21:25], vowels), alphabet[22:25], stringsComplement(alphabet[22:25], vowels),
			alphabet[23:25], stringsComplement(alphabet[23:25], vowels), alphabet[24:25], stringsComplement(alphabet[24:25], vowels),
			alphabet[25:25], stringsComplement(alphabet[25:25], vowels),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
220 221 222
		},
	}

223 224 225 226 227
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	for _, letter := range alphabet {
		block := blocks.NewBlock([]byte(letter))
		if err := bs.Put(block); err != nil {
			t.Fatal(err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
228 229 230
		}
	}

231
	for i := 0; i < numRounds; i++ {
232
		expected := make([][]string, 0, len(testcases))
233
		e := NewEngine(context.Background(), bs, &fakePeerTagger{})
234 235 236 237
		for _, testcase := range testcases {
			set := testcase[0]
			cancels := testcase[1]
			keeps := stringsComplement(set, cancels)
238
			expected = append(expected, keeps)
239 240 241 242 243

			partner := testutil.RandPeerIDFatal(t)

			partnerWants(e, set, partner)
			partnerCancels(e, cancels, partner)
244 245 246 247
		}
		if err := checkHandledInOrder(t, e, expected); err != nil {
			t.Logf("run #%d of %d", i, numRounds)
			t.Fatal(err)
248 249
		}
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
250 251
}

252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
func TestTaggingPeers(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")

	keys := []string{"a", "b", "c", "d", "e"}
	for _, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
		if err := sanfrancisco.Blockstore.Put(block); err != nil {
			t.Fatal(err)
		}
	}
	partnerWants(sanfrancisco.Engine, keys, seattle.Peer)
	next := <-sanfrancisco.Engine.Outbox()
	envelope := <-next

269
	if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 1 {
270 271 272
		t.Fatal("Incorrect number of peers tagged")
	}
	envelope.Sent()
Steven Allen's avatar
Steven Allen committed
273
	<-sanfrancisco.Engine.Outbox()
274 275
	sanfrancisco.PeerTagger.wait(sanfrancisco.Engine.tagQueued)
	if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 0 {
276 277 278
		t.Fatal("Peers should be untagged but weren't")
	}
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
279
func partnerWants(e *Engine, keys []string, partner peer.ID) {
280
	add := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
281 282
	for i, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
283
		add.AddEntry(block.Cid(), len(keys)-i)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
284 285 286 287 288
	}
	e.MessageReceived(partner, add)
}

func partnerCancels(e *Engine, keys []string, partner peer.ID) {
289
	cancels := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
290 291
	for _, k := range keys {
		block := blocks.NewBlock([]byte(k))
292
		cancels.Cancel(block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
293 294 295 296
	}
	e.MessageReceived(partner, cancels)
}

297 298
func checkHandledInOrder(t *testing.T, e *Engine, expected [][]string) error {
	for _, keys := range expected {
299 300
		next := <-e.Outbox()
		envelope := <-next
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
		received := envelope.Message.Blocks()
		// Verify payload message length
		if len(received) != len(keys) {
			return errors.New(fmt.Sprintln("# blocks received", len(received), "# blocks expected", len(keys)))
		}
		// Verify payload message contents
		for _, k := range keys {
			found := false
			expected := blocks.NewBlock([]byte(k))
			for _, block := range received {
				if block.Cid().Equals(expected.Cid()) {
					found = true
					break
				}
			}
			if !found {
				return errors.New(fmt.Sprintln("received", received, "expected", string(expected.RawData())))
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
319 320
		}
	}
321
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
}

// stringsComplement returns the elements of set that do not occur in subset,
// in the order they appear in set. Duplicates in set are preserved. The
// result is nil when no element remains.
func stringsComplement(set, subset []string) []string {
	excluded := make(map[string]struct{})
	for i := range subset {
		excluded[subset[i]] = struct{}{}
	}

	var remaining []string
	for i := range set {
		if _, skip := excluded[set[i]]; skip {
			continue
		}
		remaining = append(remaining, set[i])
	}
	return remaining
}