bitswap.go 10.9 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"errors"
7
	"math"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11 12
	key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"

13 14 15 16 17 18 19
	blocks "github.com/ipfs/go-ipfs/blocks"
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
Jeromy's avatar
Jeromy committed
20
	flags "github.com/ipfs/go-ipfs/flags"
21
	"github.com/ipfs/go-ipfs/thirdparty/delay"
22
	loggables "gx/ipfs/QmYrv4LgCC8FhG2Ab4bwuq5DqBdwMtx3hMb3KKJDZcr2d7/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
23

Jeromy's avatar
Jeromy committed
24 25
	process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
	procctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
Jeromy's avatar
Jeromy committed
26
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
27
	peer "gx/ipfs/QmWXjJo15p4pzT7cayEwZi2sWgJqLnGDof6ZGMh9xBgU1p/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
28
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
29 30
)

Jeromy's avatar
Jeromy committed
31
// log is the package-level logger, created under the "bitswap" name.
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
const (
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// Timeouts for the package's network-facing operations.
	// NOTE(review): their consumers are outside this part of the file.
	providerRequestTimeout = 10 * time.Second
	hasBlockTimeout        = 15 * time.Second
	provideTimeout         = 15 * time.Second

	// sizeBatchRequestChan is the buffer size of the Bitswap.findKeys channel.
	sizeBatchRequestChan = 32

	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
)
46

Jeromy's avatar
Jeromy committed
47
// Tunable sizes. They are variables rather than constants because init
// lowers them when flags.LowMemMode is set.
var (
	// HasBlockBufferSize is the capacity of the Bitswap.newBlocks channel.
	HasBlockBufferSize = 256

	// provideKeysBufferSize is the capacity of the Bitswap.provideKeys channel.
	provideKeysBufferSize = 2048

	// provideWorkerMax is presumably the cap on concurrent provide workers;
	// it is consumed outside this part of the file — TODO confirm.
	provideWorkerMax = 512
)
Jeromy's avatar
Jeromy committed
52

Jeromy's avatar
Jeromy committed
53 54 55 56 57 58 59 60
// init lowers the package's buffer sizes and provide-worker limit when
// flags.LowMemMode is set; otherwise the defaults are left untouched.
func init() {
	if !flags.LowMemMode {
		return
	}
	HasBlockBufferSize, provideKeysBufferSize, provideWorkerMax = 64, 512, 16
}

61
// rebroadcastDelay is a fixed ten-second delay.
// NOTE(review): presumably the interval between wantlist rebroadcasts; its
// consumer is not in this part of the file — confirm.
var rebroadcastDelay = delay.Fixed(time.Second * 10)
62

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
67
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
68
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
69

70 71 72 73 74 75 76
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
77 78
	ctx, cancelFunc := context.WithCancel(parent)

79
	notif := notifications.New()
80 81 82 83 84
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

85
	bs := &Bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		self:          p,
87
		blockstore:    bstore,
88
		notifications: notif,
89
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
90
		network:       network,
91
		findKeys:      make(chan *blockRequest, sizeBatchRequestChan),
92
		process:       px,
93
		newBlocks:     make(chan blocks.Block, HasBlockBufferSize),
94
		provideKeys:   make(chan key.Key, provideKeysBufferSize),
95
		wm:            NewWantManager(ctx, network),
96
	}
97
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
	network.SetDelegate(bs)
99

100 101
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
102 103 104 105 106 107 108 109 110

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

111 112 113
	return bs
}

114 115
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
116

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118 119
	// the ID of the peer to act on behalf of
	self peer.ID

120 121
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
122

123 124
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
125
	wm *WantManager
126

127 128 129 130 131 132
	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	notifications notifications.PubSub

Jeromy's avatar
Jeromy committed
133
	// send keys to a worker to find and connect to providers for them
134
	findKeys chan *blockRequest
Jeromy's avatar
Jeromy committed
135

136
	engine *decision.Engine
137

138
	process process.Process
139

140
	newBlocks chan blocks.Block
141

142
	provideKeys chan key.Key
143

Jeromy's avatar
Jeromy committed
144
	counterLk      sync.Mutex
145 146
	blocksRecvd    int
	dupBlocksRecvd int
147
	dupDataRecvd   uint64
148 149
}

150
type blockRequest struct {
151 152
	Key key.Key
	Ctx context.Context
153 154
}

155
// GetBlock attempts to retrieve a particular block from peers within the
156
// deadline enforced by the context.
157
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
158 159 160
	if k == "" {
		return nil, blockstore.ErrNotFound
	}
161

162 163 164 165
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
166 167
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
168

169
	ctx, cancelFunc := context.WithCancel(parent)
170

171
	ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
172 173
	log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
174 175 176 177

	defer func() {
		cancelFunc()
	}()
178

179
	promise, err := bs.GetBlocks(ctx, []key.Key{k})
180 181
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
182
	}
183 184

	select {
185 186 187 188 189 190 191 192 193
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
194
		return block, nil
195 196
	case <-parent.Done():
		return nil, parent.Err()
197 198 199
	}
}

