network_test.go 2.48 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6
package bitswap

import (
	"sync"
	"testing"

7 8 9 10 11
	blocks "github.com/ipfs/go-ipfs/blocks"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
12
	testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"
Jeromy's avatar
Jeromy committed
13
	peer "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/peer"
Jeromy's avatar
Jeromy committed
14
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15 16 17
)

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
18
	net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
19 20
	responderPeer := testutil.RandIdentityOrFatal(t)
	waiter := net.Adapter(testutil.RandIdentityOrFatal(t))
21
	responder := net.Adapter(responderPeer)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23 24 25 26 27 28 29 30

	var wg sync.WaitGroup

	wg.Add(1)

	expectedStr := "received async"

	responder.SetDelegate(lambda(func(
		ctx context.Context,
31
		fromWaiter peer.ID,
32
		msgFromWaiter bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33

34
		msgToWaiter := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
35
		msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Jeromy Johnson's avatar
Jeromy Johnson committed
36
		waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37 38 39 40
	}))

	waiter.SetDelegate(lambda(func(
		ctx context.Context,
41
		fromResponder peer.ID,
42
		msgFromResponder bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57

		// TODO assert that this came from the correct peer and that the message contents are as expected
		ok := false
		for _, b := range msgFromResponder.Blocks() {
			if string(b.Data) == expectedStr {
				wg.Done()
				ok = true
			}
		}

		if !ok {
			t.Fatal("Message not received from the responder")
		}
	}))

58
	messageSentAsync := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
59
	messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
60
	errSending := waiter.SendMessage(
61
		context.Background(), responderPeer.ID(), messageSentAsync)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
62 63 64 65 66 67 68
	if errSending != nil {
		t.Fatal(errSending)
	}

	wg.Wait() // until waiter delegate function is executed
}

69
type receiverFunc func(ctx context.Context, p peer.ID,
70
	incoming bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72 73 74 75 76 77 78 79

// lambda wraps the receiver function f in a *lambdaImpl and returns it as a
// bsnet.Receiver.
func lambda(f receiverFunc) bsnet.Receiver {
	impl := new(lambdaImpl)
	impl.f = f
	return impl
}

type lambdaImpl struct {
80
	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81 82 83
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
84 85
	p peer.ID, incoming bsmsg.BitSwapMessage) {
	lam.f(ctx, p, incoming)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86
}
87 88 89 90

// ReceiveError accepts an error and does nothing with it.
func (lam *lambdaImpl) ReceiveError(_ error) {
	// TODO: log the error instead of dropping it.
}
91 92 93 94 95 96 97

// PeerConnected is a no-op; the peer ID it receives is ignored.
func (lam *lambdaImpl) PeerConnected(peer.ID) {
	// TODO: not implemented yet.
}
// PeerDisconnected is a no-op; the peer ID it receives is ignored.
func (lam *lambdaImpl) PeerDisconnected(_ peer.ID) {
	// TODO: not implemented yet.
}