network_test.go 2.54 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
Jan Winkelmann's avatar
Jan Winkelmann committed
4
	"context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5 6 7
	"sync"
	"testing"

8 9
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
Hector Sanjuan's avatar
Hector Sanjuan committed
10

11
	delay "gx/ipfs/QmRJVNatYJwTAHgdSM1Xef9QVQ1Ch3XHdmcrykjP5Y4soL/go-ipfs-delay"
Steven Allen's avatar
Steven Allen committed
12
	testutil "gx/ipfs/QmUJzxQQ2kzwQubsMqBTr1NGDpLfh7pGA2E1oaJULcKDPq/go-testutil"
Steven Allen's avatar
Steven Allen committed
13
	mockrouting "gx/ipfs/QmcE3B6ittYBmctva8Q155LPa1YPcVqg8N7pPcgt9i7iAQ/go-ipfs-routing/mock"
Steven Allen's avatar
Steven Allen committed
14
	peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
15
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
16 17 18
)

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
19
	net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
20 21
	responderPeer := testutil.RandIdentityOrFatal(t)
	waiter := net.Adapter(testutil.RandIdentityOrFatal(t))
22
	responder := net.Adapter(responderPeer)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24 25 26 27 28 29 30 31

	var wg sync.WaitGroup

	wg.Add(1)

	expectedStr := "received async"

	responder.SetDelegate(lambda(func(
		ctx context.Context,
32
		fromWaiter peer.ID,
33
		msgFromWaiter bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34

35
		msgToWaiter := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
36
		msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Jeromy Johnson's avatar
Jeromy Johnson committed
37
		waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
38 39 40 41
	}))

	waiter.SetDelegate(lambda(func(
		ctx context.Context,
42
		fromResponder peer.ID,
43
		msgFromResponder bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44 45 46 47

		// TODO assert that this came from the correct peer and that the message contents are as expected
		ok := false
		for _, b := range msgFromResponder.Blocks() {
Jeromy's avatar
Jeromy committed
48
			if string(b.RawData()) == expectedStr {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52 53 54 55 56 57 58
				wg.Done()
				ok = true
			}
		}

		if !ok {
			t.Fatal("Message not received from the responder")
		}
	}))

59
	messageSentAsync := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
60
	messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
61
	errSending := waiter.SendMessage(
62
		context.Background(), responderPeer.ID(), messageSentAsync)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66 67 68 69
	if errSending != nil {
		t.Fatal(errSending)
	}

	wg.Wait() // until waiter delegate function is executed
}

70
type receiverFunc func(ctx context.Context, p peer.ID,
71
	incoming bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72 73 74 75 76 77 78 79 80

// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
	return &lambdaImpl{
		f: f,
	}
}

type lambdaImpl struct {
81
	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82 83 84
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
85 86
	p peer.ID, incoming bsmsg.BitSwapMessage) {
	lam.f(ctx, p, incoming)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87
}
88 89 90 91

func (lam *lambdaImpl) ReceiveError(err error) {
	// TODO log error
}
92 93 94 95 96 97 98

func (lam *lambdaImpl) PeerConnected(p peer.ID) {
	// TODO
}
func (lam *lambdaImpl) PeerDisconnected(peer.ID) {
	// TODO
}