wantmanager.go 9 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12 13

	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
14 15
	cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
	peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
16 17
)

18
type WantManager struct {
Jeromy's avatar
Jeromy committed
19
	// sync channels for Run loop
20
	incoming   chan *wantSet
21 22 23
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
24

Jeromy's avatar
Jeromy committed
25
	// synchronized by Run loop, only touch inside there
26
	peers map[peer.ID]*msgQueue
27
	wl    *wantlist.ThreadSafe
Jeromy's avatar
Jeromy committed
28
	bcwl  *wantlist.ThreadSafe
29

30
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
31
	ctx     context.Context
Jeromy's avatar
Jeromy committed
32
	cancel  func()
33

34 35
	wantlistGauge metrics.Gauge
	sentHistogram metrics.Histogram
36 37
}

38
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
39
	ctx, cancel := context.WithCancel(ctx)
40
	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
41
		"Number of items in wantlist.").Gauge()
42 43
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)
44
	return &WantManager{
45
		incoming:      make(chan *wantSet, 10),
46 47 48 49 50
		connect:       make(chan peer.ID, 10),
		disconnect:    make(chan peer.ID, 10),
		peerReqs:      make(chan chan []peer.ID),
		peers:         make(map[peer.ID]*msgQueue),
		wl:            wantlist.NewThreadSafe(),
Jeromy's avatar
Jeromy committed
51
		bcwl:          wantlist.NewThreadSafe(),
52 53 54 55 56
		network:       network,
		ctx:           ctx,
		cancel:        cancel,
		wantlistGauge: wantlistGauge,
		sentHistogram: sentHistogram,
57 58 59 60 61 62
	}
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
63 64 65
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
66
	wl      *wantlist.ThreadSafe
67

Jeromy's avatar
Jeromy committed
68 69
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
70 71
	refcnt int

72 73 74 75
	work chan struct{}
	done chan struct{}
}

Jeromy's avatar
Jeromy committed
76
// WantBlocks adds the given cids to the wantlist, tracked by the given session
Jeromy's avatar
Jeromy committed
77
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
Jeromy's avatar
Jeromy committed
78
	log.Infof("want blocks: %s", ks)
Jeromy's avatar
Jeromy committed
79
	pm.addEntries(ctx, ks, peers, false, ses)
80 81
}

Jeromy's avatar
Jeromy committed
82
// CancelWants removes the given cids from the wantlist, tracked by the given session
Jeromy's avatar
Jeromy committed
83 84
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
	pm.addEntries(context.Background(), ks, peers, true, ses)
85 86 87 88 89
}

type wantSet struct {
	entries []*bsmsg.Entry
	targets []peer.ID
Jeromy's avatar
Jeromy committed
90
	from    uint64
91 92
}

Jeromy's avatar
Jeromy committed
93
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
94 95 96 97
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
98
			Entry:  wantlist.NewRefEntry(k, kMaxPriority-i),
99 100
		})
	}
101
	select {
Jeromy's avatar
Jeromy committed
102
	case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
103
	case <-pm.ctx.Done():
104
	case <-ctx.Done():
105
	}
106 107
}

108 109 110 111 112 113
func (pm *WantManager) ConnectedPeers() []peer.ID {
	resp := make(chan []peer.ID)
	pm.peerReqs <- resp
	return <-resp
}

114
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
115 116 117 118
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

119 120
	pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

121
	msg := bsmsg.New(false)
122
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
123
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
124
	err := pm.network.SendMessage(ctx, env.Peer, msg)
125
	if err != nil {
rht's avatar
rht committed
126
		log.Infof("sendblock error: %s", err)
127 128 129
	}
}

130
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
131
	mq, ok := pm.peers[p]
132
	if ok {
Jeromy's avatar
Jeromy committed
133
		mq.refcnt++
Jeromy's avatar
Jeromy committed
134
		return nil
135 136
	}

Jeromy's avatar
Jeromy committed
137
	mq = pm.newMsgQueue(p)
138 139

	// new peer, we will want to give them our full wantlist
140
	fullwantlist := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
141 142 143 144
	for _, e := range pm.bcwl.Entries() {
		for k := range e.SesTrk {
			mq.wl.AddEntry(e, k)
		}
145
		fullwantlist.AddEntry(e.Cid, e.Priority)
146 147 148
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
149 150

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
151
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
152
	return mq
153 154
}

