bitswap.go 12.9 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"math"
7
	"sync"
Jeromy's avatar
Jeromy committed
8 9
	"time"

10
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
11
	inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
12
	process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
13

14
	blocks "github.com/jbenet/go-ipfs/blocks"
15
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
16
	exchange "github.com/jbenet/go-ipfs/exchange"
17
	decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
18 19 20
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
21
	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22
	peer "github.com/jbenet/go-ipfs/p2p/peer"
23 24
	"github.com/jbenet/go-ipfs/thirdparty/delay"
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
25
	u "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
26 27
	errors "github.com/jbenet/go-ipfs/util/debugerror"
	pset "github.com/jbenet/go-ipfs/util/peerset" // TODO move this to peerstore
28 29
)

30
// log is the package-level logger used throughout this file, created by
// eventlog.Logger("bitswap").
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32
const (
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// providerRequestTimeout bounds each provider lookup issued through
	// FindProvidersAsync.
	providerRequestTimeout = time.Second * 10

	// hasBlockTimeout bounds each HasBlock call made for a block that arrived
	// in an incoming message.
	hasBlockTimeout = time.Second * 15

	// sizeBatchRequestChan is the buffer size of the batchRequests channel.
	sizeBatchRequestChan = 32

	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
)
Jeromy's avatar
Jeromy committed
44

Brian Tiger Chow's avatar
Brian Tiger Chow committed
45
var (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
	rebroadcastDelay = delay.Fixed(time.Second * 10)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
)
48

Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
53
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
54
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
55

56 57 58 59 60 61 62
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
63 64
	ctx, cancelFunc := context.WithCancel(parent)

65
	notif := notifications.New()
66 67 68 69 70
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

71
	go func() {
72
		<-px.Closing() // process closes first
Jeromy's avatar
Jeromy committed
73
		cancelFunc()
74 75 76 77
	}()
	go func() {
		<-ctx.Done() // parent cancelled first
		px.Close()
78 79
	}()

80
	bs := &bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		self:          p,
82
		blockstore:    bstore,
83
		notifications: notif,
84
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
85
		network:       network,
86
		wantlist:      wantlist.NewThreadSafe(),
87
		batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
88
		process:       px,
89
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90
	network.SetDelegate(bs)
91 92 93 94 95 96
	px.Go(func(px process.Process) {
		bs.clientWorker(ctx)
	})
	px.Go(func(px process.Process) {
		bs.taskWorker(ctx)
	})
97 98 99
	px.Go(func(px process.Process) {
		bs.rebroadcastWorker(ctx)
	})
100 101 102 103

	return bs
}

104 105 106
// bitswap instances implement the bitswap protocol.
type bitswap struct {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108 109
	// the ID of the peer to act on behalf of
	self peer.ID

110 111
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
112 113 114 115 116 117 118

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	notifications notifications.PubSub

119 120 121
	// Requests for a set of related blocks
	// the assumption is made that the same peer is likely to
	// have more than a single block in the set
122
	batchRequests chan *blockRequest
Jeromy's avatar
Jeromy committed
123

124
	engine *decision.Engine
125

126
	wantlist *wantlist.ThreadSafe
127

128
	process process.Process
129 130
}

131 132 133 134 135
// blockRequest is one batch of wanted keys. GetBlocks queues it on the
// batchRequests channel and clientWorker consumes it.
type blockRequest struct {
	// keys are the wanted blocks; clientWorker gives earlier keys a higher
	// priority and looks up providers for keys[0] only.
	keys []u.Key
	// ctx is the context passed to GetBlocks; clientWorker uses it for the
	// sends and the provider lookup made for this request.
	ctx  context.Context
}

136
// GetBlock attempts to retrieve a particular block from peers within the
137
// deadline enforced by the context.
Jeromy's avatar
Jeromy committed
138
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
139

140 141 142 143
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
144 145
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
146

147
	ctx, cancelFunc := context.WithCancel(parent)
148

Jeromy's avatar
Jeromy committed
149
	ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
Jeromy's avatar
Jeromy committed
150
	defer log.EventBegin(ctx, "GetBlockRequest", &k).Done()
151 152 153 154

	defer func() {
		cancelFunc()
	}()
155

156
	promise, err := bs.GetBlocks(ctx, []u.Key{k})
157 158
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
159
	}
160 161

	select {
162 163 164 165 166 167 168 169 170
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
171
		return block, nil
172 173
	case <-parent.Done():
		return nil, parent.Err()
174
	}
175

176 177
}

