bitswap.go 12.2 KB
Newer Older
1
// Package bitswap implements the IPFS exchange interface with the BitSwap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"math"
9
	"sync"
Jeromy's avatar
Jeromy committed
10
	"sync/atomic"
Jeromy's avatar
Jeromy committed
11 12
	"time"

13 14 15 16 17 18
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
Jeromy's avatar
Jeromy committed
19
	flags "github.com/ipfs/go-ipfs/flags"
20
	"github.com/ipfs/go-ipfs/thirdparty/delay"
Jeromy's avatar
Jeromy committed
21

22
	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
23 24
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
25
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
26
	loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
27
	blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format"
28
	cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
29
	peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
30 31
)

Jeromy's avatar
Jeromy committed
32
var (
	// log is the package logger, obtained from go-log under the name "bitswap".
	log = logging.Logger("bitswap")
)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33

Brian Tiger Chow's avatar
Brian Tiger Chow committed
34
const (
	// kMaxPriority is the max priority as defined by the bitswap protocol.
	kMaxPriority = math.MaxInt32

	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// providerRequestTimeout is presumably the time budget for one provider
	// lookup (it is consumed outside this file section).
	providerRequestTimeout = 10 * time.Second

	// provideTimeout is presumably the time budget for one provide operation
	// (it is consumed outside this file section).
	provideTimeout = 15 * time.Second

	// sizeBatchRequestChan is the buffer capacity of the findKeys channel.
	sizeBatchRequestChan = 32
)
46

Jeromy's avatar
Jeromy committed
47
var (
	// HasBlockBufferSize is the capacity of the newBlocks channel, which
	// queues freshly added blocks for providing. It is lowered by init when
	// flags.LowMemMode is set.
	HasBlockBufferSize = 256

	// provideKeysBufferSize is the capacity of the provideKeys channel.
	provideKeysBufferSize = 2048

	// provideWorkerMax presumably bounds the number of provide workers (it is
	// consumed outside this file section).
	provideWorkerMax = 512

	// metricsBuckets are the histogram bucket boundaries, in bytes, for the
	// received-block size metrics. The 1<<18+15 bucket exists to observe old
	// file chunks that are 1<<18 + 14 in size.
	metricsBuckets = []float64{
		1 << 6,
		1 << 10,
		1 << 14,
		1 << 18,
		1<<18 + 15,
		1 << 22,
	}
)
Jeromy's avatar
Jeromy committed
55

Jeromy's avatar
Jeromy committed
56 57 58 59 60 61 62 63
// init shrinks the channel buffer sizes and the provide-worker limit when the
// process runs in low-memory mode; otherwise the defaults are kept.
func init() {
	if !flags.LowMemMode {
		return
	}
	HasBlockBufferSize = 64
	provideKeysBufferSize = 512
	provideWorkerMax = 16
}

64
// rebroadcastDelay is a fixed one-minute delay; presumably it paces the
// rebroadcast worker defined elsewhere in this package.
var rebroadcastDelay = delay.Fixed(60 * time.Second)
65

Brian Tiger Chow's avatar
Brian Tiger Chow committed
66 67 68 69
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
70
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
71
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
72

73 74
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
75
	// coupled to the concerns of the ipfs daemon in this way.
76 77 78 79
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
80
	ctx, cancelFunc := context.WithCancel(parent)
81
	ctx = metrics.CtxSubScope(ctx, "bitswap")
82
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
83
		" data blocks recived").Histogram(metricsBuckets)
84
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
85
		" data blocks recived").Histogram(metricsBuckets)
86

87
	notif := notifications.New()
88 89 90 91 92
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

93
	bs := &Bitswap{
94
		blockstore:    bstore,
95
		notifications: notif,
96
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
97
		network:       network,
98
		findKeys:      make(chan *blockRequest, sizeBatchRequestChan),
99
		process:       px,
100 101
		newBlocks:     make(chan *cid.Cid, HasBlockBufferSize),
		provideKeys:   make(chan *cid.Cid, provideKeysBufferSize),
102
		wm:            NewWantManager(ctx, network),
103 104 105

		dupMetric: dupHist,
		allMetric: allHist,
106
	}
107
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
108
	network.SetDelegate(bs)
109

110 111
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
112 113 114 115 116 117 118 119 120

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

121 122 123
	return bs
}

124 125
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
126 127 128
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
	wm *WantManager
129

130 131
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132

