network.go 3.9 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4
package bitswap

import (
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	"fmt"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
8

Brian Tiger Chow's avatar
Brian Tiger Chow committed
9 10 11
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	peer "github.com/jbenet/go-ipfs/peer"
12
	delay "github.com/jbenet/go-ipfs/util/delay"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13 14 15
)

type Network interface {
16
	Adapter(peer.ID) bsnet.BitSwapNetwork
Brian Tiger Chow's avatar
Brian Tiger Chow committed
17

18
	HasPeer(peer.ID) bool
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19

Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
	SendMessage(
		ctx context.Context,
22 23
		from peer.ID,
		to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
24 25 26 27
		message bsmsg.BitSwapMessage) error

	SendRequest(
		ctx context.Context,
28 29
		from peer.ID,
		to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31 32 33 34 35
		message bsmsg.BitSwapMessage) (
		incoming bsmsg.BitSwapMessage, err error)
}

// network impl

36
func VirtualNetwork(d delay.D) Network {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37
	return &network{
38
		clients: make(map[peer.ID]bsnet.Receiver),
39
		delay:   d,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40 41 42 43
	}
}

type network struct {
44
	clients map[peer.ID]bsnet.Receiver
45
	delay   delay.D
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47
}

48
func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52
	client := &networkClient{
		local:   p,
		network: n,
	}
53
	n.clients[p] = client
Brian Tiger Chow's avatar
Brian Tiger Chow committed
54 55 56
	return client
}

57 58
func (n *network) HasPeer(p peer.ID) bool {
	_, found := n.clients[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59 60 61
	return found
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
62 63 64 65
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
	ctx context.Context,
66 67
	from peer.ID,
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
68 69
	message bsmsg.BitSwapMessage) error {

70
	receiver, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72 73 74 75 76 77 78 79 80 81 82 83
	if !ok {
		return errors.New("Cannot locate peer on network")
	}

	// nb: terminate the context since the context wouldn't actually be passed
	// over the network in a real scenario

	go n.deliver(receiver, from, message)

	return nil
}

func (n *network) deliver(
84 85
	r bsnet.Receiver, from peer.ID, message bsmsg.BitSwapMessage) error {
	if message == nil || from == "" {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86 87 88
		return errors.New("Invalid input")
	}

89 90
	n.delay.Wait()

91
	nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92

93
	if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
94 95 96
		return errors.New("Malformed client request")
	}

97
	if nextPeer == "" && nextMsg == nil { // no response to send
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98 99 100
		return nil
	}

101
	nextReceiver, ok := n.clients[nextPeer]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
102 103 104 105 106 107 108 109 110 111
	if !ok {
		return errors.New("Cannot locate peer on network")
	}
	go n.deliver(nextReceiver, nextPeer, nextMsg)
	return nil
}

// TODO
func (n *network) SendRequest(
	ctx context.Context,
112 113
	from peer.ID,
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
114 115 116
	message bsmsg.BitSwapMessage) (
	incoming bsmsg.BitSwapMessage, err error) {

117
	r, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
118 119 120
	if !ok {
		return nil, errors.New("Cannot locate peer on network")
	}
121
	nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
122 123

	// TODO dedupe code
124
	if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
125 126
		r.ReceiveError(errors.New("Malformed client request"))
		return nil, nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
127 128 129
	}

	// TODO dedupe code
130
	if nextPeer == "" && nextMsg == nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
131 132 133 134
		return nil, nil
	}

	// TODO test when receiver doesn't immediately respond to the initiator of the request
135
	if nextPeer != from {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
136
		go func() {
137
			nextReceiver, ok := n.clients[nextPeer]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
138 139 140 141 142
			if !ok {
				// TODO log the error?
			}
			n.deliver(nextReceiver, nextPeer, nextMsg)
		}()
143
		return nil, nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144 145 146 147 148
	}
	return nextMsg, nil
}

type networkClient struct {
149
	local peer.ID
Brian Tiger Chow's avatar
Brian Tiger Chow committed
150 151 152 153 154 155
	bsnet.Receiver
	network Network
}

func (nc *networkClient) SendMessage(
	ctx context.Context,
156
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
157 158 159 160 161 162
	message bsmsg.BitSwapMessage) error {
	return nc.network.SendMessage(ctx, nc.local, to, message)
}

func (nc *networkClient) SendRequest(
	ctx context.Context,
163
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
164 165 166 167
	message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
	return nc.network.SendRequest(ctx, nc.local, to, message)
}

168
func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169
	// no need to do anything because dialing isn't a thing in this test net.
170 171
	if !nc.network.HasPeer(p) {
		return fmt.Errorf("Peer not in network: %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172 173 174 175
	}
	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
176 177 178
// SetDelegate installs r as this client's embedded Receiver, replacing any
// receiver set earlier.
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
	nc.Receiver = r
}