bitswap.go 12.5 KB
Newer Older
1
// Package bitswap implements the IPFS exchange interface with the BitSwap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2
// bilateral exchange protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4 5
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"math"
9
	"sync"
Jeromy's avatar
Jeromy committed
10 11
	"time"

12 13 14 15 16 17
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
Jeromy's avatar
Jeromy committed
18
	flags "github.com/ipfs/go-ipfs/flags"
19
	"github.com/ipfs/go-ipfs/thirdparty/delay"
20
	blocks "gx/ipfs/QmbJUay5h1HtzhJb5QQk2t26yCnJksHynvhcqp18utBPqG/go-block-format"
Jeromy's avatar
Jeromy committed
21

22
	cid "gx/ipfs/QmNw61A6sJoXMeP37mJRtQZdNhj5e3FdjoTN3v4FyE96Gk/go-cid"
23
	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
24 25
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
26
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
27 28
	loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
	peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29 30
)

Jeromy's avatar
Jeromy committed
31
// log is the package-level logger, created under the "bitswap" name.
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
const (
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3
	providerRequestTimeout = time.Second * 10
	provideTimeout         = time.Second * 15
	// sizeBatchRequestChan is the buffer size of the findKeys channel.
	sizeBatchRequestChan = 32
	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
)
45

Jeromy's avatar
Jeromy committed
46
var (
	// Sizing knobs. HasBlockBufferSize and provideKeysBufferSize are the
	// buffer sizes of the newBlocks and provideKeys channels created in New.
	// provideWorkerMax is presumably the cap on concurrent provide workers
	// (its consumer is not in this file) — TODO confirm.
	// init lowers all three when flags.LowMemMode is set.
	HasBlockBufferSize    = 256
	provideKeysBufferSize = 2048
	provideWorkerMax      = 512

	// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
Jeromy's avatar
Jeromy committed
54

Jeromy's avatar
Jeromy committed
55 56 57 58 59 60 61 62
// init lowers HasBlockBufferSize, provideKeysBufferSize and provideWorkerMax
// when flags.LowMemMode is set; otherwise the package defaults are kept.
func init() {
	if !flags.LowMemMode {
		return
	}

	HasBlockBufferSize = 64
	provideKeysBufferSize = 512
	provideWorkerMax = 16
}

63
// rebroadcastDelay is a fixed one-minute delay. Its consumer is not in this
// file; presumably it paces wantlist rebroadcasts — TODO confirm.
var rebroadcastDelay = delay.Fixed(time.Minute)
64

Brian Tiger Chow's avatar
Brian Tiger Chow committed
65 66 67 68
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
69
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
70
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
71

72 73
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
74
	// coupled to the concerns of the ipfs daemon in this way.
75 76 77 78
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
79
	ctx, cancelFunc := context.WithCancel(parent)
80
	ctx = metrics.CtxSubScope(ctx, "bitswap")
81
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
82
		" data blocks recived").Histogram(metricsBuckets)
83
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
84
		" data blocks recived").Histogram(metricsBuckets)
85

86
	notif := notifications.New()
87 88 89 90 91
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

92
	bs := &Bitswap{
93
		blockstore:    bstore,
94
		notifications: notif,
95
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
96
		network:       network,
97
		findKeys:      make(chan *blockRequest, sizeBatchRequestChan),
98
		process:       px,
99 100
		newBlocks:     make(chan *cid.Cid, HasBlockBufferSize),
		provideKeys:   make(chan *cid.Cid, provideKeysBufferSize),
101
		wm:            NewWantManager(ctx, network),
102 103 104

		dupMetric: dupHist,
		allMetric: allHist,
105
	}
106
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
107
	network.SetDelegate(bs)
108

109 110
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
111 112 113 114 115 116 117 118 119

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

120 121 122
	return bs
}

123 124
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
125 126 127
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
	wm *WantManager
128

129 130
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131

132 133
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
134

135
	// blockstore is the local database
136
	// NB: ensure threadsafety
137
	blockstore blockstore.Blockstore
138

139 140
	// notifications engine for receiving new blocks and routing them to the
	// appropriate user requests
141
	notifications notifications.PubSub
142

143
	// findKeys sends keys to a worker to find and connect to providers for them
144
	findKeys chan *blockRequest
145 146 147
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
148
	newBlocks chan *cid.Cid
149
	// provideKeys directly feeds provide workers
150
	provideKeys chan *cid.Cid
151

152 153 154
	process process.Process

	// Counters for various statistics
Jeromy's avatar
Jeromy committed
155
	counterLk      sync.Mutex
156 157
	blocksRecvd    int
	dupBlocksRecvd int
158
	dupDataRecvd   uint64
Jeromy's avatar
Jeromy committed
159 160 161
	blocksSent     int
	dataSent       uint64
	dataRecvd      uint64
162 163 164 165

	// Metrics interface metrics
	dupMetric metrics.Histogram
	allMetric metrics.Histogram
166 167
}

168
type blockRequest struct {
169
	Cid *cid.Cid
170
	Ctx context.Context
171 172
}

173
// GetBlock attempts to retrieve a particular block from peers within the
174
// deadline enforced by the context.
175 176 177
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
	if k == nil {
		log.Error("nil cid in GetBlock")
jbenet's avatar
jbenet committed
178 179
		return nil, blockstore.ErrNotFound
	}
180

181 182 183 184
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
185 186
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
187
	ctx, cancelFunc := context.WithCancel(parent)
188

189 190
	// TODO: this request ID should come in from a higher layer so we can track
	// across multiple 'GetBlock' invocations
