bitswap.go 12.2 KB
Newer Older
1
// Package bitswap implements the IPFS exchange interface with the BitSwap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"math"
9
	"sync"
Jeromy's avatar
Jeromy committed
10
	"sync/atomic"
Jeromy's avatar
Jeromy committed
11 12
	"time"

13 14 15 16 17 18
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
Jeromy's avatar
Jeromy committed
19
	flags "github.com/ipfs/go-ipfs/flags"
20
	"github.com/ipfs/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
21

22
	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
23 24
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
25
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Jeromy's avatar
Jeromy committed
26
	blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
27
	cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
28
	peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
29 30
)

Jeromy's avatar
Jeromy committed
31
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
const (
	// maxProvidersPerRequest caps the number of providers asked for from the
	// network in one request. A cap is needed because the network streams
	// its results.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// Timeouts for provider lookups and for provide calls.
	providerRequestTimeout = 10 * time.Second
	provideTimeout         = 15 * time.Second

	// sizeBatchRequestChan is the buffer capacity of the findKeys channel.
	sizeBatchRequestChan = 32

	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
)
45

Jeromy's avatar
Jeromy committed
46
var (
	// Channel buffer sizes and the provide-worker limit. These are variables
	// rather than constants so that they can be lowered at startup.
	HasBlockBufferSize    = 256
	provideKeysBufferSize = 2048
	provideWorkerMax      = 512

	// metricsBuckets holds the histogram bucket bounds used for the
	// received-block size metrics.
	// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
	metricsBuckets = []float64{
		1 << 6,
		1 << 10,
		1 << 14,
		1 << 18,
		1<<18 + 15,
		1 << 22,
	}
)
Jeromy's avatar
Jeromy committed
54

Jeromy's avatar
Jeromy committed
55 56 57 58 59 60 61 62
// init lowers the channel buffer sizes and the provide-worker limit when
// flags.LowMemMode is set; otherwise the defaults are left untouched.
func init() {
	if !flags.LowMemMode {
		return
	}

	HasBlockBufferSize = 64
	provideKeysBufferSize = 512
	provideWorkerMax = 16
}

63
// rebroadcastDelay is a fixed one-minute delay value. Its consumers are not
// visible in this part of the file; presumably it paces wantlist
// rebroadcasts — confirm against the worker code.
var rebroadcastDelay = delay.Fixed(time.Minute)
64

Brian Tiger Chow's avatar
Brian Tiger Chow committed
65 66 67 68
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
69
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
70
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
71

72 73
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
74
	// coupled to the concerns of the ipfs daemon in this way.
75 76 77 78
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
79
	ctx, cancelFunc := context.WithCancel(parent)
80
	ctx = metrics.CtxSubScope(ctx, "bitswap")
81
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
82
		" data blocks recived").Histogram(metricsBuckets)
83
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
84
		" data blocks recived").Histogram(metricsBuckets)
85

86
	notif := notifications.New()
87 88 89 90 91
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

92
	bs := &Bitswap{
93
		blockstore:    bstore,
94
		notifications: notif,
95
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
96
		network:       network,
97
		findKeys:      make(chan *blockRequest, sizeBatchRequestChan),
98
		process:       px,
99 100
		newBlocks:     make(chan *cid.Cid, HasBlockBufferSize),
		provideKeys:   make(chan *cid.Cid, provideKeysBufferSize),
101
		wm:            NewWantManager(ctx, network),
102 103 104

		dupMetric: dupHist,
		allMetric: allHist,
105
	}
106
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
107
	network.SetDelegate(bs)
108

109 110
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
111 112 113 114 115 116 117 118 119

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

120 121 122
	return bs
}

123 124
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
125 126 127
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
	wm *WantManager
128

129 130
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131

132 133
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
134 135 136 137 138

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

139 140
	// notifications engine for receiving new blocks and routing them to the
	// appropriate user requests
141 142
	notifications notifications.PubSub

143
	// findKeys sends keys to a worker to find and connect to providers for them
144
	findKeys chan *blockRequest
145 146 147
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
148
	newBlocks chan *cid.Cid
149
	// provideKeys directly feeds provide workers
150
	provideKeys chan *cid.Cid
