bitswap.go 11.3 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"errors"
7
	"math"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11
	key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
12

13 14 15 16 17 18 19
	blocks "github.com/ipfs/go-ipfs/blocks"
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
Jeromy's avatar
Jeromy committed
20
	flags "github.com/ipfs/go-ipfs/flags"
21
	"github.com/ipfs/go-ipfs/thirdparty/delay"
22
	loggables "gx/ipfs/QmTMy4hVSY28DdwJ9kBz6y7q6MuioFzPcpM3Ma3aPjo1i3/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
23

24 25 26
	context "context"
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
27
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
28
	peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
29 30
)

Jeromy's avatar
Jeromy committed
31
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
const (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35 36
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37 38 39 40
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3
	providerRequestTimeout = time.Second * 10
	hasBlockTimeout        = time.Second * 15
41
	provideTimeout         = time.Second * 15
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42
	sizeBatchRequestChan   = 32
43 44
	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
Jeromy's avatar
Jeromy committed
45
)
46

Jeromy's avatar
Jeromy committed
47
var (
48 49 50
	HasBlockBufferSize    = 256
	provideKeysBufferSize = 2048
	provideWorkerMax      = 512
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51
)
Jeromy's avatar
Jeromy committed
52

Jeromy's avatar
Jeromy committed
53 54 55 56 57 58 59 60
func init() {
	if flags.LowMemMode {
		HasBlockBufferSize = 64
		provideKeysBufferSize = 512
		provideWorkerMax = 16
	}
}

61
var rebroadcastDelay = delay.Fixed(time.Second * 10)
62

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
67
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
68
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
69

70 71 72 73 74 75 76
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
77 78
	ctx, cancelFunc := context.WithCancel(parent)

79
	notif := notifications.New()
80 81 82 83 84
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

85
	bs := &Bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		self:          p,
87
		blockstore:    bstore,
88
		notifications: notif,
89
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
90
		network:       network,
91
		findKeys:      make(chan *blockRequest, sizeBatchRequestChan),
92
		process:       px,
93
		newBlocks:     make(chan key.Key, HasBlockBufferSize),
94
		provideKeys:   make(chan key.Key, provideKeysBufferSize),
95
		wm:            NewWantManager(ctx, network),
96
	}
97
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
	network.SetDelegate(bs)
99

100 101
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
102 103 104 105 106 107 108 109 110

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

111 112 113
	return bs
}

114 115
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
116

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118 119
	// the ID of the peer to act on behalf of
	self peer.ID

120 121
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
122

123 124
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
125
	wm *WantManager
126

127 128 129 130 131 132
	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	notifications notifications.PubSub

Jeromy's avatar
Jeromy committed
133
	// send keys to a worker to find and connect to providers for them
134
	findKeys chan *blockRequest
Jeromy's avatar
Jeromy committed
135

136
	engine *decision.Engine
137

138
	process process.Process
139

140
	newBlocks chan key.Key
141

142
	provideKeys chan key.Key
143

Jeromy's avatar
Jeromy committed
144
	counterLk      sync.Mutex
145 146
	blocksRecvd    int
	dupBlocksRecvd int
147
	dupDataRecvd   uint64
148 149
}

150
type blockRequest struct {
151 152
	Key key.Key
	Ctx context.Context
153 154
}

155
// GetBlock attempts to retrieve a particular block from peers within the
156
// deadline enforced by the context.
157
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
158 159 160
	if k == "" {
		return nil, blockstore.ErrNotFound
	}
161

162 163 164 165
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
166 167
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
168

169
	ctx, cancelFunc := context.WithCancel(parent)
170

171
	ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
172 173
	log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
174 175 176 177

	defer func() {
		cancelFunc()
	}()
178

179
	promise, err := bs.GetBlocks(ctx, []key.Key{k})
180 181
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
182
	}
183 184

	select {
185 186 187 188 189 190 191 192 193
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
194
		return block, nil
195 196
	case <-parent.Done():
		return nil, parent.Err()
197 198 199
	}
}

200 201
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
	var out []key.Key
202 203 204 205 206 207
	for _, e := range bs.engine.WantlistForPeer(p) {
		out = append(out, e.Key)
	}
	return out
}

208 209 210 211
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	return bs.engine.LedgerForPeer(p)
}

212 213 214 215 216 217 218
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
219
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
220
	if len(keys) == 0 {
221
		out := make(chan blocks.Block)
222 223 224 225
		close(out)
		return out, nil
	}

