network_test.go 2.39 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3
package bitswap

import (
Jan Winkelmann's avatar
Jan Winkelmann committed
4
	"context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
5 6 7
	"sync"
	"testing"

8 9
	bsmsg "gitlab.dms3.io/dms3/go-bitswap/message"
	bsnet "gitlab.dms3.io/dms3/go-bitswap/network"
Jeromy's avatar
Jeromy committed
10

11 12 13
	blocks "gitlab.dms3.io/dms3/go-block-format"
	delay "gitlab.dms3.io/dms3/go-dms3-delay"
	mockrouting "gitlab.dms3.io/dms3/go-dms3-routing/mock"
Raúl Kripalani's avatar
Raúl Kripalani committed
14

15 16
	"gitlab.dms3.io/p2p/go-p2p-core/peer"
	tnet "gitlab.dms3.io/p2p/go-p2p-testing/net"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
17 18 19
)

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
20
	net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
Raúl Kripalani's avatar
Raúl Kripalani committed
21 22
	responderPeer := tnet.RandIdentityOrFatal(t)
	waiter := net.Adapter(tnet.RandIdentityOrFatal(t))
23
	responder := net.Adapter(responderPeer)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
24 25 26 27 28 29 30 31 32

	var wg sync.WaitGroup

	wg.Add(1)

	expectedStr := "received async"

	responder.SetDelegate(lambda(func(
		ctx context.Context,
33
		fromWaiter peer.ID,
34
		msgFromWaiter bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35

36
		msgToWaiter := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
37
		msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Steven Allen's avatar
Steven Allen committed
38 39 40 41
		err := waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
		if err != nil {
			t.Error(err)
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42 43 44 45
	}))

	waiter.SetDelegate(lambda(func(
		ctx context.Context,
46
		fromResponder peer.ID,
47
		msgFromResponder bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48 49 50 51

		// TODO assert that this came from the correct peer and that the message contents are as expected
		ok := false
		for _, b := range msgFromResponder.Blocks() {
Jeromy's avatar
Jeromy committed
52
			if string(b.RawData()) == expectedStr {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
53 54 55 56 57 58 59 60 61 62
				wg.Done()
				ok = true
			}
		}

		if !ok {
			t.Fatal("Message not received from the responder")
		}
	}))

63
	messageSentAsync := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
64
	messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
65
	errSending := waiter.SendMessage(
66
		context.Background(), responderPeer.ID(), messageSentAsync)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
67 68 69 70 71 72 73
	if errSending != nil {
		t.Fatal(errSending)
	}

	wg.Wait() // until waiter delegate function is executed
}

74
type receiverFunc func(ctx context.Context, p peer.ID,
75
	incoming bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76 77 78 79 80 81 82 83 84

// lambda wraps the receiver function f in a lambdaImpl and hands it back as
// a bsnet.Receiver.
func lambda(f receiverFunc) bsnet.Receiver {
	return &lambdaImpl{f: f}
}

type lambdaImpl struct {
85
	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86 87 88
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
89 90
	p peer.ID, incoming bsmsg.BitSwapMessage) {
	lam.f(ctx, p, incoming)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91
}
92 93 94 95

// ReceiveError is currently a no-op: the error passed in is dropped.
// TODO: log the error.
func (lam *lambdaImpl) ReceiveError(_ error) {}
96 97 98 99 100 101 102

// PeerConnected is currently a no-op: the peer ID passed in is ignored.
// TODO: not implemented.
func (lam *lambdaImpl) PeerConnected(_ peer.ID) {}
// PeerDisconnected is currently a no-op: the peer ID passed in is ignored.
// TODO: not implemented.
func (lam *lambdaImpl) PeerDisconnected(_ peer.ID) {}