178 179 180 181 182 183 184
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
Jeromy's avatar
Jeromy committed
185
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
186 187 188 189 190
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
191
	promise := bs.notifications.Subscribe(ctx, keys...)
192 193 194 195 196

	req := &blockRequest{
		keys: keys,
		ctx:  ctx,
	}
197
	select {
198
	case bs.batchRequests <- req:
199
		return promise, nil
200 201 202
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
203 204
}

205 206 207
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
208 209 210 211 212
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
213 214 215 216 217
	if err := bs.blockstore.Put(blk); err != nil {
		return err
	}
	bs.wantlist.Remove(blk.Key())
	bs.notifications.Publish(blk)
218
	return bs.network.Provide(ctx, blk.Key())
219 220
}

221 222
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
	set := pset.New()
223
	wg := sync.WaitGroup{}
224
	for peerToQuery := range peers {
225 226 227 228 229

		if !set.TryAdd(peerToQuery) { //Do once per peer
			continue
		}

230
		wg.Add(1)
231
		go func(p peer.ID) {
232
			defer wg.Done()
233
			if err := bs.send(ctx, p, m); err != nil {
234
				log.Debug(err) // TODO remove if too verbose
235
			}
236
		}(peerToQuery)
Jeromy's avatar
Jeromy committed
237
	}
238
	wg.Wait()
Jeromy's avatar
Jeromy committed
239 240 241
	return nil
}

242
func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
Jeromy's avatar
Jeromy committed
243 244
	message := bsmsg.New()
	message.SetFull(true)
245 246
	for _, wanted := range bs.wantlist.Entries() {
		message.AddEntry(wanted.Key, wanted.Priority)
Jeromy's avatar
Jeromy committed
247
	}
248 249
	return bs.sendWantlistMsgToPeers(ctx, message, peers)
}
Jeromy's avatar
Jeromy committed
250

Brian Tiger Chow's avatar
Brian Tiger Chow committed
251
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {
Jeromy's avatar
Jeromy committed
252

253 254 255 256 257
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// prepare a channel to hand off to sendWantlistToPeers
	sendToPeers := make(chan peer.ID)
Jeromy's avatar
Jeromy committed
258

259
	// Get providers for all entries in wantlist (could take a while)
Jeromy's avatar
Jeromy committed
260
	wg := sync.WaitGroup{}
Jeromy's avatar
Jeromy committed
261
	for _, e := range entries {
262
		wg.Add(1)
Jeromy's avatar
Jeromy committed
263
		go func(k u.Key) {
Jeromy's avatar
Jeromy committed
264
			defer wg.Done()
265

266
			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
267
			providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
268
			for prov := range providers {
269
				sendToPeers <- prov
Jeromy's avatar
Jeromy committed
270
			}
271
		}(e.Key)
Jeromy's avatar
Jeromy committed
272
	}
273 274 275 276 277 278 279 280

	go func() {
		wg.Wait() // make sure all our children do finish.
		close(sendToPeers)
	}()

	err := bs.sendWantlistToPeers(ctx, sendToPeers)
	if err != nil {
281
		log.Debugf("sendWantlistToPeers error: %s", err)
282
	}
Jeromy's avatar
Jeromy committed
283 284
}

285
// TODO(brian): handle errors
286 287
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
	peer.ID, bsmsg.BitSwapMessage) {
288
	defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
289

290
	if p == "" {
291
		log.Debug("Received message from nil peer!")
292
		// TODO propagate the error upward
293
		return "", nil
294 295
	}
	if incoming == nil {
296
		log.Debug("Got nil bitswap message!")
297
		// TODO propagate the error upward
298
		return "", nil
299
	}
300

Jeromy's avatar
Jeromy committed
301 302
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
303
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
304 305
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
306

Brian Tiger Chow's avatar
Brian Tiger Chow committed
307
	for _, block := range incoming.Blocks() {
308 309
		hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout)
		if err := bs.HasBlock(hasBlockCtx, block); err != nil {
310
			log.Debug(err)
Jeromy's avatar
Jeromy committed
311
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
312
	}
313
	var keys []u.Key
Brian Tiger Chow's avatar
Brian Tiger Chow committed
314
	for _, block := range incoming.Blocks() {
315
		keys = append(keys, block.Key())
316
	}
317
	bs.cancelBlocks(ctx, keys)
318

Jeromy's avatar
Jeromy committed
319
	// TODO: consider changing this function to not return anything
320
	return "", nil
321 322
}

