bitswap.go 12 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"math"
7
	"sync"
Jeromy's avatar
Jeromy committed
8 9
	"time"

10
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
11
	inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
12
	process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
13

14
	blocks "github.com/jbenet/go-ipfs/blocks"
15
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
16
	exchange "github.com/jbenet/go-ipfs/exchange"
17
	decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
18 19 20
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
21
	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22
	peer "github.com/jbenet/go-ipfs/p2p/peer"
23 24
	"github.com/jbenet/go-ipfs/thirdparty/delay"
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
25
	u "github.com/jbenet/go-ipfs/util"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
26 27
	errors "github.com/jbenet/go-ipfs/util/debugerror"
	pset "github.com/jbenet/go-ipfs/util/peerset" // TODO move this to peerstore
28 29
)

30
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32
const (
	// maxProvidersPerRequest caps the number of providers requested from the
	// network for a single key. A cap is needed because the network streams
	// its results.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// providerRequestTimeout bounds each provider lookup started in this file.
	providerRequestTimeout = 10 * time.Second

	// hasBlockTimeout bounds the HasBlock call made for every block that
	// arrives in ReceiveMessage.
	hasBlockTimeout = 15 * time.Second

	// sizeBatchRequestChan is the buffer capacity of the batchRequests channel.
	sizeBatchRequestChan = 32

	// kMaxPriority is the max priority as defined by the bitswap protocol
	kMaxPriority = math.MaxInt32
)
Jeromy's avatar
Jeromy committed
44

Brian Tiger Chow's avatar
Brian Tiger Chow committed
45
var (
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
	rebroadcastDelay = delay.Fixed(time.Second * 10)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
)
48

Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50 51 52
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate.
// Runs until context is cancelled.
53
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
54
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
55

56 57 58 59 60 61 62
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
	// coupled to the concerns of the IPFS daemon in this way.
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
63 64
	ctx, cancelFunc := context.WithCancel(parent)

65
	notif := notifications.New()
66 67 68 69 70
	px := process.WithTeardown(func() error {
		notif.Shutdown()
		return nil
	})

71
	go func() {
72
		<-px.Closing() // process closes first
Jeromy's avatar
Jeromy committed
73
		cancelFunc()
74 75 76 77
	}()
	go func() {
		<-ctx.Done() // parent cancelled first
		px.Close()
78 79
	}()

80
	bs := &bitswap{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
		self:          p,
82
		blockstore:    bstore,
83
		notifications: notif,
84
		engine:        decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
85
		network:       network,
86
		wantlist:      wantlist.NewThreadSafe(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87
		batchRequests: make(chan []u.Key, sizeBatchRequestChan),
88
		process:       px,
89
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
90
	network.SetDelegate(bs)
91 92 93 94 95 96
	px.Go(func(px process.Process) {
		bs.clientWorker(ctx)
	})
	px.Go(func(px process.Process) {
		bs.taskWorker(ctx)
	})
97 98 99 100

	return bs
}

101 102 103
// bitswap instances implement the bitswap protocol.
type bitswap struct {

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105 106
	// the ID of the peer to act on behalf of
	self peer.ID

107 108
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
109 110 111 112 113 114 115

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	notifications notifications.PubSub

116 117 118 119
	// Requests for a set of related blocks
	// the assumption is made that the same peer is likely to
	// have more than a single block in the set
	batchRequests chan []u.Key
Jeromy's avatar
Jeromy committed
120

121
	engine *decision.Engine
122

123
	wantlist *wantlist.ThreadSafe
124

125
	process process.Process
126 127
}

128
// GetBlock attempts to retrieve a particular block from peers within the
129
// deadline enforced by the context.
Jeromy's avatar
Jeromy committed
130
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
131

132 133 134 135
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
136 137
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
138

139
	ctx, cancelFunc := context.WithCancel(parent)
140

Jeromy's avatar
Jeromy committed
141
	ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
Jeromy's avatar
Jeromy committed
142
	defer log.EventBegin(ctx, "GetBlockRequest", &k).Done()
143 144 145 146

	defer func() {
		cancelFunc()
	}()
147

148
	promise, err := bs.GetBlocks(ctx, []u.Key{k})
149 150
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
151
	}
152 153

	select {
154
	case block := <-promise:
Jeromy's avatar
Jeromy committed
155
		return block, nil
156 157
	case <-parent.Done():
		return nil, parent.Err()
158
	}
159

160 161
}

162 163 164 165 166 167 168
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
Jeromy's avatar
Jeromy committed
169
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
170

171 172 173 174 175
	select {
	case <-bs.process.Closing():
		return nil, errors.New("bitswap is closed")
	default:
	}