133 134
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
135 136 137 138 139

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

140 141
	// notifications engine for receiving new blocks and routing them to the
	// appropriate user requests
142 143
	notifications notifications.PubSub

144
	// findKeys sends keys to a worker to find and connect to providers for them
145
	findKeys chan *blockRequest
146 147 148
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
149
	newBlocks chan *cid.Cid
150
	// provideKeys directly feeds provide workers
151
	provideKeys chan *cid.Cid
152

153 154 155
	process process.Process

	// Counters for various statistics
Jeromy's avatar
Jeromy committed
156
	counterLk      sync.Mutex
157 158
	blocksRecvd    int
	dupBlocksRecvd int
159
	dupDataRecvd   uint64
Jeromy's avatar
Jeromy committed
160 161 162
	blocksSent     int
	dataSent       uint64
	dataRecvd      uint64
Jeromy's avatar
Jeromy committed
163
	messagesRecvd  uint64
164 165 166 167

	// Metrics interface metrics
	dupMetric metrics.Histogram
	allMetric metrics.Histogram
Jeromy's avatar
Jeromy committed
168 169 170 171

	// Sessions
	sessions []*Session
	sessLk   sync.Mutex
Jeromy's avatar
Jeromy committed
172 173 174

	sessID   uint64
	sessIDLk sync.Mutex
175 176
}

177
type blockRequest struct {
178
	Cid *cid.Cid
179
	Ctx context.Context
180 181
}

182
// GetBlock attempts to retrieve a particular block from peers within the
183
// deadline enforced by the context.
184
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
185
	return getBlock(parent, k, bs.GetBlocks)
186 187
}

188 189
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid {
	var out []*cid.Cid
190
	for _, e := range bs.engine.WantlistForPeer(p) {
191
		out = append(out, e.Cid)
192 193 194 195
	}
	return out
}

196 197 198 199
// LedgerForPeer returns the decision engine's receipt describing the ledger
// it keeps for peer p.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	receipt := bs.engine.LedgerForPeer(p)
	return receipt
}

200 201 202 203 204 205 206
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
207
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
208
	if len(keys) == 0 {
209
		out := make(chan blocks.Block)
210 211 212 213
		close(out)
		return out, nil
	}

214 215 216 217 218
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
219
	promise := bs.notifications.Subscribe(ctx, keys...)
220

221
	for _, k := range keys {
222
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
223 224
	}

Jeromy's avatar
Jeromy committed
225 226 227
	mses := bs.getNextSessionID()

	bs.wm.WantBlocks(ctx, keys, nil, mses)
228

229 230 231
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
232
	req := &blockRequest{
233
		Cid: keys[0],
234
		Ctx: ctx,
235
	}
236

237
	remaining := cid.NewSet()
238
	for _, k := range keys {
239
		remaining.Add(k)
240 241 242 243 244 245 246 247
	}

	out := make(chan blocks.Block)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		defer close(out)
		defer func() {
248
			// can't just defer this call on its own, arguments are resolved *when* the defer is created
Jeromy's avatar
Jeromy committed
249
			bs.CancelWants(remaining.Keys(), mses)
250 251 252 253 254 255 256 257
		}()
		for {
			select {
			case blk, ok := <-promise:
				if !ok {
					return
				}

Jeromy's avatar
Jeromy committed
258
				bs.CancelWants([]*cid.Cid{blk.Cid()}, mses)
259
				remaining.Remove(blk.Cid())
260 261 262 263 264 265 266 267 268 269 270
				select {
				case out <- blk:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

271
	select {
Jeromy's avatar
Jeromy committed
272
	case bs.findKeys <- req:
273
		return out, nil
274 275 276
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
277 278
}

Jeromy's avatar
Jeromy committed
279 280 281 282 283 284 285
// getNextSessionID increments the session counter under sessIDLk and returns
// the new value; since sessID starts at zero, the first ID issued is 1.
func (bs *Bitswap) getNextSessionID() uint64 {
	bs.sessIDLk.Lock()
	bs.sessID++
	id := bs.sessID
	bs.sessIDLk.Unlock()
	return id
}

286
// CancelWant removes a given key from the wantlist
Jeromy's avatar
Jeromy committed
287
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
288 289 290
	if len(cids) == 0 {
		return
	}
Jeromy's avatar
Jeromy committed
291
	bs.wm.CancelWants(context.Background(), cids, nil, ses)
292 293
}

294 295
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
296
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
297 298 299 300 301
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
302

303
	err := bs.blockstore.Put(blk)
304 305
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
306 307
		return err
	}
308

309 310 311 312 313
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
314 315
	bs.notifications.Publish(blk)

316 317
	bs.engine.AddBlock(blk)

318
	select {
319
	case bs.newBlocks <- blk.Cid():
320
		// send block off to be reprovided
321 322
	case <-bs.process.Closing():
		return bs.process.Close()
323 324
	}
	return nil
325 326
}

