bitswap.go 12.1 KB
Newer Older
1
// Package bitswap implements the IPFS exchange interface with the BitSwap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11 12
	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"

Jeromy's avatar
Jeromy committed
13
	decision "github.com/ipfs/go-bitswap/decision"
14
	bsgetter "github.com/ipfs/go-bitswap/getter"
Jeromy's avatar
Jeromy committed
15
	bsmsg "github.com/ipfs/go-bitswap/message"
16
	bsmq "github.com/ipfs/go-bitswap/messagequeue"
Jeromy's avatar
Jeromy committed
17
	bsnet "github.com/ipfs/go-bitswap/network"
18
	bspm "github.com/ipfs/go-bitswap/peermanager"
19
	bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
20
	bssession "github.com/ipfs/go-bitswap/session"
21
	bssm "github.com/ipfs/go-bitswap/sessionmanager"
22
	bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
23
	bswm "github.com/ipfs/go-bitswap/wantmanager"
Jeromy's avatar
Jeromy committed
24 25 26 27 28 29 30 31 32
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
	logging "github.com/ipfs/go-log"
	metrics "github.com/ipfs/go-metrics-interface"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
	peer "github.com/libp2p/go-libp2p-peer"
33 34
)

Jeromy's avatar
Jeromy committed
35
// log is the package-level logger, created under the "bitswap" name.
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36

37 38
// Compile-time check that *Bitswap satisfies exchange.SessionExchange.
var _ exchange.SessionExchange = (*Bitswap)(nil)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
39
const (
	// provideTimeout is the time budget for a provide request. These
	// requests take at _least_ two minutes at the moment, hence the three
	// minute value.
	// NOTE(review): not referenced in this part of the file; presumably
	// consumed by the provide workers — confirm.
	provideTimeout = time.Minute * 3
)
43

Jeromy's avatar
Jeromy committed
44
var (
	// ProvideEnabled is a variable that tells Bitswap whether or not
	// to handle providing blocks (see experimental provider system).
	ProvideEnabled = true

	// HasBlockBufferSize is the buffer size of the channel for new blocks
	// that need to be provided. They should get pulled over by the
	// provideCollector even before they are actually provided.
	// TODO: Does this need to be this large given that?
	HasBlockBufferSize = 256
	// provideKeysBufferSize is the buffer size of the provideKeys channel.
	provideKeysBufferSize = 2048
	// NOTE(review): presumably the upper bound on concurrent provide
	// workers; it is not used in this part of the file — confirm.
	provideWorkerMax = 6

	// metricsBuckets are the bucket boundaries shared by the block-size
	// histograms. The 1<<18+15 bucket is there to observe old file chunks
	// that are 1<<18 + 14 in size.
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
Jeromy's avatar
Jeromy committed
60

Brian Tiger Chow's avatar
Brian Tiger Chow committed
61 62
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
63
// delegate. Runs until context is cancelled or bitswap.Close is called.
Łukasz Magiera's avatar
Łukasz Magiera committed
64 65
func New(parent context.Context, network bsnet.BitSwapNetwork,
	bstore blockstore.Blockstore) exchange.Interface {
66

67 68
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
69
	// coupled to the concerns of the ipfs daemon in this way.
70 71 72 73
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
74
	ctx, cancelFunc := context.WithCancel(parent)
75
	ctx = metrics.CtxSubScope(ctx, "bitswap")
76
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
77
		" data blocks recived").Histogram(metricsBuckets)
78
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
79
		" data blocks recived").Histogram(metricsBuckets)
80

81 82 83
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)

84 85 86 87
	px := process.WithTeardown(func() error {
		return nil
	})

88 89
	peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
		return bsmq.New(ctx, p, network)
90 91
	}

92
	wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
93 94
	pqm := bspqm.New(ctx, network)

95 96
	sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
		return bssession.New(ctx, id, wm, pm, srs)
97 98
	}
	sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
99
		return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
100
	}
101 102 103
	sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
		return bssrs.New(ctx)
	}
104

105
	bs := &Bitswap{
106
		blockstore:    bstore,
107
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
108
		network:       network,
109
		process:       px,
110 111
		newBlocks:     make(chan cid.Cid, HasBlockBufferSize),
		provideKeys:   make(chan cid.Cid, provideKeysBufferSize),
112
		wm:            wm,
113
		pqm:           pqm,
114
		sm:            bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
115
		counters:      new(counters),
116 117 118
		dupMetric:     dupHist,
		allMetric:     allHist,
		sentHistogram: sentHistogram,
119
	}
120 121

	bs.wm.Startup()
122
	bs.pqm.Startup()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
123
	network.SetDelegate(bs)
124

125
	// Start up bitswaps async worker routines
126
	bs.startWorkers(ctx, px)
127 128 129 130 131 132 133 134 135

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

136 137 138
	return bs
}

139 140
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
141
	// the wantlist tracks global wants for bitswap
142
	wm *bswm.WantManager
143

144 145 146
	// the provider query manager manages requests to find providers
	pqm *bspqm.ProviderQueryManager

147 148
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149

150 151
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
152 153 154 155 156

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

157 158 159
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
160
	newBlocks chan cid.Cid
161
	// provideKeys directly feeds provide workers
162
	provideKeys chan cid.Cid
163

164 165 166
	process process.Process

	// Counters for various statistics
167 168
	counterLk sync.Mutex
	counters  *counters
169 170

	// Metrics interface metrics
171 172 173
	dupMetric     metrics.Histogram
	allMetric     metrics.Histogram
	sentHistogram metrics.Histogram
Jeromy's avatar
Jeromy committed
174

