bitswap.go 10.4 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
3 4 5
package bitswap

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"errors"
7
	"math"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11 12 13 14 15 16 17 18
	blocks "github.com/ipfs/go-ipfs/blocks"
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	exchange "github.com/ipfs/go-ipfs/exchange"
	decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
Jeromy's avatar
Jeromy committed
19
	flags "github.com/ipfs/go-ipfs/flags"
20
	"github.com/ipfs/go-ipfs/thirdparty/delay"
21
	loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
George Antoniadis's avatar
George Antoniadis committed
22
	key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
Jeromy's avatar
Jeromy committed
23

George Antoniadis's avatar
George Antoniadis committed
24 25
	process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
	procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
Jeromy's avatar
Jeromy committed
26 27 28
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
	peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29 30
)

Jeromy's avatar
Jeromy committed
31
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
const (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35 36
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37 38 39 40
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3
	providerRequestTimeout = time.Second * 10
	hasBlockTimeout        = time.Second * 15
41
	provideTimeout         = time.Second * 15
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42
	sizeBatchRequestChan   = 32
43 44
	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
Jeromy's avatar
Jeromy committed
45
)
46

Jeromy's avatar
Jeromy committed
47
var (
48 49 50
	HasBlockBufferSize    = 256
	provideKeysBufferSize = 2048
	provideWorkerMax      = 512
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51
)
Jeromy's avatar
Jeromy committed
52

Jeromy's avatar
Jeromy committed
53 54 55 56 57 58 59 60
func init() {
	if flags.LowMemMode {
		HasBlockBufferSize = 64
		provideKeysBufferSize = 512
		provideWorkerMax = 16
	}
}

61
var rebroadcastDelay = delay.Fixed(time.Second * 10)
62

Brian Tiger Chow's avatar
Brian Tiger Chow committed
63 64 65 66
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
67
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
68
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
69

70 71 72 73 74 75 76
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
77 78
	ctx, cancelFunc := context.WithCancel(parent)

79
	notif := notifications.New()
80 81 82 83 84
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

85
	bs := &Bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
		self:          p,
87
		blockstore:    bstore,
88
		notifications: notif,
89
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
90
		network:       network,
91
		findKeys:      make(chan *wantlist.Entry, sizeBatchRequestChan),
92
		process:       px,
93
		newBlocks:     make(chan blocks.Block, HasBlockBufferSize),
94
		provideKeys:   make(chan key.Key, provideKeysBufferSize),
95
		wm:            NewWantManager(ctx, network),
96
	}
97
	go bs.wm.Run()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
	network.SetDelegate(bs)
99

100 101
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
102 103 104 105 106 107 108 109 110

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

111 112 113
	return bs
}

114 115
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
116

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117 118 119
	// the ID of the peer to act on behalf of
	self peer.ID

120 121
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
122

123 124
	// the peermanager manages sending messages to peers in a way that
	// wont block bitswap operation
125
	wm *WantManager
126

127
	// blockstore is the local database
128
	// NB: ensure threadsafety
129
	blockstore blockstore.Blockstore
130

131
	notifications notifications.PubSub
132

Jeromy's avatar
Jeromy committed
133
	// send keys to a worker to find and connect to providers for them
134
	findKeys chan *wantlist.Entry
Jeromy's avatar
Jeromy committed
135

136
	engine *decision.Engine
137

138
	process process.Process
139

140
	newBlocks chan blocks.Block
141

142
	provideKeys chan key.Key
143

Jeromy's avatar
Jeromy committed
144
	counterLk      sync.Mutex
145 146
	blocksRecvd    int
	dupBlocksRecvd int
147
	dupDataRecvd   uint64
148 149
}

150
type blockRequest struct {
151 152
	key key.Key
	ctx context.Context
153 154
}

155
// GetBlock attempts to retrieve a particular block from peers within the
156
// deadline enforced by the context.
157
func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
158 159 160
	if k == "" {
		return nil, blockstore.ErrNotFound
	}
161

162 163 164 165
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
166 167
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
168

169
	ctx, cancelFunc := context.WithCancel(parent)
170

171
	ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
172 173
	log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
174 175 176 177

	defer func() {
		cancelFunc()
	}()
178

179
	promise, err := bs.GetBlocks(ctx, []key.Key{k})
180 181
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
182
	}
183 184

	select {
185 186 187 188 189 190 191 192 193
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
194
		return block, nil
195 196
	case <-parent.Done():
		return nil, parent.Err()
197 198 199
	}
}

200 201
func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
	var out []key.Key
