bitswap.go 10.8 KB
Newer Older
1
// Package bitswap implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11 12
	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"

Jeromy's avatar
Jeromy committed
13
	decision "github.com/ipfs/go-bitswap/decision"
14
	bsgetter "github.com/ipfs/go-bitswap/getter"
Jeromy's avatar
Jeromy committed
15
	bsmsg "github.com/ipfs/go-bitswap/message"
16
	bsmq "github.com/ipfs/go-bitswap/messagequeue"
Jeromy's avatar
Jeromy committed
17
	bsnet "github.com/ipfs/go-bitswap/network"
18
	bspm "github.com/ipfs/go-bitswap/peermanager"
19
	bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
20
	bssession "github.com/ipfs/go-bitswap/session"
21
	bssm "github.com/ipfs/go-bitswap/sessionmanager"
22
	bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
23
	bswm "github.com/ipfs/go-bitswap/wantmanager"
Jeromy's avatar
Jeromy committed
24 25 26 27 28 29 30 31 32 33
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	delay "github.com/ipfs/go-ipfs-delay"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
	logging "github.com/ipfs/go-log"
	metrics "github.com/ipfs/go-metrics-interface"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
	peer "github.com/libp2p/go-libp2p-peer"
34 35
)

Jeromy's avatar
Jeromy committed
36
// log is the package-wide logger, registered under the "bitswap" name.
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37

38 39
// Compile-time assertion that *Bitswap satisfies exchange.SessionExchange.
var _ exchange.SessionExchange = (*Bitswap)(nil)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
40
// provideTimeout bounds a single provide request. These requests take at
// _least_ two minutes at the moment, hence the three-minute allowance.
const provideTimeout = 3 * time.Minute
44

Jeromy's avatar
Jeromy committed
45
// ProvideEnabled gates whether the CIDs of newly stored blocks are queued on
// the newBlocks channel so they can be provided to the network.
var ProvideEnabled = true

// Buffer sizes and worker limits for the provide pipeline.
var (
	// HasBlockBufferSize is the capacity of the newBlocks channel.
	HasBlockBufferSize = 256
	// provideKeysBufferSize is the capacity of the provideKeys channel.
	provideKeysBufferSize = 2048
	// provideWorkerMax is presumably the cap on concurrent provide workers;
	// its consumer is not in this file (TODO confirm).
	provideWorkerMax = 6
)

// metricsBuckets are the histogram bucket boundaries (in bytes) shared by all
// block-size histograms. The 1<<18+15 bucket is there to observe old file
// chunks that are 1<<18 + 14 in size.
var metricsBuckets = []float64{
	1 << 6,
	1 << 10,
	1 << 14,
	1 << 18,
	1<<18 + 15,
	1 << 22,
}
Jeromy's avatar
Jeromy committed
55

56
// rebroadcastDelay is a fixed one-minute delay. Its consumer is not in this
// file; presumably it paces wantlist rebroadcasts (TODO confirm).
var rebroadcastDelay = delay.Fixed(time.Minute)
57

Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59 60 61
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
Łukasz Magiera's avatar
Łukasz Magiera committed
62 63
func New(parent context.Context, network bsnet.BitSwapNetwork,
	bstore blockstore.Blockstore) exchange.Interface {
64

65 66
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
67
	// coupled to the concerns of the ipfs daemon in this way.
68 69 70 71
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
72
	ctx, cancelFunc := context.WithCancel(parent)
73
	ctx = metrics.CtxSubScope(ctx, "bitswap")
74
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
75
		" data blocks recived").Histogram(metricsBuckets)
76
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
77
		" data blocks recived").Histogram(metricsBuckets)
78

79 80 81
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)

82 83 84 85
	px := process.WithTeardown(func() error {
		return nil
	})

86 87
	peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
		return bsmq.New(ctx, p, network)
88 89
	}

90
	wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
91 92
	pqm := bspqm.New(ctx, network)

93 94
	sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
		return bssession.New(ctx, id, wm, pm, srs)
