bitswap.go 11.1 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2
// Package bitswap implements the IPFS Exchange interface with the BitSwap
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"sync"
Jeromy's avatar
Jeromy committed
7 8
	"time"

9 10 11
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	blocks "github.com/jbenet/go-ipfs/blocks"
12
	blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
13 14 15 16 17
	exchange "github.com/jbenet/go-ipfs/exchange"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
	notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
	strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
Jeromy's avatar
Jeromy committed
18
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
19 20
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
Jeromy's avatar
Jeromy committed
21
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
22 23
)

24
// log is the package-level event logger, registered under the name "bitswap".
var log = eventlog.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

Jeromy's avatar
Jeromy committed
26
const (
	// maxProvidersPerRequest caps how many providers are requested from
	// routing for each lookup made before sending out a wantlist.
	// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
	maxProvidersPerRequest = 3

	// roundTime is the interval between ticks of the round worker.
	roundTime = 500 * time.Millisecond
)

var (
	// providerRequestTimeout bounds a single provider lookup.
	providerRequestTimeout = 10 * time.Second

	// hasBlockTimeout bounds the routing announcement made by HasBlock.
	hasBlockTimeout = 15 * time.Second

	// rebroadcastDelay is how long the client worker waits before resending
	// the wantlist to providers.
	rebroadcastDelay = 10 * time.Second

	// bandwidthPerRound is the budget handed to the strategy on every round.
	// NOTE(review): the unit is not visible here; presumably bytes. Confirm
	// against strategy.GetTasks.
	bandwidthPerRound = 500000
)

38 39 40
// New initializes a BitSwap instance that communicates over the
// provided BitSwapNetwork. This function registers the returned instance as
// the network delegate.
41
// Runs until context is cancelled
42
func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing,
43
	bstore blockstore.Blockstore, nice bool) exchange.Interface {
44

45 46
	ctx, cancelFunc := context.WithCancel(parent)

47 48
	notif := notifications.New()
	go func() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49
		<-ctx.Done()
Jeromy's avatar
Jeromy committed
50
		cancelFunc()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51
		notif.Shutdown()
52 53
	}()

54
	bs := &bitswap{
55
		blockstore:    bstore,
56
		cancelFunc:    cancelFunc,
57
		notifications: notif,
58
		strategy:      strategy.New(nice),
59
		ledgerset:     strategy.NewLedgerSet(),
60
		routing:       routing,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
61
		sender:        network,
62
		wantlist:      wl.New(),
63
		batchRequests: make(chan []u.Key, 32),
64
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
65
	network.SetDelegate(bs)
66
	go bs.clientWorker(ctx)
Jeromy's avatar
Jeromy committed
67
	go bs.roundWorker(ctx)
68 69 70 71

	return bs
}

72 73 74 75
// bitswap instances implement the bitswap protocol.
type bitswap struct {

	// sender delivers messages on behalf of the session
76
	sender bsnet.BitSwapNetwork
77 78 79 80 81 82

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

	// routing interface for communication
83
	routing bsnet.Routing
84 85 86

	notifications notifications.PubSub

87 88 89 90
	// Requests for a set of related blocks
	// the assumption is made that the same peer is likely to
	// have more than a single block in the set
	batchRequests chan []u.Key
Jeromy's avatar
Jeromy committed
91

92
	// strategy makes decisions about how to interact with partners.
93
	strategy strategy.Strategy
94

95 96
	ledgerset *strategy.LedgerSet

Jeromy's avatar
Jeromy committed
97
	wantlist *wl.Wantlist
98 99 100

	// cancelFunc signals cancellation to the bitswap event loop
	cancelFunc func()
101 102
}

103
// GetBlock attempts to retrieve a particular block from peers within the
104
// deadline enforced by the context.
Jeromy's avatar
Jeromy committed
105
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
106

107 108 109 110
	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
111 112
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
113

114
	ctx, cancelFunc := context.WithCancel(parent)
115

Jeromy's avatar
Jeromy committed
116
	ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
117
	log.Event(ctx, "GetBlockRequestBegin", &k)
118 119 120 121 122

	defer func() {
		cancelFunc()
		log.Event(ctx, "GetBlockRequestEnd", &k)
	}()
123

124
	promise, err := bs.GetBlocks(ctx, []u.Key{k})
125 126
	if err != nil {
		return nil, err
Jeromy's avatar
Jeromy committed
127
	}
128 129

	select {
130
	case block := <-promise:
Jeromy's avatar
Jeromy committed
131
		return block, nil
132 133
	case <-parent.Done():
		return nil, parent.Err()
134
	}
135

136 137
}

138 139 140 141 142 143 144
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
Jeromy's avatar
Jeromy committed
145
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
146 147 148 149 150
	// TODO log the request

	promise := bs.notifications.Subscribe(ctx, keys...)
	select {
	case bs.batchRequests <- keys:
151
		return promise, nil
152 153 154
	case <-ctx.Done():
		return nil, ctx.Err()
	}
Jeromy's avatar
Jeromy committed
155 156
}

