engine_test.go 5.5 KB
Newer Older
1
package decision
2 3

import (
Jan Winkelmann's avatar
Jan Winkelmann committed
4
	"context"
5 6
	"errors"
	"fmt"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
7
	"math"
8
	"strings"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
9
	"sync"
10 11
	"testing"

12 13
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	message "github.com/ipfs/go-ipfs/exchange/bitswap/message"
Steven Allen's avatar
Steven Allen committed
14
	testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil"
15
	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
16 17
	ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
	dssync "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync"
18
	peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
19 20
)

21
type peerAndEngine struct {
22
	Peer   peer.ID
23
	Engine *Engine
24 25
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
26
func newEngine(ctx context.Context, idStr string) peerAndEngine {
27
	return peerAndEngine{
28
		Peer: peer.ID(idStr),
29
		//Strategy: New(true),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31
		Engine: NewEngine(ctx,
			blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))),
32 33 34
	}
}

35
func TestConsistentAccounting(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
36 37 38 39
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sender := newEngine(ctx, "Ernie")
	receiver := newEngine(ctx, "Bert")
40 41 42 43

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

44
		m := message.New(false)
45
		content := []string{"this", "is", "message", "i"}
Jeromy's avatar
Jeromy committed
46
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
47

48 49
		sender.Engine.MessageSent(receiver.Peer, m)
		receiver.Engine.MessageReceived(sender.Peer, m)
50 51 52
	}

	// Ensure sender records the change
Brian Tiger Chow's avatar
Brian Tiger Chow committed
53
	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
54 55 56 57
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58
	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
59 60 61 62 63
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
Brian Tiger Chow's avatar
Brian Tiger Chow committed
64
	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
65 66 67 68
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

69 70
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72 73 74
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")
75

76
	m := message.New(true)
77

78 79
	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
80

81
	if seattle.Peer == sanfrancisco.Peer {
82 83 84
		t.Fatal("Sanity Check: Peers have same Key!")
	}

85
	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
86 87 88
		t.Fatal("Peer wasn't added as a Partner")
	}

89
	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
90 91
		t.Fatal("Peer wasn't added as a Partner")
	}
Jeromy's avatar
Jeromy committed
92 93 94 95 96

	seattle.Engine.PeerDisconnected(sanfrancisco.Peer)
	if peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
		t.Fatal("expected peer to be removed")
	}
97 98
}

99
func peerIsPartner(p peer.ID, e *Engine) bool {
100
	for _, partner := range e.Peers() {
101
		if partner == p {
102 103 104 105 106
			return true
		}
	}
	return false
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
107 108 109 110 111 112 113

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
	t.SkipNow() // TODO implement *Engine.Close
	e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())))
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
114 115
		for nextEnvelope := range e.Outbox() {
			<-nextEnvelope
Brian Tiger Chow's avatar
Brian Tiger Chow committed
116 117 118 119 120 121 122 123 124 125 126
		}
		wg.Done()
	}()
	// e.Close()
	wg.Wait()
	if _, ok := <-e.Outbox(); ok {
		t.Fatal("channel should be closed")
	}
}

func TestPartnerWantsThenCancels(t *testing.T) {
127 128 129 130
	numRounds := 10
	if testing.Short() {
		numRounds = 1
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
131 132 133 134 135
	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")

	type testCase [][]string
	testcases := []testCase{
rht's avatar
rht committed
136
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
137 138
			alphabet, vowels,
		},
rht's avatar
rht committed
139
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141 142 143
			alphabet, stringsComplement(alphabet, vowels),
		},
	}

144 145 146 147 148
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	for _, letter := range alphabet {
		block := blocks.NewBlock([]byte(letter))
		if err := bs.Put(block); err != nil {
			t.Fatal(err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
149 150 151
		}
	}

152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
	for i := 0; i < numRounds; i++ {
		for _, testcase := range testcases {
			set := testcase[0]
			cancels := testcase[1]
			keeps := stringsComplement(set, cancels)

			e := NewEngine(context.Background(), bs)
			partner := testutil.RandPeerIDFatal(t)

			partnerWants(e, set, partner)
			partnerCancels(e, cancels, partner)
			if err := checkHandledInOrder(t, e, keeps); err != nil {
				t.Logf("run #%d of %d", i, numRounds)
				t.Fatal(err)
			}
		}
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
169 170 171
}

func partnerWants(e *Engine, keys []string, partner peer.ID) {
172
	add := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
173 174
	for i, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
175
		add.AddEntry(block.Cid(), math.MaxInt32-i)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
176 177 178 179 180
	}
	e.MessageReceived(partner, add)
}

func partnerCancels(e *Engine, keys []string, partner peer.ID) {
181
	cancels := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
182 183
	for _, k := range keys {
		block := blocks.NewBlock([]byte(k))
184
		cancels.Cancel(block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
185 186 187 188
	}
	e.MessageReceived(partner, cancels)
}

189
func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
190
	for _, k := range keys {
191 192
		next := <-e.Outbox()
		envelope := <-next
193
		received := envelope.Block
Brian Tiger Chow's avatar
Brian Tiger Chow committed
194
		expected := blocks.NewBlock([]byte(k))
195
		if !received.Cid().Equals(expected.Cid()) {
Jeromy's avatar
Jeromy committed
196
			return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData())))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
197 198
		}
	}
199
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
}

func stringsComplement(set, subset []string) []string {
	m := make(map[string]struct{})
	for _, letter := range subset {
		m[letter] = struct{}{}
	}
	var complement []string
	for _, letter := range set {
		if _, exists := m[letter]; !exists {
			complement = append(complement, letter)
		}
	}
	return complement
}