bitswap.go 8.48 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4 5
package bitswap

import (
Jeromy's avatar
Jeromy committed
6 7
	"time"

8
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
9
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
	blocks "github.com/jbenet/go-ipfs/blocks"
12
	blockstore "github.com/jbenet/go-ipfs/blockstore"
13 14 15 16 17
	exchange "github.com/jbenet/go-ipfs/exchange"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
18
	peer "github.com/jbenet/go-ipfs/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19
	u "github.com/jbenet/go-ipfs/util"
20
	"github.com/jbenet/go-ipfs/util/eventlog"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22
)

23
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25 26 27
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
28
// Runs until context is cancelled
29 30
func New(ctx context.Context, p peer.Peer,
	network bsnet.BitSwapNetwork, routing bsnet.Routing,
31
	d ds.ThreadSafeDatastore, nice bool) exchange.Interface {
32

33 34
	notif := notifications.New()
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
35 36
		<-ctx.Done()
		notif.Shutdown()
37 38
	}()

39 40
	bs := &bitswap{
		blockstore:    blockstore.NewBlockstore(d),
41
		notifications: notif,
42
		strategy:      strategy.New(nice),
43
		routing:       routing,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44
		sender:        network,
45
		wantlist:      u.NewKeySet(),
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
46
		blockRequests: make(chan u.Key, 32),
47
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
	network.SetDelegate(bs)
Jeromy's avatar
Jeromy committed
49
	go bs.run(ctx)
50 51 52 53

	return bs
}

54 55
// bitswap instances implement the bitswap protocol.
type bitswap struct {
56

57
	// sender delivers messages on behalf of the session
58
	sender bsnet.BitSwapNetwork
59

60
	// blockstore is the local database
61
	// NB: ensure threadsafety
62
	blockstore blockstore.Blockstore
63

64
	// routing interface for communication
65
	routing bsnet.Routing
66

67
	notifications notifications.PubSub
68

Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
69
	blockRequests chan u.Key
Jeromy's avatar
Jeromy committed
70

71
	// strategy listens to network traffic and makes decisions about how to
72
	// interact with partners.
73 74
	// TODO(brian): save the strategy's state to the datastore
	strategy strategy.Strategy
75

76
	wantlist u.KeySet
77 78
}

79 80
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context
81 82
//
// TODO ensure only one active request per key
Jeromy's avatar
Jeromy committed
83
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
84 85 86 87

	// make sure to derive a new |ctx| and pass it to children. It's correct to
	// listen on |parent| here, but incorrect to pass |parent| to new async
	// functions. This is difficult to enforce. May this comment keep you safe.
88

89
	ctx, cancelFunc := context.WithCancel(parent)
90 91
	defer cancelFunc()

92 93
	ctx = eventlog.ContextWithMetadata(ctx, eventlog.Uuid("GetBlockRequest"))
	log.Event(ctx, "GetBlockRequestBegin", &k)
94 95

	defer func() {
96
		log.Event(ctx, "GetBlockRequestEnd", &k)
97 98
	}()

99
	bs.wantlist.Add(k)
100
	promise := bs.notifications.Subscribe(ctx, k)
101

Jeromy's avatar
Jeromy committed
102
	select {
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
103
	case bs.blockRequests <- k:
Jeromy's avatar
Jeromy committed
104 105 106
	case <-parent.Done():
		return nil, parent.Err()
	}
107 108

	select {
109
	case block := <-promise:
110
		bs.wantlist.Remove(k)
111
		return &block, nil
112 113
	case <-parent.Done():
		return nil, parent.Err()
114 115 116
	}
}

Jeromy's avatar
Jeromy committed
117 118 119 120 121 122 123 124 125 126 127
// GetBlocks is a placeholder: it ignores its arguments and always returns a
// nil block together with a nil error.
func (bs *bitswap) GetBlocks(parent context.Context, ks []u.Key) (*blocks.Block, error) {
	// TODO: something smart
	var none *blocks.Block
	return none, nil
}

func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
		message.AddWanted(wanted)
	}
	for peerToQuery := range peers {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
128
		log.Event(ctx, "PeerToQuery", peerToQuery)
Jeromy's avatar
Jeromy committed
129 130
		go func(p peer.Peer) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
131
			log.Event(ctx, "DialPeer", p)
Jeromy's avatar
Jeromy committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
			err := bs.sender.DialPeer(ctx, p)
			if err != nil {
				log.Errorf("Error sender.DialPeer(%s)", p)
				return
			}

			response, err := bs.sender.SendRequest(ctx, p, message)
			if err != nil {
				log.Errorf("Error sender.SendRequest(%s) = %s", p, err)
				return
			}
			// FIXME ensure accounting is handled correctly when
			// communication fails. May require slightly different API to
			// get better guarantees. May need shared sequence numbers.
			bs.strategy.MessageSent(p, message)

			if response == nil {
				return
			}
			bs.ReceiveMessage(ctx, p, response)
		}(peerToQuery)
	}
	return nil
}

