engine_test.go 5.3 KB
Newer Older
1
package decision
2 3

import (
Jan Winkelmann's avatar
Jan Winkelmann committed
4
	"context"
5 6
	"errors"
	"fmt"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
7
	"math"
8
	"strings"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
9
	"sync"
10 11
	"testing"

Jeromy's avatar
Jeromy committed
12 13 14 15 16 17 18 19
	message "github.com/ipfs/go-bitswap/message"

	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	peer "github.com/libp2p/go-libp2p-peer"
	testutil "github.com/libp2p/go-testutil"
20 21
)

22
type peerAndEngine struct {
23
	Peer   peer.ID
24
	Engine *Engine
25 26
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
27
func newEngine(ctx context.Context, idStr string) peerAndEngine {
28
	return peerAndEngine{
29
		Peer: peer.ID(idStr),
30
		//Strategy: New(true),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
31 32
		Engine: NewEngine(ctx,
			blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))),
33 34 35
	}
}

36
func TestConsistentAccounting(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37 38 39 40
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sender := newEngine(ctx, "Ernie")
	receiver := newEngine(ctx, "Bert")
41 42 43 44

	// Send messages from Ernie to Bert
	for i := 0; i < 1000; i++ {

45
		m := message.New(false)
46
		content := []string{"this", "is", "message", "i"}
Jeromy's avatar
Jeromy committed
47
		m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
48

49 50
		sender.Engine.MessageSent(receiver.Peer, m)
		receiver.Engine.MessageReceived(sender.Peer, m)
51 52 53
	}

	// Ensure sender records the change
Brian Tiger Chow's avatar
Brian Tiger Chow committed
54
	if sender.Engine.numBytesSentTo(receiver.Peer) == 0 {
55 56 57 58
		t.Fatal("Sent bytes were not recorded")
	}

	// Ensure sender and receiver have the same values
Brian Tiger Chow's avatar
Brian Tiger Chow committed
59
	if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) {
60 61 62 63 64
		t.Fatal("Inconsistent book-keeping. Strategies don't agree")
	}

	// Ensure sender didn't record receving anything. And that the receiver
	// didn't record sending anything
Brian Tiger Chow's avatar
Brian Tiger Chow committed
65
	if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 {
66 67 68 69
		t.Fatal("Bert didn't send bytes to Ernie")
	}
}

70 71
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
72 73 74 75
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sanfrancisco := newEngine(ctx, "sf")
	seattle := newEngine(ctx, "sea")
76

77
	m := message.New(true)
78

79 80
	sanfrancisco.Engine.MessageSent(seattle.Peer, m)
	seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
81

82
	if seattle.Peer == sanfrancisco.Peer {
83 84 85
		t.Fatal("Sanity Check: Peers have same Key!")
	}

86
	if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
87 88 89
		t.Fatal("Peer wasn't added as a Partner")
	}

90
	if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
91 92
		t.Fatal("Peer wasn't added as a Partner")
	}
Jeromy's avatar
Jeromy committed
93 94 95 96 97

	seattle.Engine.PeerDisconnected(sanfrancisco.Peer)
	if peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
		t.Fatal("expected peer to be removed")
	}
98 99
}

100
func peerIsPartner(p peer.ID, e *Engine) bool {
101
	for _, partner := range e.Peers() {
102
		if partner == p {
103 104 105 106 107
			return true
		}
	}
	return false
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
108 109 110 111 112 113 114

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
	t.SkipNow() // TODO implement *Engine.Close
	e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())))
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
115 116
		for nextEnvelope := range e.Outbox() {
			<-nextEnvelope
Brian Tiger Chow's avatar
Brian Tiger Chow committed
117 118 119 120 121 122 123 124 125 126 127
		}
		wg.Done()
	}()
	// e.Close()
	wg.Wait()
	if _, ok := <-e.Outbox(); ok {
		t.Fatal("channel should be closed")
	}
}

func TestPartnerWantsThenCancels(t *testing.T) {
128 129 130 131
	numRounds := 10
	if testing.Short() {
		numRounds = 1
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
132 133 134 135 136
	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")

	type testCase [][]string
	testcases := []testCase{
rht's avatar
rht committed
137
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
138 139
			alphabet, vowels,
		},
rht's avatar
rht committed
140
		{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
141 142 143 144
			alphabet, stringsComplement(alphabet, vowels),
		},
	}

145 146 147 148 149
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
	for _, letter := range alphabet {
		block := blocks.NewBlock([]byte(letter))
		if err := bs.Put(block); err != nil {
			t.Fatal(err)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
150 151 152
		}
	}

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
	for i := 0; i < numRounds; i++ {
		for _, testcase := range testcases {
			set := testcase[0]
			cancels := testcase[1]
			keeps := stringsComplement(set, cancels)

			e := NewEngine(context.Background(), bs)
			partner := testutil.RandPeerIDFatal(t)

			partnerWants(e, set, partner)
			partnerCancels(e, cancels, partner)
			if err := checkHandledInOrder(t, e, keeps); err != nil {
				t.Logf("run #%d of %d", i, numRounds)
				t.Fatal(err)
			}
		}
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
170 171 172
}

func partnerWants(e *Engine, keys []string, partner peer.ID) {
173
	add := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
174 175
	for i, letter := range keys {
		block := blocks.NewBlock([]byte(letter))
176
		add.AddEntry(block.Cid(), math.MaxInt32-i)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177 178 179 180 181
	}
	e.MessageReceived(partner, add)
}

func partnerCancels(e *Engine, keys []string, partner peer.ID) {
182
	cancels := message.New(false)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
183 184
	for _, k := range keys {
		block := blocks.NewBlock([]byte(k))
185
		cancels.Cancel(block.Cid())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
186 187 188 189
	}
	e.MessageReceived(partner, cancels)
}

190
func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
191
	for _, k := range keys {
192 193
		next := <-e.Outbox()
		envelope := <-next
194
		received := envelope.Block
Brian Tiger Chow's avatar
Brian Tiger Chow committed
195
		expected := blocks.NewBlock([]byte(k))
196
		if !received.Cid().Equals(expected.Cid()) {
Jeromy's avatar
Jeromy committed
197
			return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData())))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
198 199
		}
	}
200
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
}

// stringsComplement returns the elements of set that do not occur in subset,
// keeping their original order (duplicates in set are kept as well). The
// result is nil when no element remains.
func stringsComplement(set, subset []string) []string {
	excluded := make(map[string]struct{})
	for i := range subset {
		excluded[subset[i]] = struct{}{}
	}

	var remaining []string
	for _, candidate := range set {
		if _, skip := excluded[candidate]; skip {
			continue
		}
		remaining = append(remaining, candidate)
	}
	return remaining
}