bitswap.go 12.5 KB
Newer Older
1
// Package bitswap implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"math"
9
	"sync"
Jeromy's avatar
Jeromy committed
10 11
	"time"

12 13 14 15 16 17 18
	blocks "github.com/ipfs/go-ipfs/blocks"
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
Jeromy's avatar
Jeromy committed
19
	flags "github.com/ipfs/go-ipfs/flags"
20
	"github.com/ipfs/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
21

22
	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
23 24
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
25
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
26 27 28
	loggables "gx/ipfs/QmTcfnDHimxBJqx6utpnWqVHdvyquXgkwAvYt4zMaJMKS2/go-libp2p-loggables"
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
	peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
29 30
)

Jeromy's avatar
Jeromy committed
31
// log is the package-wide event logger, registered under the name "bitswap".
var (
	log = logging.Logger("bitswap")
)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
// Tunables for provider lookups, worker timeouts and request batching.
const (
	// maxProvidersPerRequest caps how many providers are requested from the
	// network at once; the network streams results, so a small cap suffices.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// Timeouts consulted by the worker routines (defined outside this file
	// section); the names indicate provider lookups, HasBlock handling and
	// providing, respectively.
	providerRequestTimeout = 10 * time.Second
	hasBlockTimeout        = 15 * time.Second
	provideTimeout         = 15 * time.Second

	// sizeBatchRequestChan is the buffer size of the findKeys channel.
	sizeBatchRequestChan = 32

	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
)
46

Jeromy's avatar
Jeromy committed
47
// Channel buffer sizes and the provide-worker limit. init lowers all three
// when flags.LowMemMode is set.
var (
	HasBlockBufferSize    = 256
	provideKeysBufferSize = 2048
	provideWorkerMax      = 512
)

// metricsBuckets are the histogram bucket bounds, in bytes, for the
// received-block size metrics. The 1<<18+15 bound exists so that old file
// chunks, which are 1<<18 + 14 bytes long, land in a bucket of their own.
var metricsBuckets = []float64{
	1 << 6,
	1 << 10,
	1 << 14,
	1 << 18,
	1<<18 + 15,
	1 << 22,
}
Jeromy's avatar
Jeromy committed
55

Jeromy's avatar
Jeromy committed
56 57 58 59 60 61 62 63
// init shrinks the channel buffers and the provide-worker limit when the
// node is running in low-memory mode.
func init() {
	if !flags.LowMemMode {
		return
	}
	HasBlockBufferSize = 64
	provideKeysBufferSize = 512
	provideWorkerMax = 16
}

64
// rebroadcastDelay is a fixed one-minute delay; presumably consumed by the
// wantlist rebroadcast worker outside this section — confirm.
var rebroadcastDelay = delay.Fixed(60 * time.Second)
65

Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67 68 69
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
70
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
71
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
72

73 74
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
75
	// coupled to the concerns of the ipfs daemon in this way.
76 77 78 79
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
80
	ctx, cancelFunc := context.WithCancel(parent)
81
	ctx = metrics.CtxSubScope(ctx, "bitswap")
82
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
83
		" data blocks recived").Histogram(metricsBuckets)
84
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
85
		" data blocks recived").Histogram(metricsBuckets)
86

87
	notif := notifications.New()
88 89 90 91 92
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

93
	bs := &Bitswap{
94
		blockstore:    bstore,
95
		notifications: notif,
96
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
97
		network:       network,
98
		findKeys:      make(chan *blockRequest, sizeBatchRequestChan),
99
		process:       px,
100 101
		newBlocks:     make(chan *cid.Cid, HasBlockBufferSize),
		provideKeys:   make(chan *cid.Cid, provideKeysBufferSize),
102
		wm:            NewWantManager(ctx, network),
103 104 105

		dupMetric: dupHist,
		allMetric: allHist,
106
	}
107
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
108
	network.SetDelegate(bs)
109

110 111
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
112 113 114 115 116 117 118 119 120

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

121 122 123
	return bs
}

124 125
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
126 127 128
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
	wm *WantManager
129

130 131
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132

133 134
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
135 136 137 138 139

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

140 141
	// notifications engine for receiving new blocks and routing them to the
	// appropriate user requests
142 143
	notifications notifications.PubSub

144
	// findKeys sends keys to a worker to find and connect to providers for them
145
	findKeys chan *blockRequest
146 147 148
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
149
	newBlocks chan *cid.Cid
150
	// provideKeys directly feeds provide workers
151
	provideKeys chan *cid.Cid
152

153 154 155
	process process.Process

	// Counters for various statistics
Jeromy's avatar
Jeromy committed
156
	counterLk      sync.Mutex
157 158
	blocksRecvd    int
	dupBlocksRecvd int
159
	dupDataRecvd   uint64
Jeromy's avatar
Jeromy committed
160 161 162
	blocksSent     int
	dataSent       uint64
	dataRecvd      uint64
163 164 165 166

	// Metrics interface metrics
	dupMetric metrics.Histogram
	allMetric metrics.Histogram
167 168
}

169
type blockRequest struct {
170
	Cid *cid.Cid
171
	Ctx context.Context
172 173
}

174
// GetBlock attempts to retrieve a particular block from peers within the
175
// deadline enforced by the context.
176 177 178
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
	if k == nil {
		log.Error("nil cid in GetBlock")
jbenet's avatar
jbenet committed
179 180
		return nil, blockstore.ErrNotFound
	}
181

182 183 184 185
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
186 187
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
188
	ctx, cancelFunc := context.WithCancel(parent)
189

190 191
	// TODO: this request ID should come in from a higher layer so we can track
	// across multiple 'GetBlock' invocations