151

152 153 154
	process process.Process

	// Counters for various statistics
Jeromy's avatar
Jeromy committed
155
	counterLk      sync.Mutex
156 157
	blocksRecvd    int
	dupBlocksRecvd int
158
	dupDataRecvd   uint64
Jeromy's avatar
Jeromy committed
159 160 161
	blocksSent     int
	dataSent       uint64
	dataRecvd      uint64
Jeromy's avatar
Jeromy committed
162
	messagesRecvd  uint64
163 164 165 166

	// Metrics interface metrics
	dupMetric metrics.Histogram
	allMetric metrics.Histogram
Jeromy's avatar
Jeromy committed
167 168 169 170

	// Sessions
	sessions []*Session
	sessLk   sync.Mutex
Jeromy's avatar
Jeromy committed
171 172 173

	sessID   uint64
	sessIDLk sync.Mutex
174 175
}

176
type blockRequest struct {
177
	Cid *cid.Cid
178
	Ctx context.Context
179 180
}

181
// GetBlock attempts to retrieve a particular block from peers within the
182
// deadline enforced by the context.
183
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
184
	return getBlock(parent, k, bs.GetBlocks)
185 186
}

187 188
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
	var out []*cid.Cid
189
	for _, e := range bs.engine.WantlistForPeer(p) {
190
		out = append(out, e.Cid)
191 192 193 194
	}
	return out
}

195 196 197 198
// LedgerForPeer returns the receipt that the decision engine's
// LedgerForPeer reports for peer p.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	return bs.engine.LedgerForPeer(p)
}

199 200 201 202 203 204 205
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
206
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
207
	if len(keys) == 0 {
208
		out := make(chan blocks.Block)
209 210 211 212
		close(out)
		return out, nil
	}

213 214 215 216 217
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
218
	promise := bs.notifications.Subscribe(ctx, keys...)
219

220
	for _, k := range keys {
221
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
222 223
	}

Jeromy's avatar
Jeromy committed
224 225 226
	mses := bs.getNextSessionID()

	bs.wm.WantBlocks(ctx, keys, nil, mses)
227

228 229 230
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
231
	req := &blockRequest{
232
		Cid: keys[0],
233
		Ctx: ctx,
234
	}
235

236
	remaining := cid.NewSet()
237
	for _, k := range keys {
238
		remaining.Add(k)
239 240 241 242 243 244 245 246
	}

	out := make(chan blocks.Block)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		defer close(out)
		defer func() {
247
			// can't just defer this call on its own, arguments are resolved *when* the defer is created
Jeromy's avatar
Jeromy committed
248
			bs.CancelWants(remaining.Keys(), mses)
249 250 251 252 253 254 255 256
		}()
		for {
			select {
			case blk, ok := <-promise:
				if !ok {
					return
				}

Jeromy's avatar
Jeromy committed
257
				bs.CancelWants([]*cid.Cid{blk.Cid()}, mses)
258
				remaining.Remove(blk.Cid())
259 260 261 262 263 264 265 266 267 268 269
				select {
				case out <- blk:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

270
	select {
Jeromy's avatar
Jeromy committed
271
	case bs.findKeys <- req:
272
		return out, nil
273 274 275
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
276 277
}

Jeromy's avatar
Jeromy committed
278 279 280 281 282 283 284
// getNextSessionID increments the session ID counter under sessIDLk and
// returns the new value.
func (bs *Bitswap) getNextSessionID() uint64 {
	bs.sessIDLk.Lock()
	defer bs.sessIDLk.Unlock()

	next := bs.sessID + 1
	bs.sessID = next
	return next
}

285
// CancelWant removes a given key from the wantlist
Jeromy's avatar
Jeromy committed
286
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
287 288 289
	if len(cids) == 0 {
		return
	}
Jeromy's avatar
Jeromy committed
290
	bs.wm.CancelWants(context.Background(), cids, nil, ses)
291 292
}

293 294
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
295
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
296 297 298 299 300
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
301

302
	err := bs.blockstore.Put(blk)
303 304
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
305 306
		return err
	}
307

308 309 310 311 312
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
313 314
	bs.notifications.Publish(blk)

315 316
	bs.engine.AddBlock(blk)

317
	select {
318
	case bs.newBlocks <- blk.Cid():
319
		// send block off to be reprovided
320 321
	case <-bs.process.Closing():
		return bs.process.Close()
322 323
	}
	return nil
324 325
}

