engine_test.go 8.91 KB
Newer Older
1
package decision
2 3

import (
Jan Winkelmann's avatar
Jan Winkelmann committed
4
	"context"
5 6
	"errors"
	"fmt"
7
	"strings"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8
	"sync"
9
	"testing"
10
	"time"
11

Jeromy's avatar
Jeromy committed
12 13 14 15 16 17 18 19
	message "github.com/ipfs/go-bitswap/message"

	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	peer "github.com/libp2p/go-libp2p-peer"
	testutil "github.com/libp2p/go-testutil"
20 21
)

22 23 24 25
type fakePeerTagger struct {
	lk          sync.Mutex
	wait        sync.WaitGroup
	taggedPeers []peer.ID
26 27
}

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
// TagPeer records p as tagged and bumps the wait group so callers can block
// until the matching UntagPeer arrives. The tag name and the value n are
// ignored by this fake.
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
	fpt.wait.Add(1)

	fpt.lk.Lock()
	fpt.taggedPeers = append(fpt.taggedPeers, p)
	fpt.lk.Unlock()
}

// UntagPeer removes the first recorded occurrence of p, if any, and marks one
// unit of the wait group as done. The tag name is ignored by this fake.
//
// Removal swaps the match with the last element, so the order of the
// remaining tagged peers is not preserved. Done is signalled whether or not
// p was found.
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
	defer fpt.wait.Done()

	fpt.lk.Lock()
	defer fpt.lk.Unlock()

	for idx, tagged := range fpt.taggedPeers {
		if tagged != p {
			continue
		}
		last := len(fpt.taggedPeers) - 1
		fpt.taggedPeers[idx] = fpt.taggedPeers[last]
		fpt.taggedPeers = fpt.taggedPeers[:last]
		return
	}
}

// count reports how many peers are currently recorded as tagged.
func (fpt *fakePeerTagger) count() int {
	fpt.lk.Lock()
	n := len(fpt.taggedPeers)
	fpt.lk.Unlock()
	return n
}

// engineSet bundles an Engine with the collaborators it was built from, as
// assembled by newEngine, so tests can reach each piece directly.
type engineSet struct {
	// PeerTagger is the fake tagger passed to the engine.
	PeerTagger *fakePeerTagger
	// Peer is the identity this engine set represents in a test.
	Peer       peer.ID
	// Engine is the decision engine under test.
	Engine     *Engine
	// Blockstore is the blockstore the engine was constructed with.
	Blockstore blockstore.Blockstore
}

func newEngine(ctx context.Context, idStr string) engineSet {
	fpt := &fakePeerTagger{}
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	return engineSet{
67
		Peer: peer.ID(idStr),
68
		//Strategy: New(true),
69 70
		PeerTagger: fpt,
		Blockstore: bs,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71
		Engine: NewEngine(ctx,
72
			bs, fpt),
73 74 75
	}
}

76
func TestConsistentAccounting(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77 78 79 80
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sender := newEngine(ctx, "Ernie")
	receiver := newEngine(ctx, "Bert")
81 82 83 84

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

85
		m := message.New(false)
86
		content := []string{"this", "is", "message", "i"}
Jeromy's avatar
Jeromy committed
87
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
88

89 90
		sender.Engine.MessageSent(receiver.Peer, m)
		receiver.Engine.MessageReceived(sender.Peer, m)
91 92 93
	}

	// Ensure sender records the change
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94
	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
95 96 97 98
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
Brian Tiger Chow's avatar
Brian Tiger Chow committed
99
	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
100 101 102 103 104
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
Brian Tiger Chow's avatar
Brian Tiger Chow committed
105
	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
106 107 108 109
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

110 111
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
112 113 114 115
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")
116

117
	m := message.New(true)
118

119 120
	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
121

122
	if seattle.Peer == sanfrancisco.Peer {
123 124 125
		t.Fatal("Sanity Check: Peers have same Key!")
	}

126
	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
127 128 129
		t.Fatal("Peer wasn't added as a Partner")
	}

130
	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
131 132
		t.Fatal("Peer wasn't added as a Partner")
	}
Jeromy's avatar
Jeromy committed
133 134 135 136 137

	seattle.Engine.PeerDisconnected(sanfrancisco.Peer)
	if peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
		t.Fatal("expected peer to be removed")
	}
138 139
}

140
func peerIsPartner(p peer.ID, e *Engine) bool {
141
	for _, partner := range e.Peers() {
142
		if partner == p {
143 144 145 146 147
			return true
		}
	}
	return false
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
148 149 150

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
	t.SkipNow() // TODO implement *Engine.Close
151
	e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{})
Brian Tiger Chow's avatar
Brian Tiger Chow committed
152 153 154
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
155 156
		for nextEnvelope := range e.Outbox() {
			<-nextEnvelope
Brian Tiger Chow's avatar
Brian Tiger Chow committed
157 158 159 160 161 162 163 164 165 166 167
		}
		wg.Done()
	}()
	// e.Close()
	wg.Wait()
	if _, ok := <-e.Outbox(); ok {
		t.Fatal("channel should be closed")
	}
}

