network_test.go 3.77 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6 7
package bitswap

import (
	"sync"
	"testing"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
8
	blocks "github.com/jbenet/go-ipfs/blocks"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
9 10
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
	peer "github.com/jbenet/go-ipfs/p2p/peer"
12
	mockrouting "github.com/jbenet/go-ipfs/routing/mock"
13
	delay "github.com/jbenet/go-ipfs/thirdparty/delay"
14
	testutil "github.com/jbenet/go-ipfs/util/testutil"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15 16 17
)

func TestSendRequestToCooperativePeer(t *testing.T) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18
	net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19

20
	recipientPeer := testutil.RandIdentityOrFatal(t)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
21 22 23

	t.Log("Get two network adapters")

24
	initiator := net.Adapter(testutil.RandIdentityOrFatal(t))
25
	recipient := net.Adapter(recipientPeer)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
26 27 28 29

	expectedStr := "response from recipient"
	recipient.SetDelegate(lambda(func(
		ctx context.Context,
30
		from peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
31
		incoming bsmsg.BitSwapMessage) (
32
		peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33 34 35 36 37 38

		t.Log("Recipient received a message from the network")

		// TODO test contents of incoming message

		m := bsmsg.New()
Jeromy's avatar
Jeromy committed
39
		m.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40

41
		return from, m
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42 43 44 45 46
	}))

	t.Log("Build a message and send a synchronous request to recipient")

	message := bsmsg.New()
Jeromy's avatar
Jeromy committed
47
	message.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
	response, err := initiator.SendRequest(
49
		context.Background(), recipientPeer.ID(), message)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50 51 52 53 54 55
	if err != nil {
		t.Fatal(err)
	}

	t.Log("Check the contents of the response from recipient")

56 57 58 59
	if response == nil {
		t.Fatal("Should have received a response")
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
60 61 62 63 64 65 66 67 68
	for _, blockFromRecipient := range response.Blocks() {
		if string(blockFromRecipient.Data) == expectedStr {
			return
		}
	}
	t.Fatal("Should have returned after finding expected block data")
}

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
69
	net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
70 71
	responderPeer := testutil.RandIdentityOrFatal(t)
	waiter := net.Adapter(testutil.RandIdentityOrFatal(t))
72
	responder := net.Adapter(responderPeer)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
73 74 75 76 77 78 79 80 81

	var wg sync.WaitGroup

	wg.Add(1)

	expectedStr := "received async"

	responder.SetDelegate(lambda(func(
		ctx context.Context,
82
		fromWaiter peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83
		msgFromWaiter bsmsg.BitSwapMessage) (
84
		peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
85 86

		msgToWaiter := bsmsg.New()
Jeromy's avatar
Jeromy committed
87
		msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88

89
		return fromWaiter, msgToWaiter
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91 92 93
	}))

	waiter.SetDelegate(lambda(func(
		ctx context.Context,
94
		fromResponder peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
95
		msgFromResponder bsmsg.BitSwapMessage) (
96
		peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97 98 99 100 101 102 103 104 105 106 107 108 109 110

		// TODO assert that this came from the correct peer and that the message contents are as expected
		ok := false
		for _, b := range msgFromResponder.Blocks() {
			if string(b.Data) == expectedStr {
				wg.Done()
				ok = true
			}
		}

		if !ok {
			t.Fatal("Message not received from the responder")

		}
111
		return "", nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
112 113 114
	}))

	messageSentAsync := bsmsg.New()
Jeromy's avatar
Jeromy committed
115
	messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
116
	errSending := waiter.SendMessage(
117
		context.Background(), responderPeer.ID(), messageSentAsync)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
118 119 120 121 122 123 124
	if errSending != nil {
		t.Fatal(errSending)
	}

	wg.Wait() // until waiter delegate function is executed
}

125 126
type receiverFunc func(ctx context.Context, p peer.ID,
	incoming bsmsg.BitSwapMessage) (peer.ID, bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
127 128 129 130 131 132 133 134 135

// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
	return &lambdaImpl{
		f: f,
	}
}

type lambdaImpl struct {
136 137
	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
		peer.ID, bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
138 139 140
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
141 142
	p peer.ID, incoming bsmsg.BitSwapMessage) (
	peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
143 144
	return lam.f(ctx, p, incoming)
}
145 146 147 148

func (lam *lambdaImpl) ReceiveError(err error) {
	// TODO log error
}