Jeromy's avatar
Jeromy committed
326
// SessionsForBlock returns a slice of all sessions that may be interested in the given cid
Jeromy's avatar
Jeromy committed
327 328 329 330 331 332
func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
	bs.sessLk.Lock()
	defer bs.sessLk.Unlock()

	var out []*Session
	for _, s := range bs.sessions {
Jeromy's avatar
Jeromy committed
333
		if s.interestedIn(c) {
Jeromy's avatar
Jeromy committed
334 335 336 337 338 339
			out = append(out, s)
		}
	}
	return out
}

340
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
341 342
	atomic.AddUint64(&bs.messagesRecvd, 1)

Jeromy's avatar
Jeromy committed
343 344
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
345
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
346 347
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
348

349 350 351
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
352 353 354 355
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
356
	var keys []*cid.Cid
357
	for _, block := range iblocks {
358
		if _, found := bs.wm.wl.Contains(block.Cid()); !found {
359
			log.Infof("received un-asked-for %s from %s", block, p)
360 361
			continue
		}
362
		keys = append(keys, block.Cid())
Jeromy's avatar
Jeromy committed
363
	}
Jeromy's avatar
Jeromy committed
364

Jeromy's avatar
Jeromy committed
365 366 367
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
368
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
369
			defer wg.Done()
370

371
			bs.updateReceiveCounters(b)
372

373 374
			k := b.Cid()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
375

Jeromy's avatar
Jeromy committed
376
			for _, ses := range bs.SessionsForBlock(k) {
Jeromy's avatar
Jeromy committed
377 378
				ses.receiveBlockFrom(p, b)
				bs.CancelWants([]*cid.Cid{k}, ses.id)
Jeromy's avatar
Jeromy committed
379
			}
380
			log.Debugf("got block %s from %s", b, p)
381
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
382 383 384
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
385
	}
Jeromy's avatar
Jeromy committed
386
	wg.Wait()
387 388
}

389 390
// ErrAlreadyHaveBlock is a sentinel error with the message "already have
// block". No code in this part of the file returns it.
var ErrAlreadyHaveBlock = errors.New("already have block")

391
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
392
	blkLen := len(b.RawData())
393
	has, err := bs.blockstore.Has(b.Cid())
394 395
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
396
		return
397
	}
398 399 400

	bs.allMetric.Observe(float64(blkLen))
	if has {
401
		bs.dupMetric.Observe(float64(blkLen))
402 403
	}

404 405 406 407
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()

	bs.blocksRecvd++
Jeromy's avatar
Jeromy committed
408
	bs.dataRecvd += uint64(len(b.RawData()))
409
	if has {
410 411
		bs.dupBlocksRecvd++
		bs.dupDataRecvd += uint64(blkLen)
412 413 414
	}
}

415
// Connected/Disconnected warns bitswap about peer connections
416
func (bs *Bitswap) PeerConnected(p peer.ID) {
417
	bs.wm.Connected(p)
418
	bs.engine.PeerConnected(p)
419 420 421
}

// Connected/Disconnected warns bitswap about peer connections
422
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
423
	bs.wm.Disconnected(p)
424
	bs.engine.PeerDisconnected(p)
425 426
}

427
func (bs *Bitswap) ReceiveError(err error) {
428
	log.Infof("Bitswap ReceiveError: %s", err)
429 430
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
431 432
}

433
// Close shuts the instance down by closing its process, and returns the
// error from that call.
func (bs *Bitswap) Close() error {
	err := bs.process.Close()
	return err
}
436

437 438
func (bs *Bitswap) GetWantlist() []*cid.Cid {
	var out []*cid.Cid
439
	for _, e := range bs.wm.wl.Entries() {
440
		out = append(out, e.Cid)
441 442 443
	}
	return out
}
444 445 446 447

// IsOnline always reports true for a Bitswap instance.
func (bs *Bitswap) IsOnline() bool {
	return true
}