157 158 159 160 161 162 163 164 165 166 167 168
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
//
// The block is written to the blockstore, removed from the wantlist and
// published to local subscribers; it is then announced through routing, with
// the announcement bounded by hasBlockTimeout. The returned error comes from
// either the blockstore write or the routing announcement.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
	if err := bs.blockstore.Put(blk); err != nil {
		return err
	}
	bs.wantlist.Remove(blk.Key())
	bs.notifications.Publish(blk)

	// Always release the timeout context; dropping the cancel func keeps its
	// timer alive until the deadline fires.
	child, cancel := context.WithTimeout(ctx, hasBlockTimeout)
	defer cancel()
	return bs.routing.Provide(child, blk.Key())
}

Jeromy's avatar
Jeromy committed
169
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
Jeromy's avatar
Jeromy committed
170 171 172
	if peers == nil {
		panic("Cant send wantlist to nil peerchan")
	}
Jeromy's avatar
Jeromy committed
173
	message := bsmsg.New()
Jeromy's avatar
Jeromy committed
174 175
	for _, wanted := range bs.wantlist.Entries() {
		message.AddEntry(wanted.Value, wanted.Priority, false)
Jeromy's avatar
Jeromy committed
176
	}
177
	wg := sync.WaitGroup{}
Jeromy's avatar
Jeromy committed
178
	for peerToQuery := range peers {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
179
		log.Event(ctx, "PeerToQuery", peerToQuery)
180
		wg.Add(1)
Jeromy's avatar
Jeromy committed
181
		go func(p peer.Peer) {
182
			defer wg.Done()
Jeromy's avatar
Jeromy committed
183

Brian Tiger Chow's avatar
Brian Tiger Chow committed
184
			log.Event(ctx, "DialPeer", p)
Jeromy's avatar
Jeromy committed
185 186
			err := bs.sender.DialPeer(ctx, p)
			if err != nil {
187
				log.Errorf("Error sender.DialPeer(%s): %s", p, err)
Jeromy's avatar
Jeromy committed
188 189 190
				return
			}

191
			err = bs.sender.SendMessage(ctx, p, message)
Jeromy's avatar
Jeromy committed
192
			if err != nil {
193
				log.Errorf("Error sender.SendMessage(%s) = %s", p, err)
Jeromy's avatar
Jeromy committed
194 195 196 197 198
				return
			}
			// FIXME ensure accounting is handled correctly when
			// communication fails. May require slightly different API to
			// get better guarantees. May need shared sequence numbers.
199
			bs.ledgerset.MessageSent(p, message)
Jeromy's avatar
Jeromy committed
200 201
		}(peerToQuery)
	}
202
	wg.Wait()
Jeromy's avatar
Jeromy committed
203 204 205
	return nil
}

Jeromy's avatar
Jeromy committed
206
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
207 208 209 210 211 212
	provset := make(map[u.Key]peer.Peer)
	provcollect := make(chan peer.Peer)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

213
	wg := sync.WaitGroup{}
214
	// Get providers for all entries in wantlist (could take a while)
Jeromy's avatar
Jeromy committed
215
	for _, e := range wantlist.Entries() {
216
		wg.Add(1)
Jeromy's avatar
Jeromy committed
217
		go func(k u.Key) {
218 219
			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
			providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest)
Jeromy's avatar
Jeromy committed
220

221 222
			for prov := range providers {
				provcollect <- prov
Jeromy's avatar
Jeromy committed
223
			}
224
			wg.Done()
Jeromy's avatar
Jeromy committed
225
		}(e.Value)
Jeromy's avatar
Jeromy committed
226
	}
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257

	// When all workers finish, close the providers channel
	go func() {
		wg.Wait()
		close(provcollect)
	}()

	// Filter out duplicates,
	// no need to send our wantlists out twice in a given time period
	for {
		select {
		case p, ok := <-provcollect:
			if !ok {
				break
			}
			provset[p.Key()] = p
		case <-ctx.Done():
			log.Error("Context cancelled before we got all the providers!")
			return
		}
	}

	message := bsmsg.New()
	message.SetFull(true)
	for _, e := range bs.wantlist.Entries() {
		message.AddEntry(e.Value, e.Priority, false)
	}

	for _, prov := range provset {
		bs.send(ctx, prov, message)
	}
Jeromy's avatar
Jeromy committed
258 259
}

Jeromy's avatar
Jeromy committed
260 261 262 263 264 265 266
func (bs *bitswap) roundWorker(ctx context.Context) {
	roundTicker := time.NewTicker(roundTime)
	for {
		select {
		case <-ctx.Done():
			return
		case <-roundTicker.C:
267
			alloc, err := bs.strategy.GetTasks(bandwidthPerRound, bs.ledgerset, bs.blockstore)
Jeromy's avatar
Jeromy committed
268 269 270
			if err != nil {
				log.Critical("%s", err)
			}
271 272 273 274
			err = bs.processStrategyAllocation(ctx, alloc)
			if err != nil {
				log.Critical("Error processing strategy allocation: %s", err)
			}
Jeromy's avatar
Jeromy committed
275 276 277 278
		}
	}
}