95 96
	}
	sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
97
		return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
98
	}
99 100 101
	sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
		return bssrs.New(ctx)
	}
102

103
	bs := &Bitswap{
104
		blockstore:    bstore,
105
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
106
		network:       network,
107
		process:       px,
108 109
		newBlocks:     make(chan cid.Cid, HasBlockBufferSize),
		provideKeys:   make(chan cid.Cid, provideKeysBufferSize),
110
		wm:            wm,
111
		pqm:           pqm,
112
		sm:            bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
113
		counters:      new(counters),
114 115 116
		dupMetric:     dupHist,
		allMetric:     allHist,
		sentHistogram: sentHistogram,
117
	}
118 119

	bs.wm.Startup()
120
	bs.pqm.Startup()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
121
	network.SetDelegate(bs)
122

123 124
	// Start up bitswaps async worker routines
	bs.startWorkers(px, ctx)
125 126 127 128 129 130 131 132 133

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

134 135 136
	return bs
}

137 138
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
139
	// the wantlist tracks global wants for bitswap
140
	wm *bswm.WantManager
141

142 143 144
	// the provider query manager manages requests to find providers
	pqm *bspqm.ProviderQueryManager

145 146
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147

148 149
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
150 151 152 153 154

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

155 156 157
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
158
	newBlocks chan cid.Cid
159
	// provideKeys directly feeds provide workers
160
	provideKeys chan cid.Cid
161

162 163 164
	process process.Process

	// Counters for various statistics
165 166
	counterLk sync.Mutex
	counters  *counters
167 168

	// Metrics interface metrics
169 170 171
	dupMetric     metrics.Histogram
	allMetric     metrics.Histogram
	sentHistogram metrics.Histogram
Jeromy's avatar
Jeromy committed
172

173 174
	// the sessionmanager manages tracking sessions
	sm *bssm.SessionManager
175 176
}

177 178 179 180 181 182 183 184 185 186
// counters aggregates running statistics for a Bitswap instance. In this file
// it is only accessed while holding Bitswap.counterLk.
type counters struct {
	blocksRecvd    uint64 // blocks received (incremented in updateReceiveCounters)
	dupBlocksRecvd uint64 // received blocks that the blockstore already had
	dupDataRecvd   uint64 // bytes of received blocks that the blockstore already had
	blocksSent     uint64 // not updated in this file; presumably blocks sent to peers (TODO confirm)
	dataSent       uint64 // not updated in this file; presumably bytes sent to peers (TODO confirm)
	dataRecvd      uint64 // bytes of all received blocks
	messagesRecvd  uint64 // messages handled by ReceiveMessage
}

187
// GetBlock attempts to retrieve a particular block from peers within the
188
// deadline enforced by the context.
189
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
190
	return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
191 192
}

193 194
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
	var out []cid.Cid
195
	for _, e := range bs.engine.WantlistForPeer(p) {
196
		out = append(out, e.Cid)
197 198 199 200
	}
	return out
}

201 202 203 204
// LedgerForPeer returns the decision engine's ledger receipt for peer p.
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	receipt := bs.engine.LedgerForPeer(p)
	return receipt
}

205 206 207 208 209 210 211
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
212
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
213 214
	session := bs.sm.NewSession(ctx)
	return session.GetBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
215 216
}

Łukasz Magiera's avatar
Łukasz Magiera committed
217
// HasBlock announces the existence of a block to this bitswap service. The
218
// service will potentially notify its peers.
219
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
220 221 222 223 224 225 226 227
	return bs.receiveBlockFrom(blk, "")
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
228 229 230 231 232
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
233

234
	err := bs.blockstore.Put(blk)
235 236
	if err != nil {
		log.Errorf("Error writing block to datastore: %s", err)
237 238
		return err
	}
239

240 241 242 243 244
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
245

246
	bs.sm.ReceiveBlockFrom(from, blk)
247

248 249
	bs.engine.AddBlock(blk)