176 177 178
	promise := bs.notifications.Subscribe(ctx, keys...)
	select {
	case bs.batchRequests <- keys:
179
		return promise, nil
180 181 182
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
183 184
}

185 186 187
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
188 189 190 191 192
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
193 194 195 196 197
	if err := bs.blockstore.Put(blk); err != nil {
		return err
	}
	bs.wantlist.Remove(blk.Key())
	bs.notifications.Publish(blk)
198
	return bs.network.Provide(ctx, blk.Key())
199 200
}

201 202
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
	set := pset.New()
203
	wg := sync.WaitGroup{}
204
	for peerToQuery := range peers {
205 206 207 208 209

		if !set.TryAdd(peerToQuery) { //Do once per peer
			continue
		}

210
		wg.Add(1)
211
		go func(p peer.ID) {
212
			defer wg.Done()
213 214 215
			if err := bs.send(ctx, p, m); err != nil {
				log.Error(err) // TODO remove if too verbose
			}
216
		}(peerToQuery)
Jeromy's avatar
Jeromy committed
217
	}
218
	wg.Wait()
Jeromy's avatar
Jeromy committed
219 220 221
	return nil
}

222
func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
Jeromy's avatar
Jeromy committed
223 224
	message := bsmsg.New()
	message.SetFull(true)
225 226
	for _, wanted := range bs.wantlist.Entries() {
		message.AddEntry(wanted.Key, wanted.Priority)
Jeromy's avatar
Jeromy committed
227
	}
228 229
	return bs.sendWantlistMsgToPeers(ctx, message, peers)
}
Jeromy's avatar
Jeromy committed
230

Brian Tiger Chow's avatar
Brian Tiger Chow committed
231
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantlist.Entry) {
Jeromy's avatar
Jeromy committed
232

233 234 235 236 237
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// prepare a channel to hand off to sendWantlistToPeers
	sendToPeers := make(chan peer.ID)
Jeromy's avatar
Jeromy committed
238

239
	// Get providers for all entries in wantlist (could take a while)
Jeromy's avatar
Jeromy committed
240
	wg := sync.WaitGroup{}
Jeromy's avatar
Jeromy committed
241
	for _, e := range entries {
242
		wg.Add(1)
Jeromy's avatar
Jeromy committed
243
		go func(k u.Key) {
Jeromy's avatar
Jeromy committed
244
			defer wg.Done()
245

246
			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
247
			providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
248
			for prov := range providers {
249
				sendToPeers <- prov
Jeromy's avatar
Jeromy committed
250
			}
251
		}(e.Key)
Jeromy's avatar
Jeromy committed
252
	}
253 254 255 256 257 258 259 260

	go func() {
		wg.Wait() // make sure all our children do finish.
		close(sendToPeers)
	}()

	err := bs.sendWantlistToPeers(ctx, sendToPeers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261
		log.Errorf("sendWantlistToPeers error: %s", err)
262
	}
Jeromy's avatar
Jeromy committed
263 264
}

265
// TODO(brian): handle errors
266 267
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) (
	peer.ID, bsmsg.BitSwapMessage) {
268
	defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
269

270
	if p == "" {
271
		log.Error("Received message from nil peer!")
272
		// TODO propagate the error upward
273
		return "", nil
274 275
	}
	if incoming == nil {
276
		log.Error("Got nil bitswap message!")
277
		// TODO propagate the error upward
278
		return "", nil
279
	}
280

Jeromy's avatar
Jeromy committed
281 282
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
283
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
284 285
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
286

Brian Tiger Chow's avatar
Brian Tiger Chow committed
287
	for _, block := range incoming.Blocks() {
288 289
		hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout)
		if err := bs.HasBlock(hasBlockCtx, block); err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
290
			log.Error(err)
Jeromy's avatar
Jeromy committed
291
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
292
	}
293
	var keys []u.Key
Brian Tiger Chow's avatar
Brian Tiger Chow committed
294
	for _, block := range incoming.Blocks() {
295
		keys = append(keys, block.Key())
296
	}
297
	bs.cancelBlocks(ctx, keys)
298

Jeromy's avatar
Jeromy committed
299
	// TODO: consider changing this function to not return anything
300
	return "", nil
301 302
}

303 304 305
// Connected/Disconnected warns bitswap about peer connections
func (bs *bitswap) PeerConnected(p peer.ID) {
	// TODO: add to clientWorker??
306 307 308 309 310 311 312
	peers := make(chan peer.ID, 1)
	peers <- p
	close(peers)
	err := bs.sendWantlistToPeers(context.TODO(), peers)
	if err != nil {
		log.Errorf("error sending wantlist: %s", err)
	}
313 314 315 316 317 318 319
}