func TestPartnerWantsThenCancels(t *testing.T) {
168 169 170 171
	numRounds := 10
	if testing.Short() {
		numRounds = 1
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
172 173 174 175 176
	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")

	type testCase [][]string
	testcases := []testCase{
rht's avatar
rht committed
177
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
178 179
			alphabet, vowels,
		},
rht's avatar
rht committed
180
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
181
			alphabet, stringsComplement(alphabet, vowels),
182 183 184 185 186 187 188 189 190 191 192 193 194
			alphabet[1:25], stringsComplement(alphabet[1:25], vowels), alphabet[2:25], stringsComplement(alphabet[2:25], vowels),
			alphabet[3:25], stringsComplement(alphabet[3:25], vowels), alphabet[4:25], stringsComplement(alphabet[4:25], vowels),
			alphabet[5:25], stringsComplement(alphabet[5:25], vowels), alphabet[6:25], stringsComplement(alphabet[6:25], vowels),
			alphabet[7:25], stringsComplement(alphabet[7:25], vowels), alphabet[8:25], stringsComplement(alphabet[8:25], vowels),
			alphabet[9:25], stringsComplement(alphabet[9:25], vowels), alphabet[10:25], stringsComplement(alphabet[10:25], vowels),
			alphabet[11:25], stringsComplement(alphabet[11:25], vowels), alphabet[12:25], stringsComplement(alphabet[12:25], vowels),
			alphabet[13:25], stringsComplement(alphabet[13:25], vowels), alphabet[14:25], stringsComplement(alphabet[14:25], vowels),
			alphabet[15:25], stringsComplement(alphabet[15:25], vowels), alphabet[16:25], stringsComplement(alphabet[16:25], vowels),
			alphabet[17:25], stringsComplement(alphabet[17:25], vowels), alphabet[18:25], stringsComplement(alphabet[18:25], vowels),
			alphabet[19:25], stringsComplement(alphabet[19:25], vowels), alphabet[20:25], stringsComplement(alphabet[20:25], vowels),
			alphabet[21:25], stringsComplement(alphabet[21:25], vowels), alphabet[22:25], stringsComplement(alphabet[22:25], vowels),
			alphabet[23:25], stringsComplement(alphabet[23:25], vowels), alphabet[24:25], stringsComplement(alphabet[24:25], vowels),
			alphabet[25:25], stringsComplement(alphabet[25:25], vowels),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
195 196 197
		},
	}

198 199 200 201 202
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	for _, letter := range alphabet {
		block := blocks.NewBlock([]byte(letter))
		if err := bs.Put(block); err != nil {
			t.Fatal(err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
203 204 205
		}
	}

206
	for i := 0; i < numRounds; i++ {
207
		expected := make([][]string, 0, len(testcases))
208
		e := NewEngine(context.Background(), bs, &fakePeerTagger{})
209 210 211 212
		for _, testcase := range testcases {
			set := testcase[0]
			cancels := testcase[1]
			keeps := stringsComplement(set, cancels)
213
			expected = append(expected, keeps)
214 215 216 217 218

			partner := testutil.RandPeerIDFatal(t)

			partnerWants(e, set, partner)
			partnerCancels(e, cancels, partner)
219 220 221 222
		}
		if err := checkHandledInOrder(t, e, expected); err != nil {
			t.Logf("run #%d of %d", i, numRounds)
			t.Fatal(err)
223 224
		}
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
225 226
}

227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
// TestTaggingPeers checks that a partner with pending wants is tagged while
// an envelope for it is outstanding, and untagged after the envelope has been
// marked as sent.
func TestTaggingPeers(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	sf := newEngine(ctx, "sf")
	sea := newEngine(ctx, "sea")

	keys := []string{"a", "b", "c", "d", "e"}
	for _, letter := range keys {
		if err := sf.Blockstore.Put(blocks.NewBlock([]byte(letter))); err != nil {
			t.Fatal(err)
		}
	}
	partnerWants(sf.Engine, keys, sea.Peer)

	pending := <-sf.Engine.Outbox()
	envelope := <-pending

	if tagged := sf.PeerTagger.count(); tagged != 1 {
		t.Fatal("Incorrect number of peers tagged")
	}
	envelope.Sent()

	// Receive the next outbox entry (its value is not needed), then wait for
	// every recorded tag to be matched by an untag before counting again.
	<-sf.Engine.Outbox()
	sf.PeerTagger.wait.Wait()
	if tagged := sf.PeerTagger.count(); tagged != 0 {
		t.Fatal("Peers should be untagged but weren't")
	}
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
254
func partnerWants(e *Engine, keys []string, partner peer.ID) {
255
	add := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
256 257
	for i, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
258
		add.AddEntry(block.Cid(), len(keys)-i)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
259 260 261 262 263
	}
	e.MessageReceived(partner, add)
}

func partnerCancels(e *Engine, keys []string, partner peer.ID) {
264
	cancels := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
265 266
	for _, k := range keys {
		block := blocks.NewBlock([]byte(k))
267
		cancels.Cancel(block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
268 269 270 271
	}
	e.MessageReceived(partner, cancels)
}

272 273
func checkHandledInOrder(t *testing.T, e *Engine, expected [][]string) error {
	for _, keys := range expected {
274 275
		next := <-e.Outbox()
		envelope := <-next
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
		received := envelope.Message.Blocks()
		// Verify payload message length
		if len(received) != len(keys) {
			return errors.New(fmt.Sprintln("# blocks received", len(received), "# blocks expected", len(keys)))
		}
		// Verify payload message contents
		for _, k := range keys {
			found := false
			expected := blocks.NewBlock([]byte(k))
			for _, block := range received {
				if block.Cid().Equals(expected.Cid()) {
					found = true
					break
				}
			}
			if !found {
				return errors.New(fmt.Sprintln("received", received, "expected", string(expected.RawData())))
			}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
294 295
		}
	}
296
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
}

// stringsComplement returns the elements of set that do not occur in subset,
// in their original order. Duplicates in set are kept. The result is nil when
// nothing remains.
func stringsComplement(set, subset []string) []string {
	excluded := make(map[string]bool)
	for i := range subset {
		excluded[subset[i]] = true
	}

	var rest []string
	for i := range set {
		if excluded[set[i]] {
			continue
		}
		rest = append(rest, set[i])
	}
	return rest
}