202 203 204 205 206 207
	for _, e := range bs.engine.WantlistForPeer(p) {
		out = append(out, e.Key)
	}
	return out
}

208 209 210 211
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	return bs.engine.LedgerForPeer(p)
}

212 213 214 215 216 217 218
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
219
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) {
220
	if len(keys) == 0 {
221
		out := make(chan blocks.Block)
222 223 224 225
		close(out)
		return out, nil
	}

226 227 228 229 230
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
231
	promise := bs.notifications.Subscribe(ctx, keys...)
232

233 234 235 236
	for _, k := range keys {
		log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
	}

237
	bs.wm.WantBlocks(ctx, keys)
238

239 240 241 242 243 244
	// NB: Optimization. Assumes that providers of key[0] are likely to
	// be able to provide for all keys. This currently holds true in most
	// every situation. Later, this assumption may not hold as true.
	req := &wantlist.Entry{
		Key: keys[0],
		Ctx: ctx,
245
	}
246
	select {
Jeromy's avatar
Jeromy committed
247
	case bs.findKeys <- req:
248
		return promise, nil
249 250 251
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
252 253
}

254
// CancelWant removes a given key from the wantlist
Jeromy's avatar
Jeromy committed
255 256
func (bs *Bitswap) CancelWants(keys []key.Key) {
	bs.wm.CancelWants(keys)
257 258
}

259 260
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
261
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
262 263 264 265 266
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
267

268
	err := bs.blockstore.Put(blk)
269 270
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
271 272
		return err
	}
273

Jeromy's avatar
Jeromy committed
274 275
	bs.notifications.Publish(blk)

276 277
	bs.engine.AddBlock(blk)

278 279
	select {
	case bs.newBlocks <- blk:
280
		// send block off to be reprovided
281 282
	case <-bs.process.Closing():
		return bs.process.Close()
283 284
	}
	return nil
285 286
}

287
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
288 289
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
290
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
291 292
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
293

294 295 296
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
297 298 299 300
		return
	}

	// quickly send out cancels, reduces chances of duplicate block receives
301
	var keys []key.Key
302 303
	for _, block := range iblocks {
		if _, found := bs.wm.wl.Contains(block.Key()); !found {
304
			log.Infof("received un-asked-for %s from %s", block, p)
305 306
			continue
		}
Jeromy's avatar
Jeromy committed
307 308 309
		keys = append(keys, block.Key())
	}
	bs.wm.CancelWants(keys)
310

Jeromy's avatar
Jeromy committed
311 312 313
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
		wg.Add(1)
314
		go func(b blocks.Block) {
Jeromy's avatar
Jeromy committed
315
			defer wg.Done()
316

317
			if err := bs.updateReceiveCounters(b); err != nil {
318
				return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
Jeromy's avatar
Jeromy committed
319
			}
320

321 322 323
			k := b.Key()
			log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)

324
			log.Debugf("got block %s from %s", b, p)
325
			if err := bs.HasBlock(b); err != nil {
Jeromy's avatar
Jeromy committed
326 327 328
				log.Warningf("ReceiveMessage HasBlock error: %s", err)
			}
		}(block)
329
	}
Jeromy's avatar
Jeromy committed
330
	wg.Wait()
331 332
}

333 334
var ErrAlreadyHaveBlock = errors.New("already have block")

335
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error {
336 337 338
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()
	bs.blocksRecvd++
339
	has, err := bs.blockstore.Has(b.Key())
340 341 342 343 344 345
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
		return err
	}
	if err == nil && has {
		bs.dupBlocksRecvd++
Jeromy's avatar
Jeromy committed
346
		bs.dupDataRecvd += uint64(len(b.RawData()))
347 348 349 350 351 352 353 354
	}

	if has {
		return ErrAlreadyHaveBlock
	}
	return nil
}

355
// Connected/Disconnected warns bitswap about peer connections
356
func (bs *Bitswap) PeerConnected(p peer.ID) {
357
	bs.wm.Connected(p)
358 359 360
}

// Connected/Disconnected warns bitswap about peer connections
361
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
362
	bs.wm.Disconnected(p)
363
	bs.engine.PeerDisconnected(p)
364 365
}

366
func (bs *Bitswap) ReceiveError(err error) {
367
	log.Infof("Bitswap ReceiveError: %s", err)
368 369
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
370
}
371

372
func (bs *Bitswap) Close() error {
373
	return bs.process.Close()
374
}
375

376 377
func (bs *Bitswap) GetWantlist() []key.Key {
	var out []key.Key
378
	for _, e := range bs.wm.wl.Entries() {
379 380 381 382
		out = append(out, e.Key)
	}
	return out
}