engine.go 9.48 KB
Newer Older
Jeromy's avatar
Jeromy committed
1
// Package decision implements the decision engine for the bitswap service.
2
package decision
3 4

import (
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"sync"
Jeromy's avatar
Jeromy committed
7
	"time"
8

9 10
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
11

12 13 14 15
	blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format"
	peer "gx/ipfs/QmVf8hTAsLLFtn4WPCRNdnaF2Eag2qTBS6uR8AiHPZARXy/go-libp2p-peer"
	bstore "gx/ipfs/QmbaPGg81pvQiC5vTXtC9Jo8rdrWUjRaugH71WYNsgi6Ev/go-ipfs-blockstore"
	logging "gx/ipfs/Qmbi1CTJsbnBZjCEgc2otwu8cUFPsGpzWXG7edVCLZ7Gvk/go-log"
16 17
)

18 19 20 21 22 23 24 25
// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
26 27
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
28 29 30 31 32 33 34 35 36 37 38 39
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
40 41
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
42
// * when handling `cancel`, if we recently received a wanted block from a
43
//   peer, include a partial wantlist that contains a few other high priority
44 45 46 47 48 49
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

Jeromy's avatar
Jeromy committed
50
var log = logging.Logger("engine")
51

Brian Tiger Chow's avatar
Brian Tiger Chow committed
52
const (
53 54
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
55 56
)

57
// Envelope contains a message for a Peer
58
type Envelope struct {
59
	// Peer is the intended recipient
60
	Peer peer.ID
61 62

	// Block is the payload
63
	Block blocks.Block
Jeromy's avatar
Jeromy committed
64 65 66

	// A callback to notify the decision queue that the task is complete
	Sent func()
67 68
}

69
type Engine struct {
70 71 72
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
Jeromy's avatar
Jeromy committed
73
	peerRequestQueue *prq
74

75 76 77 78 79
	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
Jeromy's avatar
Jeromy committed
80
	workSignal chan struct{}
81

82 83
	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84
	outbox chan (<-chan *Envelope)
85 86 87

	bs bstore.Blockstore

88
	lock sync.Mutex // protects the fields immediatly below
89
	// ledgerMap lists Ledgers by their Partner key.
90
	ledgerMap map[peer.ID]*ledger
Jeromy's avatar
Jeromy committed
91 92

	ticker *time.Ticker
93 94
}

95 96
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
97
		ledgerMap:        make(map[peer.ID]*ledger),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
		bs:               bs,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
99
		peerRequestQueue: newPRQ(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
100
		outbox:           make(chan (<-chan *Envelope), outboxChanBuffer),
101
		workSignal:       make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
102
		ticker:           time.NewTicker(time.Millisecond * 100),
103
	}
104 105
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
106 107
}

108
func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) {
109 110 111 112
	partner := e.findOrCreate(p)
	partner.lk.Lock()
	defer partner.lk.Unlock()
	return partner.wantList.SortedEntries()
113 114
}

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
// LedgerForPeer returns a Receipt snapshot of the accounting kept for peer p.
// A ledger is created for p if none exists yet.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	l := e.findOrCreate(p)

	// Hold the ledger lock so all fields are read from one consistent state.
	l.lk.Lock()
	defer l.lk.Unlock()

	receipt := new(Receipt)
	receipt.Peer = l.Partner.String()
	receipt.Value = l.Accounting.Value()
	receipt.Sent = l.Accounting.BytesSent
	receipt.Recv = l.Accounting.BytesRecv
	receipt.Exchanged = l.ExchangeCount()
	return receipt
}

130
func (e *Engine) taskWorker(ctx context.Context) {
131 132
	defer close(e.outbox) // because taskWorker uses the channel exclusively
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
133
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
134 135 136 137 138 139 140 141 142 143 144 145
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
146
		oneTimeUse <- envelope // buffered. won't block
147 148 149 150 151 152 153
		close(oneTimeUse)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
Jeromy's avatar
Jeromy committed
154
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
155
		nextTask := e.peerRequestQueue.Pop()
156
		for nextTask == nil {
Jeromy's avatar
Jeromy committed
157
			select {
158
			case <-ctx.Done():
159
				return nil, ctx.Err()
160
			case <-e.workSignal:
161
				nextTask = e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
162 163 164
			case <-e.ticker.C:
				e.peerRequestQueue.thawRound()
				nextTask = e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
165 166
			}
		}
167 168

		// with a task in hand, we're ready to prepare the envelope...
169

170
		block, err := e.bs.Get(nextTask.Entry.Cid)
171
		if err != nil {
172
			log.Errorf("tried to execute a task and errored fetching block: %s", err)
Jeromy's avatar
Jeromy committed
173 174 175
			// If we don't have the block, don't hold that against the peer
			// make sure to update that the task has been 'completed'
			nextTask.Done()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
176
			continue
177
		}
178

179
		return &Envelope{
180 181
			Peer:  nextTask.Target,
			Block: block,
182 183 184 185 186 187 188 189 190
			Sent: func() {
				nextTask.Done()
				select {
				case e.workSignal <- struct{}{}:
					// work completing may mean that our queue will provide new
					// work to be done.
				default:
				}
			},
191
		}, nil
Jeromy's avatar
Jeromy committed
192 193 194
	}
}

