// Package decision implements the decision engine for the bitswap service.
package decision

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"
	bsmsg "github.com/ipfs/go-bitswap/message"
	wl "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = logging.Logger("engine")

const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
	// maxMessageSize is the maximum size of the batched payload
	maxMessageSize = 512 * 1024
	// tagPrefix is the tag given to peers associated with an engine
	tagPrefix = "bs-engine-%s"

	// tagWeight is the default weight for peers associated with an engine
	tagWeight = 5
)

// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}
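
// Any type with these two methods satisfies PeerTagger. A minimal no-op
// stub, useful in tests, might look like the following sketch (illustrative
// only, not part of this package):
//
//	type nopTagger struct{}
//
//	func (nopTagger) TagPeer(peer.ID, string, int) {}
//	func (nopTagger) UntagPeer(peer.ID, string)    {}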

// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	bs bstore.Blockstore

	peerTagger PeerTagger

	tag  string
	lock sync.Mutex // protects the fields immediately below
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	ticker *time.Ticker
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger) *Engine {
	e := &Engine{
		ledgerMap:  make(map[peer.ID]*ledger),
		bs:         bs,
		peerTagger: peerTagger,
		outbox:     make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal: make(chan struct{}, 1),
		ticker:     time.NewTicker(time.Millisecond * 100),
	}
	e.tag = fmt.Sprintf(tagPrefix, uuid.New().String())
	e.peerRequestQueue = peertaskqueue.New(peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
	go e.taskWorker(ctx)
	return e
}
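
// A typical setup is sketched below; bstore and connMgr are hypothetical
// values assumed to be supplied by the surrounding node, and the caller is
// responsible for draining Outbox (see the sketch on Outbox below):
//
//	engine := decision.NewEngine(ctx, bstore, connMgr)
//	go consumeOutbox(engine.Outbox()) // hypothetical consumer loop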

func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tag, tagWeight)
}

func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tag)
}

// WantlistForPeer returns the currently understood want list for a given peer
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
	partner := e.findOrCreate(p)
	partner.lk.Lock()
	defer partner.lk.Unlock()
	return partner.wantList.SortedEntries()
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	ledger := e.findOrCreate(p)

	ledger.lk.Lock()
	defer ledger.lk.Unlock()

	return &Receipt{
		Peer:      ledger.Partner.String(),
		Value:     ledger.Accounting.Value(),
		Sent:      ledger.Accounting.BytesSent,
		Recv:      ledger.Accounting.BytesRecv,
		Exchanged: ledger.ExchangeCount(),
	}
}

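// taskWorker pops tasks from the peer request queue, packages each task's
// blocks into an Envelope, and offers it to the outbox one at a time.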
func (e *Engine) taskWorker(ctx context.Context) {
	defer close(e.outbox) // because taskWorker uses the channel exclusively
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outgoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
		oneTimeUse <- envelope // buffered. won't block
		close(oneTimeUse)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		nextTask := e.peerRequestQueue.PopBlock()
		for nextTask == nil {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				nextTask = e.peerRequestQueue.PopBlock()
			case <-e.ticker.C:
				e.peerRequestQueue.ThawRound()
				nextTask = e.peerRequestQueue.PopBlock()
			}
		}

		// with a task in hand, we're ready to prepare the envelope...
		msg := bsmsg.New(true)
		for _, entry := range nextTask.Tasks {
			block, err := e.bs.Get(entry.Identifier.(cid.Cid))
			if err != nil {
				log.Errorf("tried to execute a task and errored fetching block: %s", err)
				continue
			}
			msg.AddBlock(block)
		}

		if msg.Empty() {
			// If we don't have the block, don't hold that against the peer;
			// just make sure to mark the task as 'completed'.
			nextTask.Done(nextTask.Tasks)
			continue
		}

		return &Envelope{
			Peer:    nextTask.Target,
			Message: msg,
			Sent: func() {
				nextTask.Done(nextTask.Tasks)
				select {
				case e.workSignal <- struct{}{}:
					// work completing may mean that our queue will provide new
					// work to be done.
				default:
				}
			},
		}, nil
	}
}

// Outbox returns a channel of one-time use Envelope channels.
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}
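
// Consuming the outbox is a two-step receive: first the one-time use inner
// channel, then the Envelope itself. An illustrative sketch, where
// sendToPeer is a hypothetical network send:
//
//	for nextEnvelope := range engine.Outbox() {
//		envelope, ok := <-nextEnvelope
//		if !ok {
//			continue // engine shut down before producing an envelope
//		}
//		sendToPeer(envelope.Peer, envelope.Message)
//		envelope.Sent() // lets the queue schedule follow-up work
//	}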

// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.Lock()
	defer e.lock.Unlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))

	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping when a message arrives: it records
// wants and received blocks in the peer's ledger and queues sends for any
// wanted blocks we already have.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
	if m.Empty() {
		log.Debugf("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()
	if m.Full() {
		l.wantList = wl.New()
	}

	var msgSize int
	var activeEntries []peertask.Task
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			log.Debugf("%s cancel %s", p, entry.Cid)
			l.CancelWant(entry.Cid)
			e.peerRequestQueue.Remove(entry.Cid, p)
		} else {
			log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
			l.Wants(entry.Cid, entry.Priority)
			blockSize, err := e.bs.GetSize(entry.Cid)
			if err != nil {
				if err == bstore.ErrNotFound {
					continue
				}
				log.Error(err)
			} else {
				// we have the block
				newWorkExists = true
				// Flush the current batch if adding this block would push
				// it past maxMessageSize, so each queued task stays within
				// the payload limit.
				if msgSize+blockSize > maxMessageSize {
					e.peerRequestQueue.PushBlock(p, activeEntries...)
					activeEntries = []peertask.Task{}
					msgSize = 0
				}
				activeEntries = append(activeEntries, peertask.Task{Identifier: entry.Cid, Priority: entry.Priority})
				msgSize += blockSize
			}
		}
	}
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushBlock(p, activeEntries...)
	}
	for _, block := range m.Blocks() {
		log.Debugf("got block %s %d bytes", block, len(block.RawData()))
		l.ReceivedBytes(len(block.RawData()))
	}
}

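// addBlocks queues sends for any of the given blocks that our partners
// want. Callers must hold e.lock.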
func (e *Engine) addBlocks(blocks []blocks.Block) {
	work := false

	for _, l := range e.ledgerMap {
		l.lk.Lock()
		for _, block := range blocks {
			if entry, ok := l.WantListContains(block.Cid()); ok {
				e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
					Identifier: entry.Cid,
					Priority:   entry.Priority,
				})
				work = true
			}
		}
		l.lk.Unlock()
	}

	if work {
		e.signalNewWork()
	}
}

// AddBlocks is called when new blocks are received and added to a block store,
// meaning there may be peers who want those blocks, so we should send the blocks
// to them.
func (e *Engine) AddBlocks(blocks []blocks.Block) {
	e.lock.Lock()
	defer e.lock.Unlock()

	e.addBlocks(blocks)
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	for _, block := range m.Blocks() {
		l.SentBytes(len(block.RawData()))
		l.wantList.Remove(block.Cid())
		e.peerRequestQueue.Remove(block.Cid(), p)
	}
}

// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	l.lk.Lock()
	defer l.lk.Unlock()
	l.ref++
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		return
	}
	l.lk.Lock()
	defer l.lk.Unlock()
	l.ref--
	if l.ref <= 0 {
		delete(e.ledgerMap, p)
	}
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesSent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesRecv
}

// findOrCreate lazily instantiates a ledger for the given peer
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}