bitswap.go 10.5 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"errors"
7
	"math"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11 12 13 14
	process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
	procctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
	logging "gx/ipfs/QmaDNZ4QMdBdku1YZWBysufYyoQt1negQGNav6PLYarbY8/go-log"
15
	peer "gx/ipfs/QmbyvM8zRFDkbFdYyt1MnevUMJ62SiSGbfDFZ3Z8nkrzr4/go-libp2p-peer"
16

17 18
	blocks "github.com/ipfs/go-ipfs/blocks"
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
19
	key "github.com/ipfs/go-ipfs/blocks/key"
20 21 22 23 24 25
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
Jeromy's avatar
Jeromy committed
26
	flags "github.com/ipfs/go-ipfs/flags"
27
	"github.com/ipfs/go-ipfs/thirdparty/delay"
28
	loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
29 30
)

Jeromy's avatar
Jeromy committed
31
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
const (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35 36
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37 38 39 40
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3
	providerRequestTimeout = time.Second * 10
	hasBlockTimeout        = time.Second * 15
41
	provideTimeout         = time.Second * 15
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42
	sizeBatchRequestChan   = 32
43 44
	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
Jeromy's avatar
Jeromy committed
45
)
46

Jeromy's avatar
Jeromy committed
47
var (
48 49 50
	HasBlockBufferSize    = 256
	provideKeysBufferSize = 2048
	provideWorkerMax      = 512
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51
)
Jeromy's avatar
Jeromy committed
52

Jeromy's avatar
Jeromy committed
53 54 55 56 57 58 59 60
func init() {
	if flags.LowMemMode {
		HasBlockBufferSize = 64
		provideKeysBufferSize = 512
		provideWorkerMax = 16
	}
}

61
var rebroadcastDelay = delay.Fixed(time.Second * 10)
62

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
67
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
68
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
69

70 71 72 73 74 75 76
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
77 78
	ctx, cancelFunc := context.WithCancel(parent)

79
	notif := notifications.New()
80 81 82 83 84
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

85
	bs := &Bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		self:          p,
87
		blockstore:    bstore,
88
		notifications: notif,
89
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
90
		network:       network,
91
		findKeys:      make(chan *wantlist.Entry, sizeBatchRequestChan),
92
		process:       px,
93
		newBlocks:     make(chan blocks.Block, HasBlockBufferSize),
94
		provideKeys:   make(chan key.Key, provideKeysBufferSize),
95
		wm:            NewWantManager(ctx, network),
96
	}
97
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
	network.SetDelegate(bs)
99

100 101
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
102 103 104 105 106 107 108 109 110

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

111 112 113
	return bs
}

114 115
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
116

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118 119
	// the ID of the peer to act on behalf of
	self peer.ID

120 121
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
122

123 124
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
125
	wm *WantManager
126

127 128 129 130 131 132
	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	notifications notifications.PubSub

Jeromy's avatar
Jeromy committed
133
	// send keys to a worker to find and connect to providers for them
134
	findKeys chan *wantlist.Entry
Jeromy's avatar
Jeromy committed
135

136
	engine *decision.Engine
137

138
	process process.Process
139

140
	newBlocks chan blocks.Block
141

142
	provideKeys chan key.Key
143

Jeromy's avatar
Jeromy committed
144
	counterLk      sync.Mutex
145 146
	blocksRecvd    int
	dupBlocksRecvd int
147
	dupDataRecvd   uint64
148 149
}

150
type blockRequest struct {
151 152
	key key.Key
	ctx context.Context
153 154
}

155
// GetBlock attempts to retrieve a particular block from peers within the
156
// deadline enforced by the context.
157
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
158

159 160 161 162
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
163 164
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
165

166
	ctx, cancelFunc := context.WithCancel(parent)
167

168
	ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
169 170
	log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
171 172 173 174

	defer func() {
		cancelFunc()
	}()
175

176
	promise, err := bs.GetBlocks(ctx, []key.Key{k})
177 178
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
179
	}
180 181

	select {
182 183 184 185 186 187 188 189 190
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
191
		return block, nil
192 193
	case <-parent.Done():
		return nil, parent.Err()
194 195 196
	}
}