func (bs *bitswap) run(ctx context.Context) {

Brian Tiger Chow's avatar
Brian Tiger Chow committed
159 160 161
	const rebroadcastPeriod = time.Second * 5 // Every so often, we should resend out our current want list
	const batchDelay = time.Millisecond * 3   // Time to wait before sending out wantlists to better batch up requests
	const peersPerSend = 6
162
	const threshold = 10
Brian Tiger Chow's avatar
Brian Tiger Chow committed
163 164 165

	var sendlist <-chan peer.Peer // NB: must be initialized to zero value
	broadcastSignal := time.After(rebroadcastPeriod)
Jeromy's avatar
Jeromy committed
166
	unsent := 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
167

Jeromy's avatar
Jeromy committed
168 169
	for {
		select {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
170
		case <-broadcastSignal:
Jeromy's avatar
Jeromy committed
171 172 173 174
			wantlist := bs.wantlist.Keys()
			if len(wantlist) == 0 {
				continue
			}
Jeromy's avatar
Jeromy committed
175 176
			if sendlist == nil {
				// rely on semi randomness of maps
Jeromy's avatar
Jeromy committed
177
				firstKey := wantlist[0]
Jeromy's avatar
Jeromy committed
178 179 180 181
				sendlist = bs.routing.FindProvidersAsync(ctx, firstKey, 6)
			}
			err := bs.sendWantListTo(ctx, sendlist)
			if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
182
				log.Errorf("error sending wantlist: %s", err)
Jeromy's avatar
Jeromy committed
183 184
			}
			sendlist = nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
185
			broadcastSignal = time.After(rebroadcastPeriod)
Brian Tiger Chow's avatar
naming  
Brian Tiger Chow committed
186
		case k := <-bs.blockRequests:
Jeromy's avatar
Jeromy committed
187 188 189 190 191 192 193
			if unsent == 0 {
				sendlist = bs.routing.FindProvidersAsync(ctx, k, peersPerSend)
			}
			unsent++

			if unsent >= threshold {
				// send wantlist to sendlist
Brian Tiger Chow's avatar
Brian Tiger Chow committed
194 195 196 197
				err := bs.sendWantListTo(ctx, sendlist)
				if err != nil {
					log.Errorf("error sending wantlist: %s", err)
				}
Jeromy's avatar
Jeromy committed
198
				unsent = 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
199
				broadcastSignal = time.After(rebroadcastPeriod)
Jeromy's avatar
Jeromy committed
200 201 202 203
				sendlist = nil
			} else {
				// set a timeout to wait for more blocks or send current wantlist

Brian Tiger Chow's avatar
Brian Tiger Chow committed
204
				broadcastSignal = time.After(batchDelay)
Jeromy's avatar
Jeromy committed
205 206 207 208 209 210 211
			}
		case <-ctx.Done():
			return
		}
	}
}

212 213
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
214
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
215
	log.Debugf("Has Block %v", blk.Key())
216
	bs.wantlist.Remove(blk.Key())
217
	bs.sendToPeersThatWant(ctx, blk)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218
	return bs.routing.Provide(ctx, blk.Key())
219 220
}

221
// TODO(brian): handle errors
222 223
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
	peer.Peer, bsmsg.BitSwapMessage) {
224 225
	log.Debugf("ReceiveMessage from %v", p.Key())
	log.Debugf("Message wantlist: %v", incoming.Wantlist())
226

227
	if p == nil {
228
		log.Error("Received message from nil peer!")
229 230
		// TODO propagate the error upward
		return nil, nil
231 232
	}
	if incoming == nil {
233
		log.Error("Got nil bitswap message!")
234 235
		// TODO propagate the error upward
		return nil, nil
236
	}
237

238 239 240
	// Record message bytes in ledger
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
241
	bs.strategy.MessageReceived(p, incoming) // FIRST
242

243
	for _, block := range incoming.Blocks() {
244
		// TODO verify blocks?
245
		if err := bs.blockstore.Put(&block); err != nil {
246
			continue // FIXME(brian): err ignored
247
		}
248 249 250 251 252
		bs.notifications.Publish(block)
		err := bs.HasBlock(ctx, block)
		if err != nil {
			log.Warningf("HasBlock errored: %s", err)
		}
253 254
	}

255 256
	message := bsmsg.New()
	for _, wanted := range bs.wantlist.Keys() {
257
		message.AddWanted(wanted)
258
	}
259
	for _, key := range incoming.Wantlist() {
260 261
		// TODO: might be better to check if we have the block before checking
		//			if we should send it to someone
262
		if bs.strategy.ShouldSendBlockToPeer(key, p) {
263 264 265
			if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
				continue
			} else {
266
				message.AddBlock(*block)
267
			}
268 269
		}
	}
270

Jeromy's avatar
Jeromy committed
271
	bs.strategy.MessageSent(p, message)
272
	log.Debug("Returning message.")
273 274 275 276
	return p, message
}

func (bs *bitswap) ReceiveError(err error) {
277
	log.Errorf("Bitswap ReceiveError: %s", err)
278 279
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
280
}
281

282 283
// send strives to ensure that accounting is always performed when a message is
// sent
284
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
285
	bs.sender.SendMessage(ctx, p, m)
286
	bs.strategy.MessageSent(p, m)
287 288
}

289
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
290
	log.Debugf("Sending %v to peers that want it", block.Key())
291

292 293
	for _, p := range bs.strategy.Peers() {
		if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
294
			log.Debugf("%v wants %v", p, block.Key())
295 296
			if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
				message := bsmsg.New()
297
				message.AddBlock(block)
298
				for _, wanted := range bs.wantlist.Keys() {
299
					message.AddWanted(wanted)
300
				}
301
				bs.send(ctx, p, message)
302 303 304 305
			}
		}
	}
}