bitswap.go 12.2 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"math"
7
	"sync"
Jeromy's avatar
Jeromy committed
8 9
	"time"

10
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
11
	inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
12
	process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
13

14
	blocks "github.com/jbenet/go-ipfs/blocks"
15
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
16
	exchange "github.com/jbenet/go-ipfs/exchange"
17
	decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
18 19 20
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
21
	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22
	peer "github.com/jbenet/go-ipfs/p2p/peer"
23 24
	"github.com/jbenet/go-ipfs/thirdparty/delay"
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
25
	u "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
26 27
	errors "github.com/jbenet/go-ipfs/util/debugerror"
	pset "github.com/jbenet/go-ipfs/util/peerset" // TODO move this to peerstore
28 29
)

30
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32
const (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33 34 35
	// maxProvidersPerRequest specifies the maximum number of providers desired
	// from the network. This value is specified because the network streams
	// results.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
36 37 38 39
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3
	providerRequestTimeout = time.Second * 10
	hasBlockTimeout        = time.Second * 15
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40
	sizeBatchRequestChan   = 32
41 42
	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43
)
Jeromy's avatar
Jeromy committed
44

Brian Tiger Chow's avatar
Brian Tiger Chow committed
45
var (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
	rebroadcastDelay = delay.Fixed(time.Second * 10)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
)
48

Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
53
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
54
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
55

56 57 58 59 60 61 62
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
63 64
	ctx, cancelFunc := context.WithCancel(parent)

65
	notif := notifications.New()
66 67 68 69 70
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

71
	go func() {
72
		<-px.Closing() // process closes first
Jeromy's avatar
Jeromy committed
73
		cancelFunc()
74 75 76 77
	}()
	go func() {
		<-ctx.Done() // parent cancelled first
		px.Close()
78 79
	}()

80
	bs := &bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		self:          p,
82
		blockstore:    bstore,
83
		notifications: notif,
84
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
85
		network:       network,
86
		wantlist:      wantlist.NewThreadSafe(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87
		batchRequests: make(chan []u.Key, sizeBatchRequestChan),
88
		process:       px,
89
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90
	network.SetDelegate(bs)
91 92 93 94 95 96
	px.Go(func(px process.Process) {
		bs.clientWorker(ctx)
	})
	px.Go(func(px process.Process) {
		bs.taskWorker(ctx)
	})
97 98 99 100

	return bs
}

101 102 103
// bitswap instances implement the bitswap protocol.
type bitswap struct {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105 106
	// the ID of the peer to act on behalf of
	self peer.ID

107 108
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
109 110 111 112 113 114 115

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	notifications notifications.PubSub

116 117 118 119
	// Requests for a set of related blocks
	// the assumption is made that the same peer is likely to
	// have more than a single block in the set
	batchRequests chan []u.Key
Jeromy's avatar
Jeromy committed
120

121
	engine *decision.Engine
122

123
	wantlist *wantlist.ThreadSafe
124

125
	process process.Process
126 127
}

128
// GetBlock attempts to retrieve a particular block from peers within the
129
// deadline enforced by the context.
Jeromy's avatar
Jeromy committed
130
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
131

132 133 134 135
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
136 137
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
138

139
	ctx, cancelFunc := context.WithCancel(parent)
140

Jeromy's avatar
Jeromy committed
141
	ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
Jeromy's avatar
Jeromy committed
142
	defer log.EventBegin(ctx, "GetBlockRequest", &k).Done()
143 144 145 146

	defer func() {
		cancelFunc()
	}()
147

148
	promise, err := bs.GetBlocks(ctx, []u.Key{k})
149 150
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
151
	}
152 153

	select {
154 155 156 157 158 159 160 161 162
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
Jeromy's avatar
Jeromy committed
163
		return block, nil
164 165
	case <-parent.Done():
		return nil, parent.Err()
166
	}
167

168 169
}

170 171 172 173 174 175 176
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
Jeromy's avatar
Jeromy committed
177
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
178

179 180 181 182 183
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
184 185 186
	promise := bs.notifications.Subscribe(ctx, keys...)
	select {
	case bs.batchRequests <- keys:
187
		return promise, nil
188 189 190
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
191 192
}

193 194 195
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
196 197 198 199 200
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
201 202 203 204 205
	if err := bs.blockstore.Put(blk); err != nil {
		return err
	}
	bs.wantlist.Remove(blk.Key())
	bs.notifications.Publish(blk)
206
	return bs.network.Provide(ctx, blk.Key())
207 208
}

209 210
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
	set := pset.New()
211
	wg := sync.WaitGroup{}
212
	for peerToQuery := range peers {
213 214 215 216 217

		if !set.TryAdd(peerToQuery) { //Do once per peer
			continue
		}

218
		wg.Add(1)
219
		go func(p peer.ID) {
220
			defer wg.Done()
221
			if err := bs.send(ctx, p, m); err != nil {
222
				log.Debug(err) // TODO remove if too verbose
223
			}
224
		}(peerToQuery)
Jeromy's avatar
Jeromy committed
225
	}
226
	wg.Wait()
Jeromy's avatar
Jeromy committed
227 228 229
	return nil
}

230
func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
Jeromy's avatar
Jeromy committed
231 232
	message := bsmsg.New()
	message.SetFull(true)
233 234
	for _, wanted := range bs.wantlist.Entries() {
		message.AddEntry(wanted.Key, wanted.Priority)
Jeromy's avatar
Jeromy committed
235
	}
236 237
	return bs.sendWantlistMsgToPeers(ctx, message, peers)
}
Jeromy's avatar
Jeromy committed
238