155
func (pm *WantManager) stopPeerHandler(p peer.ID) {
156 157 158 159 160 161
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
162 163 164 165 166
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

167 168 169 170
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
171
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
172 173 174 175 176
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
177 178 179
	for {
		select {
		case <-mq.work: // there is work to be done
180
			mq.doWork(ctx)
181 182
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
183 184
		case <-ctx.Done():
			return
185 186 187 188
		}
	}
}

189 190 191 192 193
func (mq *msgQueue) doWork(ctx context.Context) {
	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
194
		mq.outlk.Unlock()
195 196
		return
	}
Jeromy's avatar
Jeromy committed
197 198
	mq.out = nil
	mq.outlk.Unlock()
199

200 201 202 203 204 205 206 207 208 209
	// NB: only open a stream if we actually have data to send
	if mq.sender == nil {
		err := mq.openSender(ctx)
		if err != nil {
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
			// TODO: cant connect, what now?
			return
		}
	}

210
	// send wantlist updates
211
	for { // try to send this message until we fail.
212
		err := mq.sender.SendMsg(ctx, wlm)
213 214 215 216
		if err == nil {
			return
		}

rht's avatar
rht committed
217
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
218 219
		mq.sender.Close()
		mq.sender = nil
220 221 222 223 224 225 226 227 228 229 230 231 232

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
Jeromy's avatar
Jeromy committed
233
			log.Errorf("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
265
	}
266 267 268

	mq.sender = nsender
	return nil
269 270
}

271
func (pm *WantManager) Connected(p peer.ID) {
272 273 274 275
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
276 277
}

278
func (pm *WantManager) Disconnected(p peer.ID) {
279 280 281 282
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
283 284 285
}

// TODO: use goprocess here once i trust it
286
func (pm *WantManager) Run() {
287
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
288
	defer tock.Stop()
289 290
	for {
		select {
291
		case ws := <-pm.incoming:
292

Jeromy's avatar
Jeromy committed
293 294 295
			// is this a broadcast or not?
			brdc := len(ws.targets) == 0

296
			// add changes to our wantlist
297
			for _, e := range ws.entries {
298
				if e.Cancel {
Jeromy's avatar
Jeromy committed
299 300 301 302
					if brdc {
						pm.bcwl.Remove(e.Cid, ws.from)
					}

Jeromy's avatar
Jeromy committed
303
					if pm.wl.Remove(e.Cid, ws.from) {
304
						pm.wantlistGauge.Dec()
305
					}
306
				} else {
Jeromy's avatar
Jeromy committed
307 308 309
					if brdc {
						pm.bcwl.AddEntry(e.Entry, ws.from)
					}
Jeromy's avatar
Jeromy committed
310
					if pm.wl.AddEntry(e.Entry, ws.from) {
311
						pm.wantlistGauge.Inc()
312
					}
313 314 315
				}
			}

316
			// broadcast those wantlist changes
317 318
			if len(ws.targets) == 0 {
				for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
319
					p.addMessage(ws.entries, ws.from)
320 321 322 323 324 325 326 327
				}
			} else {
				for _, t := range ws.targets {
					p, ok := pm.peers[t]
					if !ok {
						log.Warning("tried sending wantlist change to non-partner peer")
						continue
					}
Jeromy's avatar
Jeromy committed
328
					p.addMessage(ws.entries, ws.from)
329
				}
330 331 332
			}

		case p := <-pm.connect:
333
			pm.startPeerHandler(p)
334 335
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
336 337 338 339 340 341
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
342
		case <-pm.ctx.Done():
343 344 345 346 347
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
348
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
349 350 351
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
352
		wl:      wantlist.NewThreadSafe(),
353 354 355 356
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
357 358
}

Jeromy's avatar
Jeromy committed
359
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) {
360
	var work bool
Jeromy's avatar
Jeromy committed
361
	mq.outlk.Lock()
362
	defer func() {
Jeromy's avatar
Jeromy committed
363
		mq.outlk.Unlock()
364 365 366
		if !work {
			return
		}
367 368 369 370 371 372
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

373
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
374
	if mq.out == nil {
375
		mq.out = bsmsg.New(false)
376 377 378
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
379 380
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
381
	for _, e := range entries {
382
		if e.Cancel {
Jeromy's avatar
Jeromy committed
383
			if mq.wl.Remove(e.Cid, ses) {
384 385 386
				work = true
				mq.out.Cancel(e.Cid)
			}
387
		} else {
Jeromy's avatar
Jeromy committed
388
			if mq.wl.Add(e.Cid, e.Priority, ses) {
389 390 391
				work = true
				mq.out.AddEntry(e.Cid, e.Priority)
			}
392 393 394
		}
	}
}