// PeerDisconnected warns bitswap that the connection to a peer is gone. It
// currently does nothing with that information.
func (bs *bitswap) PeerDisconnected(peer.ID) {
	// TODO: release resources.
}

Jeromy's avatar
Jeromy committed
320
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
321 322 323
	if len(bkeys) < 1 {
		return
	}
Jeromy's avatar
Jeromy committed
324 325 326
	message := bsmsg.New()
	message.SetFull(false)
	for _, k := range bkeys {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
327
		message.Cancel(k)
Jeromy's avatar
Jeromy committed
328
	}
329
	for _, p := range bs.engine.Peers() {
Jeromy's avatar
Jeromy committed
330 331 332 333 334 335 336
		err := bs.send(ctx, p, message)
		if err != nil {
			log.Errorf("Error sending message: %s", err)
		}
	}
}

337
func (bs *bitswap) ReceiveError(err error) {
338
	log.Errorf("Bitswap ReceiveError: %s", err)
339 340
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
341 342
}

343 344
// send strives to ensure that accounting is always performed when a message is
// sent
345
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
346
	defer log.EventBegin(ctx, "sendMessage", p, m).Done()
347
	if err := bs.network.SendMessage(ctx, p, m); err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
348
		return errors.Wrap(err)
349
	}
350
	return bs.engine.MessageSent(p, m)
351
}
352 353

// Close closes the process that manages this bitswap instance and returns
// the result of that close.
func (bs *bitswap) Close() error {
	return bs.process.Close()
}
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422

// taskWorker drains the decision engine's outbox and sends each envelope's
// message to the envelope's peer. It runs until |ctx| is done. A failed send
// is logged and the worker carries on.
func (bs *bitswap) taskWorker(ctx context.Context) {
	defer log.Info("bitswap task worker shutting down...")
	for {
		select {
		case <-ctx.Done():
			return
		case nextEnvelope := <-bs.engine.Outbox():
			// The outbox yields a channel; wait on it for the envelope
			// itself while still honouring cancellation.
			select {
			case <-ctx.Done():
				return
			case envelope, ok := <-nextEnvelope:
				if !ok {
					// Channel closed without an envelope: nothing to send.
					continue
				}
				log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
				// Log a failed delivery instead of dropping the error.
				if err := bs.send(ctx, envelope.Peer, envelope.Message); err != nil {
					log.Errorf("Error sending message: %s", err)
				}
			}
		}
	}
}

// clientWorker services the client side of bitswap until |parent| is done.
// In a single loop it:
//   - logs the wantlist size every 10 seconds when the wantlist is non-empty,
//   - re-sends the unfulfilled wantlist to providers each time the
//     rebroadcast delay elapses,
//   - handles batch requests queued by GetBlocks: the keys are added to the
//     wantlist and the wantlist is sent to providers of the first key.
//
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {

	defer log.Info("bitswap client worker shutting down...")

	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	broadcastSignal := time.After(rebroadcastDelay.Get())

	// One ticker for the lifetime of the worker. Calling time.Tick inside
	// the select would create a fresh ticker on every loop iteration, and
	// the log case would then fire only after 10 seconds without any other
	// event.
	wantlistLogTicker := time.NewTicker(10 * time.Second)
	defer wantlistLogTicker.Stop()

	for {
		select {
		case <-wantlistLogTicker.C:
			n := bs.wantlist.Len()
			if n > 0 {
				log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
			}
		case <-broadcastSignal: // resend unfulfilled wantlist keys
			entries := bs.wantlist.Entries()
			if len(entries) > 0 {
				bs.sendWantlistToProviders(ctx, entries)
			}
			broadcastSignal = time.After(rebroadcastDelay.Get())
		case keys := <-bs.batchRequests:
			if len(keys) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
			// Earlier keys in the batch get higher priority.
			for i, k := range keys {
				bs.wantlist.Add(k, kMaxPriority-i)
			}
			// NB: Optimization. Assumes that providers of key[0] are likely to
			// be able to provide for all keys. This currently holds true in most
			// every situation. Later, this assumption may not hold as true.
			child, cancelChild := context.WithTimeout(ctx, providerRequestTimeout)
			providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
			err := bs.sendWantlistToPeers(ctx, providers)
			// sendWantlistToPeers returns only after |providers| has been
			// closed, so the lookup context can be released here.
			cancelChild()
			if err != nil {
				log.Errorf("error sending wantlist: %s", err)
			}
		case <-parent.Done():
			return
		}
	}
}