bitswap.go 10.5 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"errors"
7
	"math"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11 12 13 14
	process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
	procctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
	logging "gx/ipfs/QmaDNZ4QMdBdku1YZWBysufYyoQt1negQGNav6PLYarbY8/go-log"
15
	peer "gx/ipfs/QmbyvM8zRFDkbFdYyt1MnevUMJ62SiSGbfDFZ3Z8nkrzr4/go-libp2p-peer"
16

17 18
	blocks "github.com/ipfs/go-ipfs/blocks"
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
19
	key "github.com/ipfs/go-ipfs/blocks/key"
20 21 22 23 24 25
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
Jeromy's avatar
Jeromy committed
26
	flags "github.com/ipfs/go-ipfs/flags"
27
	"github.com/ipfs/go-ipfs/thirdparty/delay"
28
	loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
29 30
)

Jeromy's avatar
Jeromy committed
31
// log is the package-level logger, registered under the name "bitswap".
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
const (
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// Timeouts for provider lookups, block announcements and provide
	// operations. Their consumers live outside this part of the file.
	providerRequestTimeout = 10 * time.Second
	hasBlockTimeout        = 15 * time.Second
	provideTimeout         = 15 * time.Second

	// sizeBatchRequestChan is the buffer size of the findKeys channel
	// created in New.
	sizeBatchRequestChan = 32

	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
)
46

Jeromy's avatar
Jeromy committed
47
// Buffer and worker limits. These are variables rather than constants so
// that init can shrink them when low-memory mode is enabled.
var (
	// HasBlockBufferSize is the buffer size of the newBlocks channel.
	HasBlockBufferSize = 256

	// provideKeysBufferSize is the buffer size of the provideKeys channel.
	provideKeysBufferSize = 2048

	// provideWorkerMax presumably bounds the number of concurrent provide
	// workers; its consumer is outside this part of the file.
	provideWorkerMax = 512
)
Jeromy's avatar
Jeromy committed
52

Jeromy's avatar
Jeromy committed
53 54 55 56 57 58 59 60
// init lowers the buffer sizes and the provide worker limit when the
// low-memory flag is set; otherwise the defaults are left untouched.
func init() {
	if !flags.LowMemMode {
		return
	}

	HasBlockBufferSize = 64
	provideKeysBufferSize = 512
	provideWorkerMax = 16
}

61
// rebroadcastDelay is a fixed ten-second delay. Presumably it paces wantlist
// rebroadcasts; its consumer is outside this part of the file.
var rebroadcastDelay = delay.Fixed(time.Second * 10)
62

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
67
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
68
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
69

70 71 72 73 74 75 76
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
77 78
	ctx, cancelFunc := context.WithCancel(parent)

79
	notif := notifications.New()
80 81 82 83 84
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

85
	bs := &Bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		self:          p,
87
		blockstore:    bstore,
88
		notifications: notif,
89
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
90
		network:       network,
91
		findKeys:      make(chan *wantlist.Entry, sizeBatchRequestChan),
92
		process:       px,
93
		newBlocks:     make(chan blocks.Block, HasBlockBufferSize),
94
		provideKeys:   make(chan key.Key, provideKeysBufferSize),
95
		wm:            NewWantManager(ctx, network),
96
	}
97
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
	network.SetDelegate(bs)
99

100 101
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
102 103 104 105 106 107 108 109 110

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

111 112 113
	return bs
}

114 115
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
116

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118 119
	// the ID of the peer to act on behalf of
	self peer.ID

120 121
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
122

123 124
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
125
	wm *WantManager
126

127 128 129 130 131 132
	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	notifications notifications.PubSub

Jeromy's avatar
Jeromy committed
133
	// send keys to a worker to find and connect to providers for them
134
	findKeys chan *wantlist.Entry
Jeromy's avatar
Jeromy committed
135

136
	engine *decision.Engine
137

138
	process process.Process
139

140
	newBlocks chan blocks.Block
141

142
	provideKeys chan key.Key
143

Jeromy's avatar
Jeromy committed
144
	counterLk      sync.Mutex
145 146
	blocksRecvd    int
	dupBlocksRecvd int
147
	dupDataRecvd   uint64
148 149
}

150
// blockRequest pairs a requested key with the context of the request.
type blockRequest struct {
	key key.Key
	ctx context.Context
}

155
// GetBlock attempts to retrieve a particular block from peers within the
156
// deadline enforced by the context.
157
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
158 159 160
	if k == "" {
		return nil, blockstore.ErrNotFound
	}
161

162 163 164 165
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
166 167
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
168

169
	ctx, cancelFunc := context.WithCancel(parent)
170

171
	ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
172 173
	log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
174 175 176 177

	defer func() {
		cancelFunc()
	}()
178

179
	promise, err := bs.GetBlocks(ctx, []key.Key{k})
180 181
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
182
	}