Jeromy's avatar
Jeromy committed
327 328 329 330 331 332
func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
	bs.sessLk.Lock()
	defer bs.sessLk.Unlock()

	var out []*Session
	for _, s := range bs.sessions {
Jeromy's avatar
Jeromy committed
333
		if s.interestedIn(c) {
Jeromy's avatar
Jeromy committed
334 335 336 337 338 339
			out = append(out, s)
		}
	}
	return out
}

340
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
341 342
	atomic.AddUint64(&bs.messagesRecvd, 1)

Jeromy's avatar
Jeromy committed
343 344
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
345
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
346 347
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
348

349 350 351
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
352 353 354 355
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
356
	var keys []*cid.Cid
357
	for _, block := range iblocks {
358
		if _, found := bs.wm.wl.Contains(block.Cid()); !found {
359
			log.Infof("received un-asked-for %s from %s", block, p)
360 361
			continue
		}
362
		keys = append(keys, block.Cid())
Jeromy's avatar
Jeromy committed
363
	}
Jeromy's avatar
Jeromy committed
364

Jeromy's avatar
Jeromy committed
365 366 367
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
368
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
369
			defer wg.Done()
370

371
			bs.updateReceiveCounters(b)
372

373 374
			k := b.Cid()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
375

Jeromy's avatar
Jeromy committed
376
			for _, ses := range bs.SessionsForBlock(k) {
Jeromy's avatar
Jeromy committed
377 378
				ses.receiveBlockFrom(p, b)
				bs.CancelWants([]*cid.Cid{k}, ses.id)
Jeromy's avatar
Jeromy committed
379
			}
380
			log.Debugf("got block %s from %s", b, p)
381
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
382 383 384
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
385
	}
Jeromy's avatar
Jeromy committed
386
	wg.Wait()
387 388
}

389 390
var (
	// ErrAlreadyHaveBlock is the sentinel error whose message is "already
	// have block". No code in this file returns it; its producers, if any,
	// live elsewhere in the package.
	ErrAlreadyHaveBlock = errors.New("already have block")
)

391
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
392
	blkLen := len(b.RawData())
393
	has, err := bs.blockstore.Has(b.Cid())
394 395
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
396
		return
397
	}
398 399 400

	bs.allMetric.Observe(float64(blkLen))
	if has {
401
		bs.dupMetric.Observe(float64(blkLen))
402 403
	}

404 405 406 407
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()

	bs.blocksRecvd++
Jeromy's avatar
Jeromy committed
408
	bs.dataRecvd += uint64(len(b.RawData()))
409
	if has {
410 411
		bs.dupBlocksRecvd++
		bs.dupDataRecvd += uint64(blkLen)
412 413 414
	}
}

415
// Connected/Disconnected warns bitswap about peer connections
416
func (bs *Bitswap) PeerConnected(p peer.ID) {
417
	bs.wm.Connected(p)
418
	bs.engine.PeerConnected(p)
419 420 421
}

// Connected/Disconnected warns bitswap about peer connections
422
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
423
	bs.wm.Disconnected(p)
424
	bs.engine.PeerDisconnected(p)
425 426
}

427
func (bs *Bitswap) ReceiveError(err error) {
428
	log.Infof("Bitswap ReceiveError: %s", err)
429 430
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
431 432
}

433
// Close shuts this Bitswap instance down by closing its process and returns
// the result of that close.
func (bs *Bitswap) Close() error {
	proc := bs.process
	return proc.Close()
}
436

437 438
func (bs *Bitswap) GetWantlist() []*cid.Cid {
	var out []*cid.Cid
439
	for _, e := range bs.wm.wl.Entries() {
440
		out = append(out, e.Cid)
441 442 443
	}
	return out
}
444 445 446 447

// IsOnline unconditionally reports true for a Bitswap instance.
func (bs *Bitswap) IsOnline() bool {
	const alwaysOnline = true
	return alwaysOnline
}