network_test.go 2.49 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6
package bitswap

import (
	"sync"
	"testing"

7 8 9 10 11 12 13 14
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	blocks "github.com/ipfs/go-ipfs/blocks"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
	mockrouting "github.com/ipfs/go-ipfs/routing/mock"
	delay "github.com/ipfs/go-ipfs/thirdparty/delay"
	testutil "github.com/ipfs/go-ipfs/util/testutil"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15 16 17
)

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
18
	net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
19 20
	responderPeer := testutil.RandIdentityOrFatal(t)
	waiter := net.Adapter(testutil.RandIdentityOrFatal(t))
21
	responder := net.Adapter(responderPeer)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22 23 24 25 26 27 28 29 30

	var wg sync.WaitGroup

	wg.Add(1)

	expectedStr := "received async"

	responder.SetDelegate(lambda(func(
		ctx context.Context,
31
		fromWaiter peer.ID,
Jeromy Johnson's avatar
Jeromy Johnson committed
32
		msgFromWaiter bsmsg.BitSwapMessage) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33 34

		msgToWaiter := bsmsg.New()
Jeromy's avatar
Jeromy committed
35
		msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Jeromy Johnson's avatar
Jeromy Johnson committed
36
		waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37

Jeromy Johnson's avatar
Jeromy Johnson committed
38
		return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
39 40 41 42
	}))

	waiter.SetDelegate(lambda(func(
		ctx context.Context,
43
		fromResponder peer.ID,
Jeromy Johnson's avatar
Jeromy Johnson committed
44
		msgFromResponder bsmsg.BitSwapMessage) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58

		// TODO assert that this came from the correct peer and that the message contents are as expected
		ok := false
		for _, b := range msgFromResponder.Blocks() {
			if string(b.Data) == expectedStr {
				wg.Done()
				ok = true
			}
		}

		if !ok {
			t.Fatal("Message not received from the responder")

		}
Jeromy Johnson's avatar
Jeromy Johnson committed
59
		return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62
	}))

	messageSentAsync := bsmsg.New()
Jeromy's avatar
Jeromy committed
63
	messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
64
	errSending := waiter.SendMessage(
65
		context.Background(), responderPeer.ID(), messageSentAsync)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67 68 69 70 71 72
	if errSending != nil {
		t.Fatal(errSending)
	}

	wg.Wait() // until waiter delegate function is executed
}

73
type receiverFunc func(ctx context.Context, p peer.ID,
Jeromy Johnson's avatar
Jeromy Johnson committed
74
	incoming bsmsg.BitSwapMessage) error
Brian Tiger Chow's avatar
Brian Tiger Chow committed
75 76 77 78 79 80 81 82 83

// lambda adapts a plain receiver function into a bsnet.Receiver by storing it
// in a freshly allocated lambdaImpl.
func lambda(f receiverFunc) bsnet.Receiver {
	impl := new(lambdaImpl)
	impl.f = f
	return impl
}

type lambdaImpl struct {
Jeromy Johnson's avatar
Jeromy Johnson committed
84
	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error
Brian Tiger Chow's avatar
Brian Tiger Chow committed
85 86 87
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
Jeromy Johnson's avatar
Jeromy Johnson committed
88
	p peer.ID, incoming bsmsg.BitSwapMessage) error {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89 90
	return lam.f(ctx, p, incoming)
}
91 92 93 94

// ReceiveError is currently a no-op: the error passed in is discarded.
func (lam *lambdaImpl) ReceiveError(err error) {
	// TODO: log the error instead of dropping it.
}
95 96 97 98 99 100 101

// PeerConnected is currently a no-op stub; the peer ID is ignored.
func (lam *lambdaImpl) PeerConnected(p peer.ID) {
	// TODO: not implemented yet.
}
// PeerDisconnected is currently a no-op stub; the peer ID is ignored.
func (lam *lambdaImpl) PeerDisconnected(p peer.ID) {
	// TODO: not implemented yet.
}