191
	ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
192 193
	log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
	defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
194
	defer cancelFunc()
195

196
	promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
197 198
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
199
	}
200 201

	select {
202 203 204 205 206 207 208 209 210
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
211
		return block, nil
212 213
	case <-parent.Done():
		return nil, parent.Err()
214 215 216
	}
}

217 218
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
	var out []*cid.Cid
219
	for _, e := range bs.engine.WantlistForPeer(p) {
220
		out = append(out, e.Cid)
221 222 223 224
	}
	return out
}

225 226 227 228
// LedgerForPeer returns the decision engine's receipt for peer p.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	receipt := bs.engine.LedgerForPeer(p)
	return receipt
}

229 230 231 232 233 234 235
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
236
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
237
	if len(keys) == 0 {
238
		out := make(chan blocks.Block)
239 240 241 242
		close(out)
		return out, nil
	}

243 244 245 246 247
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
248
	promise := bs.notifications.Subscribe(ctx, keys...)
249

250
	for _, k := range keys {
251
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
252 253
	}

254
	bs.wm.WantBlocks(ctx, keys)
255

256 257 258
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
259
	req := &blockRequest{
260
		Cid: keys[0],
261
		Ctx: ctx,
262
	}
263

264
	remaining := cid.NewSet()
265
	for _, k := range keys {
266
		remaining.Add(k)
267 268 269 270 271 272 273 274
	}

	out := make(chan blocks.Block)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		defer close(out)
		defer func() {
275 276
			// can't just defer this call on its own, arguments are resolved *when* the defer is created
			bs.CancelWants(remaining.Keys())
277 278 279 280 281 282 283 284
		}()
		for {
			select {
			case blk, ok := <-promise:
				if !ok {
					return
				}

285
				remaining.Remove(blk.Cid())
286 287 288 289 290 291 292 293 294 295 296
				select {
				case out <- blk:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

297
	select {
Jeromy's avatar
Jeromy committed
298
	case bs.findKeys <- req:
299
		return out, nil
300 301 302
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
303 304
}

305
// CancelWant removes a given key from the wantlist
306 307
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
	bs.wm.CancelWants(cids)
308 309
}

310 311
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
312
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
313 314 315 316 317
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
318

319
	err := bs.blockstore.Put(blk)
320 321
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
322 323
		return err
	}
324

325 326 327 328 329
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
330 331
	bs.notifications.Publish(blk)

332 333
	bs.engine.AddBlock(blk)

334
	select {
335
	case bs.newBlocks <- blk.Cid():
336
		// send block off to be reprovided
337 338
	case <-bs.process.Closing():
		return bs.process.Close()
339 340
	}
	return nil
341 342
}

343
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
344 345
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
346
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
347 348
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
349

350 351 352
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
353 354 355 356
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
357
	var keys []*cid.Cid
358
	for _, block := range iblocks {
359
		if _, found := bs.wm.wl.Contains(block.Cid()); !found {
360
			log.Infof("received un-asked-for %s from %s", block, p)
361 362
			continue
		}
363
		keys = append(keys, block.Cid())
Jeromy's avatar
Jeromy committed
364 365
	}
	bs.wm.CancelWants(keys)
366

Jeromy's avatar
Jeromy committed
367 368 369
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
370
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
371
			defer wg.Done()
372

373
			bs.updateReceiveCounters(b)
374

375 376
			k := b.Cid()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
377

378
			log.Debugf("got block %s from %s", b, p)
379
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
380 381 382
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
383
	}
Jeromy's avatar
Jeromy committed
384
	wg.Wait()
385 386
}

387 388
// ErrAlreadyHaveBlock is a sentinel error with the message "already have
// block". It is not referenced elsewhere in this file.
var ErrAlreadyHaveBlock = errors.New("already have block")

389
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
390
	blkLen := len(b.RawData())
391
	has, err := bs.blockstore.Has(b.Cid())
392 393
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
394
		return
395
	}
396 397 398

	bs.allMetric.Observe(float64(blkLen))
	if has {
399
		bs.dupMetric.Observe(float64(blkLen))
400 401
	}

402 403 404 405
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()

	bs.blocksRecvd++
Jeromy's avatar
Jeromy committed
406
	bs.dataRecvd += uint64(len(b.RawData()))
407
	if has {
408 409
		bs.dupBlocksRecvd++
		bs.dupDataRecvd += uint64(blkLen)
410 411 412
	}
}

413
// Connected/Disconnected warns bitswap about peer connections
414
func (bs *Bitswap) PeerConnected(p peer.ID) {
415
	bs.wm.Connected(p)
416
	bs.engine.PeerConnected(p)
417 418 419
}

// Connected/Disconnected warns bitswap about peer connections
420
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
421
	bs.wm.Disconnected(p)
422
	bs.engine.PeerDisconnected(p)
423 424
}

425
func (bs *Bitswap) ReceiveError(err error) {
426
	log.Infof("Bitswap ReceiveError: %s", err)
427 428
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
429
}
430

431
// Close closes the bitswap process and returns the result of that call.
func (bs *Bitswap) Close() error {
	return bs.process.Close()
}
434

435 436
func (bs *Bitswap) GetWantlist() []*cid.Cid {
	var out []*cid.Cid
437
	for _, e := range bs.wm.wl.Entries() {
438
		out = append(out, e.Cid)
439 440 441
	}
	return out
}
442 443 444 445

// IsOnline always reports true.
func (bs *Bitswap) IsOnline() bool {
	return true
}