183 184

	select {
185 186 187 188 189 190 191 192 193
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
194
		return block, nil
195 196
	case <-parent.Done():
		return nil, parent.Err()
197 198 199
	}
}

200 201
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
	var out []key.Key
202 203 204 205 206 207
	for _, e := range bs.engine.WantlistForPeer(p) {
		out = append(out, e.Key)
	}
	return out
}

208 209 210 211 212 213 214
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
215
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
216
	if len(keys) == 0 {
217
		out := make(chan blocks.Block)
218 219 220 221
		close(out)
		return out, nil
	}

222 223 224 225 226
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
227
	promise := bs.notifications.Subscribe(ctx, keys...)
228

229 230 231 232
	for _, k := range keys {
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	}

233
	bs.wm.WantBlocks(ctx, keys)
234

235 236 237 238 239 240
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
	req := &wantlist.Entry{
		Key: keys[0],
		Ctx: ctx,
241
	}
242
	select {
Jeromy's avatar
Jeromy committed
243
	case bs.findKeys <- req:
244
		return promise, nil
245 246 247
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
248 249
}

250 251 252 253 254
// CancelWants removes the given keys from the wantlist by forwarding them to
// the want manager.
func (bs *Bitswap) CancelWants(ks []key.Key) {
	bs.wm.CancelWants(ks)
}

255 256
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
257
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
258 259 260 261 262
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
263

264 265 266
	err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
267 268
		return err
	}
269

Jeromy's avatar
Jeromy committed
270 271
	bs.notifications.Publish(blk)

272 273
	bs.engine.AddBlock(blk)

274 275
	select {
	case bs.newBlocks <- blk:
276
		// send block off to be reprovided
277 278
	case <-bs.process.Closing():
		return bs.process.Close()
279 280
	}
	return nil
281 282
}

283
func (bs *Bitswap) tryPutBlock(blk blocks.Block, attempts int) error {
284 285 286 287 288 289 290 291 292 293 294
	var err error
	for i := 0; i < attempts; i++ {
		if err = bs.blockstore.Put(blk); err == nil {
			break
		}

		time.Sleep(time.Millisecond * time.Duration(400*(i+1)))
	}
	return err
}

295
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
296 297
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
298
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
299 300
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
301

302 303 304
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
305 306 307 308
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
309
	var keys []key.Key
310 311
	for _, block := range iblocks {
		if _, found := bs.wm.wl.Contains(block.Key()); !found {
312
			log.Infof("received un-asked-for %s from %s", block, p)
313 314
			continue
		}
Jeromy's avatar
Jeromy committed
315 316 317
		keys = append(keys, block.Key())
	}
	bs.wm.CancelWants(keys)
318

Jeromy's avatar
Jeromy committed
319 320 321
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
322
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
323
			defer wg.Done()
324

325
			if err := bs.updateReceiveCounters(b); err != nil {
326
				return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
Jeromy's avatar
Jeromy committed
327
			}
328

329 330 331
			k := b.Key()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)

332
			log.Debugf("got block %s from %s", b, p)
333
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
334 335 336
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
337
	}
Jeromy's avatar
Jeromy committed
338
	wg.Wait()
339 340
}

341 342
// ErrAlreadyHaveBlock is returned by updateReceiveCounters when the blockstore
// already contains the received block.
var ErrAlreadyHaveBlock = errors.New("already have block")

343
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
344 345 346
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()
	bs.blocksRecvd++
347
	has, err := bs.blockstore.Has(b.Key())
348 349 350 351 352 353
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
		return err
	}
	if err == nil && has {
		bs.dupBlocksRecvd++
354
		bs.dupDataRecvd += uint64(len(b.Data()))
355 356 357 358 359 360 361 362
	}

	if has {
		return ErrAlreadyHaveBlock
	}
	return nil
}

363
// PeerConnected tells bitswap that peer p has connected by passing the event
// on to the want manager.
func (bs *Bitswap) PeerConnected(p peer.ID) {
	bs.wm.Connected(p)
}

// PeerDisconnected tells bitswap that peer p has disconnected by passing the
// event on to the want manager and then to the decision engine.
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
	bs.wm.Disconnected(p)
	bs.engine.PeerDisconnected(p)
}

374
// ReceiveError is called with an error from the network. The error is only
// logged at info level; nothing is returned or propagated.
func (bs *Bitswap) ReceiveError(err error) {
	log.Infof("Bitswap ReceiveError: %s", err)
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
}

380
// Close shuts this bitswap instance down by closing its process and returns
// the result of that close.
func (bs *Bitswap) Close() error {
	return bs.process.Close()
}
383

384 385
func (bs *Bitswap) GetWantlist() []key.Key {
	var out []key.Key
386
	for _, e := range bs.wm.wl.Entries() {
387 388 389 390
		out = append(out, e.Key)
	}
	return out
}