279
func (bs *bitswap) processStrategyAllocation(ctx context.Context, alloc []*strategy.Task) error {
Jeromy's avatar
Jeromy committed
280 281 282 283 284
	for _, t := range alloc {
		for _, block := range t.Blocks {
			message := bsmsg.New()
			message.AddBlock(block)
			if err := bs.send(ctx, t.Peer, message); err != nil {
285
				return err
Jeromy's avatar
Jeromy committed
286 287 288
			}
		}
	}
289
	return nil
Jeromy's avatar
Jeromy committed
290 291
}

292
// TODO ensure only one active request per key
293
func (bs *bitswap) clientWorker(parent context.Context) {
294 295

	ctx, cancel := context.WithCancel(parent)
Jeromy's avatar
Jeromy committed
296

297 298
	broadcastSignal := time.After(rebroadcastDelay)
	defer cancel()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
299

Jeromy's avatar
Jeromy committed
300 301
	for {
		select {
302
		case <-broadcastSignal:
Jeromy's avatar
Jeromy committed
303
			// Resend unfulfilled wantlist keys
Jeromy's avatar
Jeromy committed
304
			bs.sendWantlistToProviders(ctx, bs.wantlist)
305
			broadcastSignal = time.After(rebroadcastDelay)
306
		case ks := <-bs.batchRequests:
307 308 309 310
			if len(ks) == 0 {
				log.Warning("Received batch request for zero blocks")
				continue
			}
311 312
			for i, k := range ks {
				bs.wantlist.Add(k, len(ks)-i)
313
			}
Jeromy's avatar
Jeromy committed
314 315 316 317 318 319 320
			// NB: send want list to providers for the first peer in this list.
			//		the assumption is made that the providers of the first key in
			//		the set are likely to have others as well.
			//		This currently holds true in most every situation, since when
			//		pinning a file, you store and provide all blocks associated with
			//		it. Later, this assumption may not hold as true if we implement
			//		newer bitswap strategies.
321 322
			child, _ := context.WithTimeout(ctx, providerRequestTimeout)
			providers := bs.routing.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
323 324 325 326

			err := bs.sendWantListTo(ctx, providers)
			if err != nil {
				log.Errorf("error sending wantlist: %s", err)
Jeromy's avatar
Jeromy committed
327
			}
328
		case <-parent.Done():
Jeromy's avatar
Jeromy committed
329 330 331 332 333
			return
		}
	}
}

334
// TODO(brian): handle errors
335 336
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
	peer.Peer, bsmsg.BitSwapMessage) {
Jeromy's avatar
Jeromy committed
337
	log.Debugf("ReceiveMessage from %s", p)
338

339
	if p == nil {
340
		log.Error("Received message from nil peer!")
341 342
		// TODO propagate the error upward
		return nil, nil
343 344
	}
	if incoming == nil {
345
		log.Error("Got nil bitswap message!")
346 347
		// TODO propagate the error upward
		return nil, nil
348
	}
349

Jeromy's avatar
Jeromy committed
350 351
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
352
	bs.ledgerset.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
353 354
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
355

Jeromy's avatar
Jeromy committed
356
	var blkeys []u.Key
Brian Tiger Chow's avatar
Brian Tiger Chow committed
357
	for _, block := range incoming.Blocks() {
Jeromy's avatar
Jeromy committed
358
		blkeys = append(blkeys, block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
359 360
		if err := bs.HasBlock(ctx, block); err != nil {
			log.Error(err)
Jeromy's avatar
Jeromy committed
361
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
362
	}
Jeromy's avatar
Jeromy committed
363 364
	if len(blkeys) > 0 {
		bs.cancelBlocks(ctx, blkeys)
365
	}
366

Jeromy's avatar
Jeromy committed
367
	// TODO: consider changing this function to not return anything
368
	return nil, nil
369 370
}

Jeromy's avatar
Jeromy committed
371 372 373 374 375 376
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
	message := bsmsg.New()
	message.SetFull(false)
	for _, k := range bkeys {
		message.AddEntry(k, 0, true)
	}
377
	for _, p := range bs.ledgerset.Peers() {
Jeromy's avatar
Jeromy committed
378 379 380 381 382 383 384
		err := bs.send(ctx, p, message)
		if err != nil {
			log.Errorf("Error sending message: %s", err)
		}
	}
}

385
func (bs *bitswap) ReceiveError(err error) {
386
	log.Errorf("Bitswap ReceiveError: %s", err)
387 388
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
389 390
}

391 392
// send strives to ensure that accounting is always performed when a message is
// sent
393 394 395 396
func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage) error {
	if err := bs.sender.SendMessage(ctx, p, m); err != nil {
		return err
	}
397
	return bs.ledgerset.MessageSent(p, m)
398
}
399 400 401 402 403

// Close invokes the instance's cancel func, signalling the bitswap event loop
// to stop. The error result exists only to conform to the Closer interface
// and is always nil.
func (bs *bitswap) Close() error {
	bs.cancelFunc()
	return nil
}