network_test.go 3.52 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6 7
package bitswap

import (
	"sync"
	"testing"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
8
	blocks "github.com/jbenet/go-ipfs/blocks"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
9 10 11 12 13 14
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	peer "github.com/jbenet/go-ipfs/peer"
)

func TestSendRequestToCooperativePeer(t *testing.T) {
15
	net := VirtualNetwork()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
16 17 18 19 20

	idOfRecipient := []byte("recipient")

	t.Log("Get two network adapters")

21 22
	initiator := net.Adapter(peer.WithIDString("initiator"))
	recipient := net.Adapter(peer.WithID(idOfRecipient))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23 24 25 26

	expectedStr := "response from recipient"
	recipient.SetDelegate(lambda(func(
		ctx context.Context,
27
		from peer.Peer,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
		incoming bsmsg.BitSwapMessage) (
29
		peer.Peer, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31 32 33 34 35

		t.Log("Recipient received a message from the network")

		// TODO test contents of incoming message

		m := bsmsg.New()
36
		m.AppendBlock(*blocks.NewBlock([]byte(expectedStr)))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37

38
		return from, m
Brian Tiger Chow's avatar
Brian Tiger Chow committed
39 40 41 42 43
	}))

	t.Log("Build a message and send a synchronous request to recipient")

	message := bsmsg.New()
44
	message.AppendBlock(*blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
45
	response, err := initiator.SendRequest(
46
		context.Background(), peer.WithID(idOfRecipient), message)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
	if err != nil {
		t.Fatal(err)
	}

	t.Log("Check the contents of the response from recipient")

	for _, blockFromRecipient := range response.Blocks() {
		if string(blockFromRecipient.Data) == expectedStr {
			return
		}
	}
	t.Fatal("Should have returned after finding expected block data")
}

func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
62
	net := VirtualNetwork()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63
	idOfResponder := []byte("responder")
64 65
	waiter := net.Adapter(peer.WithIDString("waiter"))
	responder := net.Adapter(peer.WithID(idOfResponder))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67 68 69 70 71 72 73 74

	var wg sync.WaitGroup

	wg.Add(1)

	expectedStr := "received async"

	responder.SetDelegate(lambda(func(
		ctx context.Context,
75
		fromWaiter peer.Peer,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76
		msgFromWaiter bsmsg.BitSwapMessage) (
77
		peer.Peer, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
78 79

		msgToWaiter := bsmsg.New()
80
		msgToWaiter.AppendBlock(*blocks.NewBlock([]byte(expectedStr)))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81

82
		return fromWaiter, msgToWaiter
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83 84 85 86
	}))

	waiter.SetDelegate(lambda(func(
		ctx context.Context,
87
		fromResponder peer.Peer,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88
		msgFromResponder bsmsg.BitSwapMessage) (
89
		peer.Peer, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90 91 92 93 94 95 96 97 98 99 100 101 102 103

		// TODO assert that this came from the correct peer and that the message contents are as expected
		ok := false
		for _, b := range msgFromResponder.Blocks() {
			if string(b.Data) == expectedStr {
				wg.Done()
				ok = true
			}
		}

		if !ok {
			t.Fatal("Message not received from the responder")

		}
104
		return nil, nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
105 106 107
	}))

	messageSentAsync := bsmsg.New()
108
	messageSentAsync.AppendBlock(*blocks.NewBlock([]byte("data")))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109
	errSending := waiter.SendMessage(
110
		context.Background(), peer.WithID(idOfResponder), messageSentAsync)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111 112 113 114 115 116 117
	if errSending != nil {
		t.Fatal(errSending)
	}

	wg.Wait() // until waiter delegate function is executed
}

118 119
// receiverFunc is the signature of a message handler: it takes a context, a
// peer and an incoming message, and returns a peer together with a message.
type receiverFunc func(
	ctx context.Context,
	p peer.Peer,
	incoming bsmsg.BitSwapMessage,
) (peer.Peer, bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
120 121 122 123 124 125 126 127 128

// lambda adapts a plain receiver function into a bsnet.Receiver by wrapping it
// in a lambdaImpl, so tests can pass function literals as delegates.
func lambda(f receiverFunc) bsnet.Receiver {
	impl := &lambdaImpl{f: f}
	return impl
}

type lambdaImpl struct {
129 130
	f func(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
		peer.Peer, bsmsg.BitSwapMessage)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
131 132 133
}

func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
134 135
	p peer.Peer, incoming bsmsg.BitSwapMessage) (
	peer.Peer, bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
136 137
	return lam.f(ctx, p, incoming)
}
138 139 140 141

// ReceiveError currently does nothing with the error it is given.
func (lam *lambdaImpl) ReceiveError(err error) {
	// TODO log error (err is dropped for now)
}