200 201
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
	var out []key.Key
202 203 204 205 206 207
	for _, e := range bs.engine.WantlistForPeer(p) {
		out = append(out, e.Key)
	}
	return out
}

208 209 210 211
// LedgerForPeer returns the receipt the decision engine produces for peer p.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	receipt := bs.engine.LedgerForPeer(p)
	return receipt
}

212 213 214 215 216 217 218
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
219
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
220
	if len(keys) == 0 {
221
		out := make(chan blocks.Block)
222 223 224 225
		close(out)
		return out, nil
	}

226 227 228 229 230
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
231
	promise := bs.notifications.Subscribe(ctx, keys...)
232

233 234 235 236
	for _, k := range keys {
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	}

237
	bs.wm.WantBlocks(ctx, keys)
238

239 240 241
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
242
	req := &blockRequest{
243 244
		Key: keys[0],
		Ctx: ctx,
245
	}
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282

	remaining := make(map[key.Key]struct{})
	for _, k := range keys {
		remaining[k] = struct{}{}
	}

	out := make(chan blocks.Block)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		defer close(out)
		defer func() {
			var toCancel []key.Key
			for k, _ := range remaining {
				toCancel = append(toCancel, k)
			}
			bs.CancelWants(toCancel)
		}()
		for {
			select {
			case blk, ok := <-promise:
				if !ok {
					return
				}

				delete(remaining, blk.Key())
				select {
				case out <- blk:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

283
	select {
Jeromy's avatar
Jeromy committed
284
	case bs.findKeys <- req:
285
		return out, nil
286 287 288
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
289 290
}

291
// CancelWant removes a given key from the wantlist
Jeromy's avatar
Jeromy committed
292 293
func (bs *Bitswap) CancelWants(keys []key.Key) {
	bs.wm.CancelWants(keys)
294 295
}

296 297
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
298
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
299 300 301 302 303
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
304

305
	err := bs.blockstore.Put(blk)
306 307
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
308 309
		return err
	}
310

Jeromy's avatar
Jeromy committed
311 312
	bs.notifications.Publish(blk)

313 314
	bs.engine.AddBlock(blk)

315 316
	select {
	case bs.newBlocks <- blk:
317
		// send block off to be reprovided
318 319
	case <-bs.process.Closing():
		return bs.process.Close()
320 321
	}
	return nil
322 323
}

324
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
325 326
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
327
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
328 329
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
330

331 332 333
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
334 335 336 337
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
338
	var keys []key.Key
339 340
	for _, block := range iblocks {
		if _, found := bs.wm.wl.Contains(block.Key()); !found {
341
			log.Infof("received un-asked-for %s from %s", block, p)
342 343
			continue
		}
Jeromy's avatar
Jeromy committed
344 345 346
		keys = append(keys, block.Key())
	}
	bs.wm.CancelWants(keys)
347

Jeromy's avatar
Jeromy committed
348 349 350
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
351
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
352
			defer wg.Done()
353

354
			if err := bs.updateReceiveCounters(b); err != nil {
355
				return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
Jeromy's avatar
Jeromy committed
356
			}
357

358 359 360
			k := b.Key()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)

361
			log.Debugf("got block %s from %s", b, p)
362
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
363 364 365
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
366
	}
Jeromy's avatar
Jeromy committed
367
	wg.Wait()
368 369
}

370 371
// ErrAlreadyHaveBlock is returned by updateReceiveCounters when the local
// blockstore already holds the received block.
var ErrAlreadyHaveBlock = errors.New("already have block")

372
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
373 374 375
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()
	bs.blocksRecvd++
376
	has, err := bs.blockstore.Has(b.Key())
377 378 379 380 381 382
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
		return err
	}
	if err == nil && has {
		bs.dupBlocksRecvd++
Jeromy's avatar
Jeromy committed
383
		bs.dupDataRecvd += uint64(len(b.RawData()))
384 385 386 387 388 389 390 391
	}

	if has {
		return ErrAlreadyHaveBlock
	}
	return nil
}

392
// Connected/Disconnected warns bitswap about peer connections
393
func (bs *Bitswap) PeerConnected(p peer.ID) {
394
	bs.wm.Connected(p)
395 396 397
}

// Connected/Disconnected warns bitswap about peer connections
398
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
399
	bs.wm.Disconnected(p)
400
	bs.engine.PeerDisconnected(p)
401 402
}

403
func (bs *Bitswap) ReceiveError(err error) {
404
	log.Infof("Bitswap ReceiveError: %s", err)
405 406
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
407 408
}

409
// Close closes the process that governs this bitswap instance and returns
// the error, if any, from closing it.
func (bs *Bitswap) Close() error {
	err := bs.process.Close()
	return err
}
412

413 414
func (bs *Bitswap) GetWantlist() []key.Key {
	var out []key.Key
415
	for _, e := range bs.wm.wl.Entries() {
416 417 418 419
		out = append(out, e.Key)
	}
	return out
}