engine_test.go 5.35 KB
Newer Older
1
package decision
2 3

import (
4 5
	"errors"
	"fmt"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6
	"math"
7
	"strings"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
8
	"sync"
9 10
	"testing"

11 12 13
	blocks "github.com/ipfs/go-ipfs/blocks"
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	message "github.com/ipfs/go-ipfs/exchange/bitswap/message"
14
	testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"
15
	peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
16
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
17 18
	ds "gx/ipfs/QmbCg24DeRKaRDLHbzzSVj7xndmWCPanBLkAM7Lx2nbrFs/go-datastore"
	dssync "gx/ipfs/QmbCg24DeRKaRDLHbzzSVj7xndmWCPanBLkAM7Lx2nbrFs/go-datastore/sync"
19 20
)

21
type peerAndEngine struct {
22
	Peer   peer.ID
23
	Engine *Engine
24 25
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
26
func newEngine(ctx context.Context, idStr string) peerAndEngine {
27
	return peerAndEngine{
28
		Peer: peer.ID(idStr),
29
		//Strategy: New(true),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31
		Engine: NewEngine(ctx,
			blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))),
32 33 34
	}
}

35
func TestConsistentAccounting(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
36 37 38 39
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sender := newEngine(ctx, "Ernie")
	receiver := newEngine(ctx, "Bert")
40 41 42 43

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

44
		m := message.New(false)
45
		content := []string{"this", "is", "message", "i"}
Jeromy's avatar
Jeromy committed
46
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
47

48 49
		sender.Engine.MessageSent(receiver.Peer, m)
		receiver.Engine.MessageReceived(sender.Peer, m)
50 51 52
	}

	// Ensure sender records the change
Brian Tiger Chow's avatar
Brian Tiger Chow committed
53
	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
54 55 56 57
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58
	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
59 60 61 62 63
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
Brian Tiger Chow's avatar
Brian Tiger Chow committed
64
	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
65 66 67 68
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

69 70
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72 73 74
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")
75

76
	m := message.New(true)
77

78 79
	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
80

81
	if seattle.Peer == sanfrancisco.Peer {
82 83 84
		t.Fatal("Sanity Check: Peers have same Key!")
	}

85
	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
86 87 88
		t.Fatal("Peer wasn't added as a Partner")
	}

89
	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
90 91 92 93
		t.Fatal("Peer wasn't added as a Partner")
	}
}

94
func peerIsPartner(p peer.ID, e *Engine) bool {
95
	for _, partner := range e.Peers() {
96
		if partner == p {
97 98 99 100 101
			return true
		}
	}
	return false
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103 104 105 106 107 108

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
	t.SkipNow() // TODO implement *Engine.Close
	e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())))
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
109 110
		for nextEnvelope := range e.Outbox() {
			<-nextEnvelope
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112 113 114 115 116 117 118 119 120 121
		}
		wg.Done()
	}()
	// e.Close()
	wg.Wait()
	if _, ok := <-e.Outbox(); ok {
		t.Fatal("channel should be closed")
	}
}

func TestPartnerWantsThenCancels(t *testing.T) {
122 123 124 125
	numRounds := 10
	if testing.Short() {
		numRounds = 1
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
126 127 128 129 130
	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")

	type testCase [][]string
	testcases := []testCase{
rht's avatar
rht committed
131
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
132 133
			alphabet, vowels,
		},
rht's avatar
rht committed
134
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
135 136 137 138
			alphabet, stringsComplement(alphabet, vowels),
		},
	}

139 140 141 142 143
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	for _, letter := range alphabet {
		block := blocks.NewBlock([]byte(letter))
		if err := bs.Put(block); err != nil {
			t.Fatal(err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144 145 146
		}
	}

147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
	for i := 0; i < numRounds; i++ {
		for _, testcase := range testcases {
			set := testcase[0]
			cancels := testcase[1]
			keeps := stringsComplement(set, cancels)

			e := NewEngine(context.Background(), bs)
			partner := testutil.RandPeerIDFatal(t)

			partnerWants(e, set, partner)
			partnerCancels(e, cancels, partner)
			if err := checkHandledInOrder(t, e, keeps); err != nil {
				t.Logf("run #%d of %d", i, numRounds)
				t.Fatal(err)
			}
		}
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
164 165 166
}

func partnerWants(e *Engine, keys []string, partner peer.ID) {
167
	add := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
168 169 170 171 172 173 174 175
	for i, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
		add.AddEntry(block.Key(), math.MaxInt32-i)
	}
	e.MessageReceived(partner, add)
}

func partnerCancels(e *Engine, keys []string, partner peer.ID) {
176
	cancels := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177 178 179 180 181 182 183
	for _, k := range keys {
		block := blocks.NewBlock([]byte(k))
		cancels.Cancel(block.Key())
	}
	e.MessageReceived(partner, cancels)
}

184
func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
185
	for _, k := range keys {
186 187
		next := <-e.Outbox()
		envelope := <-next
188
		received := envelope.Block
Brian Tiger Chow's avatar
Brian Tiger Chow committed
189 190
		expected := blocks.NewBlock([]byte(k))
		if received.Key() != expected.Key() {
191
			return errors.New(fmt.Sprintln("received", string(received.Data()), "expected", string(expected.Data())))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
192 193
		}
	}
194
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
}

// stringsComplement returns the elements of set that do not occur in subset,
// preserving their order (and any duplicates) from set. The result is nil
// when no element qualifies.
func stringsComplement(set, subset []string) []string {
	excluded := make(map[string]struct{})
	for i := range subset {
		excluded[subset[i]] = struct{}{}
	}

	var complement []string
	for i := range set {
		if _, skip := excluded[set[i]]; skip {
			continue
		}
		complement = append(complement, set[i])
	}
	return complement
}