175 176
	// the sessionmanager manages tracking sessions
	sm *bssm.SessionManager
177 178
}

179 180 181 182 183 184 185 186 187 188
// counters is the set of running statistics kept by a Bitswap instance. The
// updates visible in this file are made while holding Bitswap.counterLk.
type counters struct {
	// Received blocks: total count, count of blocks the blockstore already
	// had, and the byte size of those duplicates.
	blocksRecvd, dupBlocksRecvd, dupDataRecvd uint64
	// Sent blocks and bytes.
	// NOTE(review): not updated in this part of the file — confirm where.
	blocksSent, dataSent uint64
	// Total bytes received and total messages received.
	dataRecvd, messagesRecvd uint64
}

189
// GetBlock attempts to retrieve a particular block from peers within the
190
// deadline enforced by the context.
191
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
192
	return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
193 194
}

195 196
// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
197 198
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
	var out []cid.Cid
199
	for _, e := range bs.engine.WantlistForPeer(p) {
200
		out = append(out, e.Cid)
201 202 203 204
	}
	return out
}

205 206
// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
207 208 209 210
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	return bs.engine.LedgerForPeer(p)
}

211 212 213 214 215 216 217
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
218
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
219 220
	session := bs.sm.NewSession(ctx)
	return session.GetBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
221 222
}

Łukasz Magiera's avatar
Łukasz Magiera committed
223
// HasBlock announces the existence of a block to this bitswap service. The
224
// service will potentially notify its peers.
225
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
226 227 228 229 230 231 232 233
	return bs.receiveBlockFrom(blk, "")
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
234 235 236 237 238
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
239

240
	err := bs.blockstore.Put(blk)
241 242
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
243 244
		return err
	}
245

246 247 248 249 250
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
251

252
	bs.sm.ReceiveBlockFrom(from, blk)
253

254 255
	bs.engine.AddBlock(blk)

256 257 258 259 260 261 262
	if ProvideEnabled {
		select {
		case bs.newBlocks <- blk.Cid():
			// send block off to be reprovided
		case <-bs.process.Closing():
			return bs.process.Close()
		}
263 264
	}
	return nil
265 266
}

267 268
// ReceiveMessage is called by the network interface when a new message is
// received.
269
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Steven Allen's avatar
Steven Allen committed
270 271 272
	bs.counterLk.Lock()
	bs.counters.messagesRecvd++
	bs.counterLk.Unlock()
Jeromy's avatar
Jeromy committed
273

Jeromy's avatar
Jeromy committed
274 275
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
276
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
277 278
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
279

280 281 282
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
283 284 285
		return
	}

Jeromy's avatar
Jeromy committed
286 287
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
288

Jeromy's avatar
Jeromy committed
289
		wg.Add(1)
290
		go func(b blocks.Block) { // TODO: this probably doesnt need to be a goroutine...
Jeromy's avatar
Jeromy committed
291
			defer wg.Done()
292

293
			bs.updateReceiveCounters(b)
294
			bs.sm.UpdateReceiveCounters(b)
295
			log.Debugf("got block %s from %s", b, p)
296

297
			// skip received blocks that are not in the wantlist
298
			if !bs.wm.IsWanted(b.Cid()) {
299 300 301
				return
			}

302 303
			if err := bs.receiveBlockFrom(b, p); err != nil {
				log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
Jeromy's avatar
Jeromy committed
304
			}
305
			log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
Jeromy's avatar
Jeromy committed
306
		}(block)
307
	}
Jeromy's avatar
Jeromy committed
308
	wg.Wait()
309 310
}

311
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
312
	blkLen := len(b.RawData())
313
	has, err := bs.blockstore.Has(b.Cid())
314 315
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
316
		return
317
	}
318 319 320

	bs.allMetric.Observe(float64(blkLen))
	if has {
321
		bs.dupMetric.Observe(float64(blkLen))
322 323
	}

324 325
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()
326
	c := bs.counters
327

328 329
	c.blocksRecvd++
	c.dataRecvd += uint64(len(b.RawData()))
330
	if has {
331 332
		c.dupBlocksRecvd++
		c.dupDataRecvd += uint64(blkLen)
333 334 335
	}
}

336 337
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
338
func (bs *Bitswap) PeerConnected(p peer.ID) {
339
	bs.wm.Connected(p)
340
	bs.engine.PeerConnected(p)
341 342
}

343 344
// PeerDisconnected is called by the network interface when a peer
// closes a connection
345
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
346
	bs.wm.Disconnected(p)
347
	bs.engine.PeerDisconnected(p)
348 349
}

350 351
// ReceiveError is called by the network interface when an error happens
// at the network layer. Currently just logs error.
352
func (bs *Bitswap) ReceiveError(err error) {
353
	log.Infof("Bitswap ReceiveError: %s", err)
354 355
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
356 357
}

358
// Close is called to shutdown Bitswap
359
func (bs *Bitswap) Close() error {
360
	return bs.process.Close()
361
}
362

363
// GetWantlist returns the current local wantlist.
364
func (bs *Bitswap) GetWantlist() []cid.Cid {
365
	entries := bs.wm.CurrentWants()
366
	out := make([]cid.Cid, 0, len(entries))
367
	for _, e := range entries {
368
		out = append(out, e.Cid)
369 370 371
	}
	return out
}
372

373
// IsOnline is needed to match go-ipfs-exchange-interface
374 375 376
func (bs *Bitswap) IsOnline() bool {
	return true
}
377

378 379 380 381 382 383
// NewSession generates a new Bitswap session. You should use this, rather
// that calling Bitswap.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have it's own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
384 385 386
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
	return bs.sm.NewSession(ctx)
}