226 227 228 229 230
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
231
	promise := bs.notifications.Subscribe(ctx, keys...)
232

233 234 235 236
	for _, k := range keys {
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	}

237
	bs.wm.WantBlocks(ctx, keys)
238

239 240 241
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
242
	req := &blockRequest{
243 244
		Key: keys[0],
		Ctx: ctx,
245
	}
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282

	remaining := make(map[key.Key]struct{})
	for _, k := range keys {
		remaining[k] = struct{}{}
	}

	out := make(chan blocks.Block)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		defer close(out)
		defer func() {
			var toCancel []key.Key
			for k, _ := range remaining {
				toCancel = append(toCancel, k)
			}
			bs.CancelWants(toCancel)
		}()
		for {
			select {
			case blk, ok := <-promise:
				if !ok {
					return
				}

				delete(remaining, blk.Key())
				select {
				case out <- blk:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

283
	select {
Jeromy's avatar
Jeromy committed
284
	case bs.findKeys <- req:
285
		return out, nil
286 287 288
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
289 290
}

291
// CancelWant removes a given key from the wantlist
Jeromy's avatar
Jeromy committed
292 293
func (bs *Bitswap) CancelWants(keys []key.Key) {
	bs.wm.CancelWants(keys)
294 295
}

296 297
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
298
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
299 300 301 302 303
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
304

305
	err := bs.blockstore.Put(blk)
306 307
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
308 309
		return err
	}
310

311 312 313 314 315
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
316 317
	bs.notifications.Publish(blk)

318 319
	bs.engine.AddBlock(blk)

320
	select {
321
	case bs.newBlocks <- blk.Key():
322
		// send block off to be reprovided
323 324
	case <-bs.process.Closing():
		return bs.process.Close()
325 326
	}
	return nil
327 328
}

329
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
330 331
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
332
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
333 334
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
335

336 337 338
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
339 340 341 342
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
343
	var keys []key.Key
344 345
	for _, block := range iblocks {
		if _, found := bs.wm.wl.Contains(block.Key()); !found {
346
			log.Infof("received un-asked-for %s from %s", block, p)
347 348
			continue
		}
Jeromy's avatar
Jeromy committed
349 350 351
		keys = append(keys, block.Key())
	}
	bs.wm.CancelWants(keys)
352

Jeromy's avatar
Jeromy committed
353 354 355
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
356
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
357
			defer wg.Done()
358

359
			if err := bs.updateReceiveCounters(b); err != nil {
360
				return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
Jeromy's avatar
Jeromy committed
361
			}
362

363 364 365
			k := b.Key()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)

366
			log.Debugf("got block %s from %s", b, p)
367
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
368 369 370
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
371
	}
Jeromy's avatar
Jeromy committed
372
	wg.Wait()
373 374
}

375 376
var ErrAlreadyHaveBlock = errors.New("already have block")

377
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
378 379 380
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()
	bs.blocksRecvd++
381
	has, err := bs.blockstore.Has(b.Key())
382 383 384 385 386 387
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
		return err
	}
	if err == nil && has {
		bs.dupBlocksRecvd++
Jeromy's avatar
Jeromy committed
388
		bs.dupDataRecvd += uint64(len(b.RawData()))
389 390 391 392 393 394 395 396
	}

	if has {
		return ErrAlreadyHaveBlock
	}
	return nil
}

397
// Connected/Disconnected warns bitswap about peer connections
398
func (bs *Bitswap) PeerConnected(p peer.ID) {
399
	bs.wm.Connected(p)
400 401 402
}

// Connected/Disconnected warns bitswap about peer connections
403
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
404
	bs.wm.Disconnected(p)
405
	bs.engine.PeerDisconnected(p)
406 407
}

408
func (bs *Bitswap) ReceiveError(err error) {
409
	log.Infof("Bitswap ReceiveError: %s", err)
410 411
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
412 413
}

414
func (bs *Bitswap) Close() error {
415
	return bs.process.Close()
416
}
417

418 419
func (bs *Bitswap) GetWantlist() []key.Key {
	var out []key.Key
420
	for _, e := range bs.wm.wl.Entries() {
421 422 423 424
		out = append(out, e.Key)
	}
	return out
}
425 426 427 428

func (bs *Bitswap) IsOnline() bool {
	return true
}