250 251 252 253 254 255 256
	if ProvideEnabled {
		select {
		case bs.newBlocks <- blk.Cid():
			// send block off to be reprovided
		case <-bs.process.Closing():
			return bs.process.Close()
		}
257 258
	}
	return nil
259 260
}

261
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Steven Allen's avatar
Steven Allen committed
262 263 264
	bs.counterLk.Lock()
	bs.counters.messagesRecvd++
	bs.counterLk.Unlock()
Jeromy's avatar
Jeromy committed
265

Jeromy's avatar
Jeromy committed
266 267
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
268
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
269 270
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
271

272 273 274
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
275 276 277
		return
	}

Jeromy's avatar
Jeromy committed
278 279
	wg := sync.WaitGroup{}
	for _, block := range iblocks {
280

Jeromy's avatar
Jeromy committed
281
		wg.Add(1)
282
		go func(b blocks.Block) { // TODO: this probably doesnt need to be a goroutine...
Jeromy's avatar
Jeromy committed
283
			defer wg.Done()
284

285
			bs.updateReceiveCounters(b)
286
			bs.sm.UpdateReceiveCounters(b)
287
			log.Debugf("got block %s from %s", b, p)
288

289
			// skip received blocks that are not in the wantlist
290
			if !bs.wm.IsWanted(b.Cid()) {
291 292 293
				return
			}

294 295
			if err := bs.receiveBlockFrom(b, p); err != nil {
				log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
Jeromy's avatar
Jeromy committed
296
			}
297
			log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
Jeromy's avatar
Jeromy committed
298
		}(block)
299
	}
Jeromy's avatar
Jeromy committed
300
	wg.Wait()
301 302
}

303 304
// ErrAlreadyHaveBlock is a sentinel error for a block that is already held.
// NOTE(review): nothing in this file returns it — confirm it is still used.
var ErrAlreadyHaveBlock = errors.New("already have block")

305
func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {
306
	blkLen := len(b.RawData())
307
	has, err := bs.blockstore.Has(b.Cid())
308 309
	if err != nil {
		log.Infof("blockstore.Has error: %s", err)
310
		return
311
	}
312 313 314

	bs.allMetric.Observe(float64(blkLen))
	if has {
315
		bs.dupMetric.Observe(float64(blkLen))
316 317
	}

318 319
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()
320
	c := bs.counters
321

322 323
	c.blocksRecvd++
	c.dataRecvd += uint64(len(b.RawData()))
324
	if has {
325 326
		c.dupBlocksRecvd++
		c.dupDataRecvd += uint64(blkLen)
327 328 329
	}
}

330
// Connected/Disconnected warns bitswap about peer connections.
331
func (bs *Bitswap) PeerConnected(p peer.ID) {
332
	bs.wm.Connected(p)
333
	bs.engine.PeerConnected(p)
334 335
}

336
// Connected/Disconnected warns bitswap about peer connections.
337
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
338
	bs.wm.Disconnected(p)
339
	bs.engine.PeerDisconnected(p)
340 341
}

342
func (bs *Bitswap) ReceiveError(err error) {
343
	log.Infof("Bitswap ReceiveError: %s", err)
344 345
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
346 347
}

348
// Close shuts this bitswap instance down by closing its process, and returns
// whatever error that close reports.
func (bs *Bitswap) Close() error {
	err := bs.process.Close()
	return err
}
351

352
func (bs *Bitswap) GetWantlist() []cid.Cid {
353
	entries := bs.wm.CurrentWants()
354
	out := make([]cid.Cid, 0, len(entries))
355
	for _, e := range entries {
356
		out = append(out, e.Cid)
357 358 359
	}
	return out
}
360 361 362 363

// IsOnline reports whether this exchange is online. Bitswap always returns
// true.
func (bs *Bitswap) IsOnline() bool {
	return true
}
364 365 366 367

// NewSession asks the session manager for a new session bound to ctx and
// returns it as an exchange.Fetcher.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
	session := bs.sm.NewSession(ctx)
	return session
}