network_test.go 3.56 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6 7
package bitswap

import (
	"sync"
	"testing"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
8

9
	blocks "github.com/jbenet/go-ipfs/blocks"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10 11 12
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	peer "github.com/jbenet/go-ipfs/peer"
13
	delay "github.com/jbenet/go-ipfs/util/delay"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
14 15 16
)

func TestSendRequestToCooperativePeer(t *testing.T) {
17
	net := VirtualNetwork(delay.Fixed(0))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18

19
	idOfRecipient := peer.ID("recipient")
Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21 22

	t.Log("Get two network adapters")

23 24
	initiator := net.Adapter(peer.ID("initiator"))
	recipient := net.Adapter(idOfRecipient)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25 26 27 28

	expectedStr := "response from recipient"
	recipient.SetDelegate(lambda(func(
		ctx context.Context,
29
		from peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30
		incoming bsmsg.BitSwapMessage) (
31
		peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
32 33 34 35 36 37

		t.Log("Recipient received a message from the network")

		// TODO test contents of incoming message

		m := bsmsg.New()
Jeromy's avatar
Jeromy committed
38
		m.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
39

40
		return from, m
Brian Tiger Chow's avatar
Brian Tiger Chow committed
41 42 43 44 45
	}))

	t.Log("Build a message and send a synchronous request to recipient")

	message := bsmsg.New()
Jeromy's avatar
Jeromy committed
46
	message.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
	response, err := initiator.SendRequest(
48
		context.Background(), idOfRecipient, message)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52 53 54
	if err != nil {
		t.Fatal(err)
	}

	t.Log("Check the contents of the response from recipient")

55 56 57 58
	if response == nil {
		t.Fatal("Should have received a response")
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
59 60 61 62 63 64 65 66 67
	for _, blockFromRecipient := range response.Blocks() {
		if string(blockFromRecipient.Data) == expectedStr {
			return
		}
	}
	t.Fatal("Should have returned after finding expected block data")
}

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
68
	net := VirtualNetwork(delay.Fixed(0))
69 70 71
	idOfResponder := peer.ID("responder")
	waiter := net.Adapter(peer.ID("waiter"))
	responder := net.Adapter(idOfResponder)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72 73 74 75 76 77 78 79 80

	var wg sync.WaitGroup

	wg.Add(1)

	expectedStr := "received async"

	responder.SetDelegate(lambda(func(
		ctx context.Context,
81
		fromWaiter peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
82
		msgFromWaiter bsmsg.BitSwapMessage) (
83
		peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84 85

		msgToWaiter := bsmsg.New()
Jeromy's avatar
Jeromy committed
86
		msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87

88
		return fromWaiter, msgToWaiter
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89 90 91 92
	}))

	waiter.SetDelegate(lambda(func(
		ctx context.Context,
93
		fromResponder peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94
		msgFromResponder bsmsg.BitSwapMessage) (
95
		peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96 97 98 99 100 101 102 103 104 105 106 107 108 109

		// TODO assert that this came from the correct peer and that the message contents are as expected
		ok := false
		for _, b := range msgFromResponder.Blocks() {
			if string(b.Data) == expectedStr {
				wg.Done()
				ok = true
			}
		}

		if !ok {
			t.Fatal("Message not received from the responder")

		}
110
		return "", nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112 113
	}))

	messageSentAsync := bsmsg.New()
Jeromy's avatar
Jeromy committed
114
	messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
115
	errSending := waiter.SendMessage(
116
		context.Background(), idOfResponder, messageSentAsync)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
117 118 119 120 121 122 123
	if errSending != nil {
		t.Fatal(errSending)
	}

	wg.Wait() // until waiter delegate function is executed
}

124 125
// receiverFunc is the signature of a message handler: it receives a context,
// a peer ID and an incoming message, and returns a peer ID together with a
// message.
type receiverFunc func(
	ctx context.Context,
	p peer.ID,
	incoming bsmsg.BitSwapMessage,
) (peer.ID, bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
126 127 128 129 130 131 132 133 134

// lambda wraps a receiver function in a lambdaImpl so that it can be used
// wherever a bsnet.Receiver is required.
func lambda(f receiverFunc) bsnet.Receiver {
	impl := new(lambdaImpl)
	impl.f = f
	return impl
}

type lambdaImpl struct {
135 136
	f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
		peer.ID, bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
137 138 139
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
140 141
	p peer.ID, incoming bsmsg.BitSwapMessage) (
	peer.ID, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
142 143
	return lam.f(ctx, p, incoming)
}
144 145 146 147

// ReceiveError is currently a no-op: the error passed in is discarded.
func (lam *lambdaImpl) ReceiveError(err error) {
	// TODO log error
}