Brian Tiger Chow's avatar
Brian Tiger Chow committed
239
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {
Jeromy's avatar
Jeromy committed
240

241 242 243 244 245
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// prepare a channel to hand off to sendWantlistToPeers
	sendToPeers := make(chan peer.ID)
Jeromy's avatar
Jeromy committed
246

247
	// Get providers for all entries in wantlist (could take a while)
Jeromy's avatar
Jeromy committed
248
	wg := sync.WaitGroup{}
Jeromy's avatar
Jeromy committed
249
	for _, e := range entries {
250
		wg.Add(1)
Jeromy's avatar
Jeromy committed
251
		go func(k u.Key) {
Jeromy's avatar
Jeromy committed
252
			defer wg.Done()
253

254
			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
255
			providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
256
			for prov := range providers {
257
				sendToPeers <- prov
Jeromy's avatar
Jeromy committed
258
			}
259
		}(e.Key)
Jeromy's avatar
Jeromy committed
260
	}
261 262 263 264 265 266 267 268

	go func() {
		wg.Wait() // make sure all our children do finish.
		close(sendToPeers)
	}()

	err := bs.sendWantlistToPeers(ctx, sendToPeers)
	if err != nil {
269
		log.Debugf("sendWantlistToPeers error: %s", err)
270
	}
Jeromy's avatar
Jeromy committed
271 272
}

273
// TODO(brian): handle errors
274 275
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
	peer.ID, bsmsg.BitSwapMessage) {
276
	defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
277

278
	if p == "" {
279
		log.Debug("Received message from nil peer!")
280
		// TODO propagate the error upward
281
		return "", nil
282 283
	}
	if incoming == nil {
284
		log.Debug("Got nil bitswap message!")
285
		// TODO propagate the error upward
286
		return "", nil
287
	}
288

Jeromy's avatar
Jeromy committed
289 290
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
291
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
292 293
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
294

Brian Tiger Chow's avatar
Brian Tiger Chow committed
295
	for _, block := range incoming.Blocks() {
296 297
		hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout)
		if err := bs.HasBlock(hasBlockCtx, block); err != nil {
298
			log.Debug(err)
Jeromy's avatar
Jeromy committed
299
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
300
	}
301
	var keys []u.Key
Brian Tiger Chow's avatar
Brian Tiger Chow committed
302
	for _, block := range incoming.Blocks() {
303
		keys = append(keys, block.Key())
304
	}
305
	bs.cancelBlocks(ctx, keys)
306

Jeromy's avatar
Jeromy committed
307
	// TODO: consider changing this function to not return anything
308
	return "", nil
309 310
}

311 312 313
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerConnected(p peer.ID) {
	// TODO: add to clientWorker??
314 315 316 317 318
	peers := make(chan peer.ID, 1)
	peers <- p
	close(peers)
	err := bs.sendWantlistToPeers(context.TODO(), peers)
	if err != nil {
319
		log.Debugf("error sending wantlist: %s", err)
320
	}
321 322 323 324 325 326 327
}

// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerDisconnected(peer.ID) {
	// TODO: release resources.
}

Jeromy's avatar
Jeromy committed
328
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
329 330 331
	if len(bkeys) < 1 {
		return
	}
Jeromy's avatar
Jeromy committed
332 333 334
	message := bsmsg.New()
	message.SetFull(false)
	for _, k := range bkeys {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
335
		message.Cancel(k)
Jeromy's avatar
Jeromy committed
336
	}
337
	for _, p := range bs.engine.Peers() {
Jeromy's avatar
Jeromy committed
338 339
		err := bs.send(ctx, p, message)
		if err != nil {
340
			log.Debugf("Error sending message: %s", err)
Jeromy's avatar
Jeromy committed
341 342 343 344
		}
	}
}

345
func (bs *bitswap) ReceiveError(err error) {
346
	log.Debugf("Bitswap ReceiveError: %s", err)
347 348
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
349 350
}

351 352
// send strives to ensure that accounting is always performed when a message is
// sent
353
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
354
	defer log.EventBegin(ctx, "sendMessage", p, m).Done()
355
	if err := bs.network.SendMessage(ctx, p, m); err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
356
		return errors.Wrap(err)
357
	}
358
	return bs.engine.MessageSent(p, m)
359
}
360 361

func (bs *bitswap) Close() error {
362
	return bs.process.Close()
363
}
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423

func (bs *bitswap) taskWorker(ctx context.Context) {
	defer log.Info("bitswap task worker shutting down...")
	for {
		select {
		case <-ctx.Done():
			return
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case <-ctx.Done():
				return
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
				log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
				bs.send(ctx, envelope.Peer, envelope.Message)
			}
		}
	}
}

// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {

	defer log.Info("bitswap client worker shutting down...")

	ctx, cancel := context.WithCancel(parent)

	broadcastSignal := time.After(rebroadcastDelay.Get())
	defer cancel()

	for {
		select {
		case <-time.Tick(10 * time.Second):
			n := bs.wantlist.Len()
			if n > 0 {
				log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
			}
		case <-broadcastSignal: // resend unfulfilled wantlist keys
			entries := bs.wantlist.Entries()
			if len(entries) > 0 {
				bs.sendWantlistToProviders(ctx, entries)
			}
			broadcastSignal = time.After(rebroadcastDelay.Get())
		case keys := <-bs.batchRequests:
			if len(keys) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
			for i, k := range keys {
				bs.wantlist.Add(k, kMaxPriority-i)
			}
			// NB: Optimization. Assumes that providers of key[0] are likely to
			// be able to provide for all keys. This currently holds true in most
			// every situation. Later, this assumption may not hold as true.
			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
			err := bs.sendWantlistToPeers(ctx, providers)
			if err != nil {
424
				log.Debugf("error sending wantlist: %s", err)
425 426 427 428 429 430
			}
		case <-parent.Done():
			return
		}
	}
}