network.go 4.04 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4
package bitswap

import (
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	"fmt"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
6 7

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
8

Brian Tiger Chow's avatar
Brian Tiger Chow committed
9 10 11
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	peer "github.com/jbenet/go-ipfs/peer"
12
	delay "github.com/jbenet/go-ipfs/util/delay"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13 14 15
)

type Network interface {
16
	Adapter(peer.ID) bsnet.BitSwapNetwork
Brian Tiger Chow's avatar
Brian Tiger Chow committed
17

18
	HasPeer(peer.ID) bool
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19

Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21
	SendMessage(
		ctx context.Context,
22 23
		from peer.ID,
		to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
24 25 26 27
		message bsmsg.BitSwapMessage) error

	SendRequest(
		ctx context.Context,
28 29
		from peer.ID,
		to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31 32 33 34 35
		message bsmsg.BitSwapMessage) (
		incoming bsmsg.BitSwapMessage, err error)
}

// network impl

36
func VirtualNetwork(d delay.D) Network {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37
	return &network{
38
		clients: make(map[peer.ID]bsnet.Receiver),
39
		delay:   d,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40 41 42 43
	}
}

type network struct {
44
	clients map[peer.ID]bsnet.Receiver
45
	delay   delay.D
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47
}

48
func (n *network) Adapter(p peer.ID) bsnet.BitSwapNetwork {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49
	client := &networkClient{
50 51 52
		local:     p,
		network:   n,
		peerstore: peer.NewPeerstore(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
53
	}
54
	n.clients[p] = client
Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56 57
	return client
}

58 59
func (n *network) HasPeer(p peer.ID) bool {
	_, found := n.clients[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62
	return found
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
	ctx context.Context,
67 68
	from peer.ID,
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
69 70
	message bsmsg.BitSwapMessage) error {

71
	receiver, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72 73 74 75 76 77 78 79 80 81 82 83 84
	if !ok {
		return errors.New("Cannot locate peer on network")
	}

	// nb: terminate the context since the context wouldn't actually be passed
	// over the network in a real scenario

	go n.deliver(receiver, from, message)

	return nil
}

func (n *network) deliver(
85 86
	r bsnet.Receiver, from peer.ID, message bsmsg.BitSwapMessage) error {
	if message == nil || from == "" {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87 88 89
		return errors.New("Invalid input")
	}

90 91
	n.delay.Wait()

92
	nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
93

94
	if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
95 96 97
		return errors.New("Malformed client request")
	}

98
	if nextPeer == "" && nextMsg == nil { // no response to send
Brian Tiger Chow's avatar
Brian Tiger Chow committed
99 100 101
		return nil
	}

102
	nextReceiver, ok := n.clients[nextPeer]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
103 104 105 106 107 108 109 110 111 112
	if !ok {
		return errors.New("Cannot locate peer on network")
	}
	go n.deliver(nextReceiver, nextPeer, nextMsg)
	return nil
}

// TODO
func (n *network) SendRequest(
	ctx context.Context,
113 114
	from peer.ID,
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
115 116 117
	message bsmsg.BitSwapMessage) (
	incoming bsmsg.BitSwapMessage, err error) {

118
	r, ok := n.clients[to]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
119 120 121
	if !ok {
		return nil, errors.New("Cannot locate peer on network")
	}
122
	nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
123 124

	// TODO dedupe code
125
	if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
126 127
		r.ReceiveError(errors.New("Malformed client request"))
		return nil, nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
128 129 130
	}

	// TODO dedupe code
131
	if nextPeer == "" && nextMsg == nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
132 133 134 135
		return nil, nil
	}

	// TODO test when receiver doesn't immediately respond to the initiator of the request
136
	if nextPeer != from {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
137
		go func() {
138
			nextReceiver, ok := n.clients[nextPeer]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
139 140 141 142 143
			if !ok {
				// TODO log the error?
			}
			n.deliver(nextReceiver, nextPeer, nextMsg)
		}()
144
		return nil, nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
145 146 147 148 149
	}
	return nextMsg, nil
}

type networkClient struct {
150
	local peer.ID
Brian Tiger Chow's avatar
Brian Tiger Chow committed
151
	bsnet.Receiver
152 153
	network   Network
	peerstore peer.Peerstore
Brian Tiger Chow's avatar
Brian Tiger Chow committed
154 155 156 157
}

func (nc *networkClient) SendMessage(
	ctx context.Context,
158
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
159 160 161 162 163 164
	message bsmsg.BitSwapMessage) error {
	return nc.network.SendMessage(ctx, nc.local, to, message)
}

func (nc *networkClient) SendRequest(
	ctx context.Context,
165
	to peer.ID,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
166 167 168 169
	message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
	return nc.network.SendRequest(ctx, nc.local, to, message)
}

170
func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171
	// no need to do anything because dialing isn't a thing in this test net.
172 173
	if !nc.network.HasPeer(p) {
		return fmt.Errorf("Peer not in network: %s", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174 175 176 177
	}
	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
178 179 180
// SetDelegate installs r as the embedded Receiver, so that messages routed to
// this client are handled by r.
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
	nc.Receiver = r
}
181 182 183 184

// Peerstore returns the peerstore that was created for this client in Adapter.
func (nc *networkClient) Peerstore() peer.Peerstore {
	store := nc.peerstore
	return store
}