192
	ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
193 194
	log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
	defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
195
	defer cancelFunc()
196

197
	promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
198 199
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
200
	}
201 202

	select {
203 204 205 206 207 208 209 210 211
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
212
		return block, nil
213 214
	case <-parent.Done():
		return nil, parent.Err()
215 216 217
	}
}

218 219
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
	var out []*cid.Cid
220
	for _, e := range bs.engine.WantlistForPeer(p) {
221
		out = append(out, e.Cid)
222 223 224 225
	}
	return out
}

226 227 228 229
// LedgerForPeer returns the decision engine's ledger receipt for peer p.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	return bs.engine.LedgerForPeer(p)
}

230 231 232 233 234 235 236
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
237
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
238
	if len(keys) == 0 {
239
		out := make(chan blocks.Block)
240 241 242 243
		close(out)
		return out, nil
	}

244 245 246 247 248
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
249
	promise := bs.notifications.Subscribe(ctx, keys...)
250

251
	for _, k := range keys {
252
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
253 254
	}

255
	bs.wm.WantBlocks(ctx, keys)
256

257 258 259
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
260
	req := &blockRequest{
261
		Cid: keys[0],
262
		Ctx: ctx,
263
	}
264

265
	remaining := cid.NewSet()
266
	for _, k := range keys {
267
		remaining.Add(k)
268 269 270 271 272 273 274 275
	}

	out := make(chan blocks.Block)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		defer close(out)
		defer func() {
276 277
			// can't just defer this call on its own, arguments are resolved *when* the defer is created
			bs.CancelWants(remaining.Keys())
278 279 280 281 282 283 284 285
		}()
		for {
			select {
			case blk, ok := <-promise:
				if !ok {
					return
				}

286
				remaining.Remove(blk.Cid())
287 288 289 290 291 292 293 294 295 296 297
				select {
				case out <- blk:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

298
	select {
Jeromy's avatar
Jeromy committed
299
	case bs.findKeys <- req:
300
		return out, nil
301 302 303
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
304 305
}

306
// CancelWant removes a given key from the wantlist
307 308
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
	bs.wm.CancelWants(cids)
309 310
}

311 312
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
313
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
314 315 316 317 318
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
319

320
	err := bs.blockstore.Put(blk)
321 322
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
323 324
		return err
	}
325

326 327 328 329 330
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
331 332
	bs.notifications.Publish(blk)

333 334
	bs.engine.AddBlock(blk)

335
	select {
336
	case bs.newBlocks <- blk.Cid():
337
		// send block off to be reprovided
338 339
	case <-bs.process.Closing():
		return bs.process.Close()
340 341
	}
	return nil
342 343
}

344
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
345 346
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
347
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
348 349
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
350

351 352 353
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
354 355 356 357
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
358
	var keys []*cid.Cid
359
	for _, block := range iblocks {
360
		if _, found := bs.wm.wl.Contains(block.Cid()); !found {
361
			log.Infof("received un-asked-for %s from %s", block, p)
362 363
			continue
		}
364
		keys = append(keys, block.Cid())
Jeromy's avatar
Jeromy committed
365 366
	}
	bs.wm.CancelWants(keys)
367

Jeromy's avatar
Jeromy committed
368 369 370
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
371
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
372
			defer wg.Done()
373

374
			bs.updateReceiveCounters(b)
375

376 377
			k := b.Cid()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
378

379
			log.Debugf("got block %s from %s", b, p)
380
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
381 382 383
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
384
	}
Jeromy's avatar
Jeromy committed
385
	wg.Wait()
386 387
}

388 389
var (
	// ErrAlreadyHaveBlock reports that a block is already held locally.
	// NOTE(review): not referenced within this file; check other callers.
	ErrAlreadyHaveBlock = errors.New("already have block")
)

390
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
391
	blkLen := len(b.RawData())
392
	has, err := bs.blockstore.Has(b.Cid())
393 394
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
395
		return
396
	}
397 398 399

	bs.allMetric.Observe(float64(blkLen))
	if has {
400
		bs.dupMetric.Observe(float64(blkLen))
401 402
	}

403 404 405 406
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()

	bs.blocksRecvd++
Jeromy's avatar
Jeromy committed
407
	bs.dataRecvd += uint64(len(b.RawData()))
408
	if has {
409 410
		bs.dupBlocksRecvd++
		bs.dupDataRecvd += uint64(blkLen)
411 412 413
	}
}

414
// Connected/Disconnected warns bitswap about peer connections
415
func (bs *Bitswap) PeerConnected(p peer.ID) {
416
	bs.wm.Connected(p)
417 418 419
}

// Connected/Disconnected warns bitswap about peer connections
420
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
421
	bs.wm.Disconnected(p)
422
	bs.engine.PeerDisconnected(p)
423 424
}

425
func (bs *Bitswap) ReceiveError(err error) {
426
	log.Infof("Bitswap ReceiveError: %s", err)
427 428
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
429 430
}

431
// Close shuts down this bitswap instance by closing its process, which runs
// the notification teardown and cancels the instance's internal context. It
// returns the error reported by the process.
func (bs *Bitswap) Close() error {
	return bs.process.Close()
}
434

435 436
func (bs *Bitswap) GetWantlist() []*cid.Cid {
	var out []*cid.Cid
437
	for _, e := range bs.wm.wl.Entries() {
438
		out = append(out, e.Cid)
439 440 441
	}
	return out
}
442 443 444 445

// IsOnline reports whether this exchange is online; bitswap always reports
// true.
func (bs *Bitswap) IsOnline() bool {
	return true
}