323 324 325
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerConnected(p peer.ID) {
	// TODO: add to clientWorker??
326 327 328 329 330
	peers := make(chan peer.ID, 1)
	peers <- p
	close(peers)
	err := bs.sendWantlistToPeers(context.TODO(), peers)
	if err != nil {
331
		log.Debugf("error sending wantlist: %s", err)
332
	}
333 334 335
}

// Connected/Disconnected warns bitswap about peer connections
336 337
func (bs *bitswap) PeerDisconnected(p peer.ID) {
	bs.engine.PeerDisconnected(p)
338 339
}

Jeromy's avatar
Jeromy committed
340
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
341 342 343
	if len(bkeys) < 1 {
		return
	}
Jeromy's avatar
Jeromy committed
344 345 346
	message := bsmsg.New()
	message.SetFull(false)
	for _, k := range bkeys {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
347
		message.Cancel(k)
Jeromy's avatar
Jeromy committed
348
	}
349
	for _, p := range bs.engine.Peers() {
Jeromy's avatar
Jeromy committed
350 351
		err := bs.send(ctx, p, message)
		if err != nil {
352
			log.Debugf("Error sending message: %s", err)
Jeromy's avatar
Jeromy committed
353 354 355 356
		}
	}
}

357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
// wantNewBlocks sends one non-full message requesting every key in bkeys to
// each peer reported by the engine. Keys get descending priorities starting
// at kMaxPriority, in slice order. It does nothing when bkeys is empty, and
// send errors are only logged.
func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
	if len(bkeys) == 0 {
		return
	}

	wants := bsmsg.New()
	wants.SetFull(false)
	for rank := range bkeys {
		wants.AddEntry(bkeys[rank], kMaxPriority-rank)
	}

	for _, target := range bs.engine.Peers() {
		if sendErr := bs.send(ctx, target, wants); sendErr != nil {
			log.Debugf("Error sending message: %s", sendErr)
		}
	}
}

375
func (bs *bitswap) ReceiveError(err error) {
376
	log.Debugf("Bitswap ReceiveError: %s", err)
377 378
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
379 380
}

381 382
// send strives to ensure that accounting is always performed when a message is
// sent
383
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
384
	defer log.EventBegin(ctx, "sendMessage", p, m).Done()
385
	if err := bs.network.SendMessage(ctx, p, m); err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
386
		return errors.Wrap(err)
387
	}
388
	return bs.engine.MessageSent(p, m)
389
}
390 391

// Close closes the process that owns the workers started by New and returns
// the result of that close.
func (bs *bitswap) Close() error {
	return bs.process.Close()
}
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419

// taskWorker delivers the envelopes produced by the engine's outbox to their
// peers until ctx is done. A failed delivery is logged and the worker moves
// on to the next envelope.
func (bs *bitswap) taskWorker(ctx context.Context) {
	defer log.Info("bitswap task worker shutting down...")
	for {
		select {
		case <-ctx.Done():
			return
		case nextEnvelope := <-bs.engine.Outbox():
			// The outbox yields a channel; wait on it for the envelope
			// itself while still honouring cancellation.
			select {
			case <-ctx.Done():
				return
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
				log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
				if err := bs.send(ctx, envelope.Peer, envelope.Message); err != nil {
					log.Debugf("Error sending message: %s", err)
				}
			}
		}
	}
}

// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
	defer log.Info("bitswap client worker shutting down...")

420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
	for {
		select {
		case req := <-bs.batchRequests:
			keys := req.keys
			if len(keys) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
			for i, k := range keys {
				bs.wantlist.Add(k, kMaxPriority-i)
			}

			bs.wantNewBlocks(req.ctx, keys)

			// NB: Optimization. Assumes that providers of key[0] are likely to
			// be able to provide for all keys. This currently holds true in most
			// every situation. Later, this assumption may not hold as true.
			child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
			err := bs.sendWantlistToPeers(req.ctx, providers)
			if err != nil {
				log.Debugf("error sending wantlist: %s", err)
			}
		case <-parent.Done():
			return
		}
	}
}

func (bs *bitswap) rebroadcastWorker(parent context.Context) {
450
	ctx, cancel := context.WithCancel(parent)
451
	defer cancel()
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472

	broadcastSignal := time.After(rebroadcastDelay.Get())

	for {
		select {
		case <-time.Tick(10 * time.Second):
			n := bs.wantlist.Len()
			if n > 0 {
				log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
			}
		case <-broadcastSignal: // resend unfulfilled wantlist keys
			entries := bs.wantlist.Entries()
			if len(entries) > 0 {
				bs.sendWantlistToProviders(ctx, entries)
			}
			broadcastSignal = time.After(rebroadcastDelay.Get())
		case <-parent.Done():
			return
		}
	}
}