195
// Outbox returns a channel of one-time use Envelope channels.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
196
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
197
	return e.outbox
198 199 200
}

// Returns a slice of Peers with whom the local node has active sessions
201
func (e *Engine) Peers() []peer.ID {
202 203
	e.lock.Lock()
	defer e.lock.Unlock()
204

205 206
	response := make([]peer.ID, 0, len(e.ledgerMap))

207
	for _, ledger := range e.ledgerMap {
208 209 210 211 212 213 214
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
215
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
216
	if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
217
		log.Debugf("received empty message from %s", p)
218 219
	}

220 221 222
	newWorkExists := false
	defer func() {
		if newWorkExists {
223
			e.signalNewWork()
224 225
		}
	}()
226

227
	l := e.findOrCreate(p)
Jeromy's avatar
Jeromy committed
228 229
	l.lk.Lock()
	defer l.lk.Unlock()
230 231 232
	if m.Full() {
		l.wantList = wl.New()
	}
233

234 235
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
236 237 238
			log.Debugf("%s cancel %s", p, entry.Cid)
			l.CancelWant(entry.Cid)
			e.peerRequestQueue.Remove(entry.Cid, p)
239
		} else {
240 241 242
			log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
			l.Wants(entry.Cid, entry.Priority)
			if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
243
				e.peerRequestQueue.Push(entry.Entry, p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
244
				newWorkExists = true
245
			}
246 247
		}
	}
Jeromy's avatar
Jeromy committed
248

249
	for _, block := range m.Blocks() {
Jeromy's avatar
Jeromy committed
250 251
		log.Debugf("got block %s %d bytes", block, len(block.RawData()))
		l.ReceivedBytes(len(block.RawData()))
252 253 254 255
	}
	return nil
}

256
func (e *Engine) addBlock(block blocks.Block) {
257 258 259
	work := false

	for _, l := range e.ledgerMap {
Jeromy's avatar
Jeromy committed
260
		l.lk.Lock()
261
		if entry, ok := l.WantListContains(block.Cid()); ok {
262 263 264
			e.peerRequestQueue.Push(entry, l.Partner)
			work = true
		}
Jeromy's avatar
Jeromy committed
265
		l.lk.Unlock()
266 267 268 269 270 271 272
	}

	if work {
		e.signalNewWork()
	}
}

273
func (e *Engine) AddBlock(block blocks.Block) {
274 275 276 277 278 279
	e.lock.Lock()
	defer e.lock.Unlock()

	e.addBlock(block)
}

280 281 282 283 284 285
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

286
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
287
	l := e.findOrCreate(p)
288 289 290
	l.lk.Lock()
	defer l.lk.Unlock()

291
	for _, block := range m.Blocks() {
Jeromy's avatar
Jeromy committed
292
		l.SentBytes(len(block.RawData()))
293 294
		l.wantList.Remove(block.Cid())
		e.peerRequestQueue.Remove(block.Cid(), p)
295 296 297 298 299
	}

	return nil
}

300 301
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
302
	defer e.lock.Unlock()
303 304 305 306 307 308
	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	l.lk.Lock()
309
	defer l.lk.Unlock()
310 311 312
	l.ref++
}

313
func (e *Engine) PeerDisconnected(p peer.ID) {
314 315 316 317 318 319 320
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok := e.ledgerMap[p]
	if !ok {
		return
	}
	l.lk.Lock()
321
	defer l.lk.Unlock()
322 323 324 325
	l.ref--
	if l.ref <= 0 {
		delete(e.ledgerMap, p)
	}
326 327
}

328
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
329
	// NB not threadsafe
330
	return e.findOrCreate(p).Accounting.BytesSent
331 332
}

333
func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
334
	// NB not threadsafe
335
	return e.findOrCreate(p).Accounting.BytesRecv
336 337 338
}

// ledger lazily instantiates a ledger
339
func (e *Engine) findOrCreate(p peer.ID) *ledger {
Jeromy's avatar
Jeromy committed
340
	e.lock.Lock()
341
	defer e.lock.Unlock()
342
	l, ok := e.ledgerMap[p]
343 344
	if !ok {
		l = newLedger(p)
345
		e.ledgerMap[p] = l
346 347 348
	}
	return l
}
349 350 351 352 353 354 355 356

// signalNewWork tells task generation to restart (if stopped!). The send is
// non-blocking: if workSignal cannot accept a value right now, the signal is
// dropped.
func (e *Engine) signalNewWork() {
	select {
	case e.workSignal <- struct{}{}:
		// wake-up delivered
	default:
		// channel not ready; nothing more to do
	}
}