197 198
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
	var out []key.Key
199 200 201 202 203 204
	for _, e := range bs.engine.WantlistForPeer(p) {
		out = append(out, e.Key)
	}
	return out
}

205 206 207 208 209 210 211
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
212
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
213
	if len(keys) == 0 {
214
		out := make(chan blocks.Block)
215 216 217 218
		close(out)
		return out, nil
	}

219 220 221 222 223
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
224
	promise := bs.notifications.Subscribe(ctx, keys...)
225

226 227 228 229
	for _, k := range keys {
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	}

230
	bs.wm.WantBlocks(ctx, keys)
231

232 233 234 235 236 237
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
	req := &wantlist.Entry{
		Key: keys[0],
		Ctx: ctx,
238
	}
239
	select {
Jeromy's avatar
Jeromy committed
240
	case bs.findKeys <- req:
241
		return promise, nil
242 243 244
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
245 246
}

247 248 249 250 251
// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(ks []key.Key) {
	bs.wm.CancelWants(ks)
}

252 253
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
254
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
255 256 257 258 259
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
260

261 262 263
	err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
264 265
		return err
	}
266

Jeromy's avatar
Jeromy committed
267 268
	bs.notifications.Publish(blk)

269 270
	bs.engine.AddBlock(blk)

271 272
	select {
	case bs.newBlocks <- blk:
273
		// send block off to be reprovided
274 275
	case <-bs.process.Closing():
		return bs.process.Close()
276 277
	}
	return nil
278 279
}

280
func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
281 282 283 284 285 286 287 288 289 290 291
	var err error
	for i := 0; i < attempts; i++ {
		if err = bs.blockstore.Put(blk); err == nil {
			break
		}

		time.Sleep(time.Millisecond * time.Duration(400*(i+1)))
	}
	return err
}

292
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
293 294
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
295
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
296 297
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
298

299 300 301
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
302 303 304 305
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
306
	var keys []key.Key
307 308
	for _, block := range iblocks {
		if _, found := bs.wm.wl.Contains(block.Key()); !found {
309
			log.Infof("received un-asked-for %s from %s", block, p)
310 311
			continue
		}
Jeromy's avatar
Jeromy committed
312 313 314
		keys = append(keys, block.Key())
	}
	bs.wm.CancelWants(keys)
315

Jeromy's avatar
Jeromy committed
316 317 318
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
319
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
320
			defer wg.Done()
321

322
			if err := bs.updateReceiveCounters(b); err != nil {
323
				return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
Jeromy's avatar
Jeromy committed
324
			}
325

326 327 328
			k := b.Key()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)

329
			log.Debugf("got block %s from %s", b, p)
330
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
331 332 333
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
334
	}
Jeromy's avatar
Jeromy committed
335
	wg.Wait()
336 337
}

338 339
var ErrAlreadyHaveBlock = errors.New("already have block")

340
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
341 342 343
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()
	bs.blocksRecvd++
344
	has, err := bs.blockstore.Has(b.Key())
345 346 347 348 349 350
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
		return err
	}
	if err == nil && has {
		bs.dupBlocksRecvd++
351
		bs.dupDataRecvd += uint64(len(b.Data()))
352 353 354 355 356 357 358 359
	}

	if has {
		return ErrAlreadyHaveBlock
	}
	return nil
}

360
// Connected/Disconnected warns bitswap about peer connections
361
func (bs *Bitswap) PeerConnected(p peer.ID) {
362
	bs.wm.Connected(p)
363 364 365
}

// Connected/Disconnected warns bitswap about peer connections
366
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
367
	bs.wm.Disconnected(p)
368
	bs.engine.PeerDisconnected(p)
369 370
}

371
func (bs *Bitswap) ReceiveError(err error) {
372
	log.Infof("Bitswap ReceiveError: %s", err)
373 374
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
375 376
}

377
func (bs *Bitswap) Close() error {
378
	return bs.process.Close()
379
}
380

381 382
func (bs *Bitswap) GetWantlist() []key.Key {
	var out []key.Key
383
	for _, e := range bs.wm.wl.Entries() {
384 385 386 387
		out = append(out, e.Key)
	}
	return out
}