wantmanager.go 9.15 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12

13
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
14
	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
15
	peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
16 17
)

18
type WantManager struct {
Jeromy's avatar
Jeromy committed
19
	// sync channels for Run loop
20 21 22
	incoming     chan *wantSet
	connectEvent chan peerStatus     // notification channel for peers connecting/disconnecting
	peerReqs     chan chan []peer.ID // channel to request connected peers on
23

Jeromy's avatar
Jeromy committed
24
	// synchronized by Run loop, only touch inside there
25
	peers map[peer.ID]*msgQueue
26
	wl    *wantlist.ThreadSafe
Jeromy's avatar
Jeromy committed
27
	bcwl  *wantlist.ThreadSafe
28

29
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
30
	ctx     context.Context
Jeromy's avatar
Jeromy committed
31
	cancel  func()
32

33 34
	wantlistGauge metrics.Gauge
	sentHistogram metrics.Histogram
35 36
}

37 38 39 40 41
type peerStatus struct {
	connect bool
	peer    peer.ID
}

42
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
43
	ctx, cancel := context.WithCancel(ctx)
44
	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
45
		"Number of items in wantlist.").Gauge()
46 47
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)
48
	return &WantManager{
49
		incoming:      make(chan *wantSet, 10),
50
		connectEvent:  make(chan peerStatus, 10),
51 52 53
		peerReqs:      make(chan chan []peer.ID),
		peers:         make(map[peer.ID]*msgQueue),
		wl:            wantlist.NewThreadSafe(),
Jeromy's avatar
Jeromy committed
54
		bcwl:          wantlist.NewThreadSafe(),
55 56 57 58 59
		network:       network,
		ctx:           ctx,
		cancel:        cancel,
		wantlistGauge: wantlistGauge,
		sentHistogram: sentHistogram,
60 61 62 63 64 65
	}
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
66 67 68
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
69
	wl      *wantlist.ThreadSafe
70

Jeromy's avatar
Jeromy committed
71 72
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
73 74
	refcnt int

75 76 77 78
	work chan struct{}
	done chan struct{}
}

Jeromy's avatar
Jeromy committed
79
// WantBlocks adds the given cids to the wantlist, tracked by the given session
Jeromy's avatar
Jeromy committed
80
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
Jeromy's avatar
Jeromy committed
81
	log.Infof("want blocks: %s", ks)
Jeromy's avatar
Jeromy committed
82
	pm.addEntries(ctx, ks, peers, false, ses)
83 84
}

Jeromy's avatar
Jeromy committed
85
// CancelWants removes the given cids from the wantlist, tracked by the given session
Jeromy's avatar
Jeromy committed
86 87
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
	pm.addEntries(context.Background(), ks, peers, true, ses)
88 89 90 91 92
}

type wantSet struct {
	entries []*bsmsg.Entry
	targets []peer.ID
Jeromy's avatar
Jeromy committed
93
	from    uint64
94 95
}

Jeromy's avatar
Jeromy committed
96
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
97 98 99 100
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
101
			Entry:  wantlist.NewRefEntry(k, kMaxPriority-i),
102 103
		})
	}
104
	select {
Jeromy's avatar
Jeromy committed
105
	case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
106
	case <-pm.ctx.Done():
107
	case <-ctx.Done():
108
	}
109 110
}

111 112 113 114 115 116
func (pm *WantManager) ConnectedPeers() []peer.ID {
	resp := make(chan []peer.ID)
	pm.peerReqs <- resp
	return <-resp
}

117
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
118 119 120 121
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

122 123
	pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

124
	msg := bsmsg.New(false)
125
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
126
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
127
	err := pm.network.SendMessage(ctx, env.Peer, msg)
128
	if err != nil {
rht's avatar
rht committed
129
		log.Infof("sendblock error: %s", err)
130 131 132
	}
}

133
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
134
	mq, ok := pm.peers[p]
135
	if ok {
Jeromy's avatar
Jeromy committed
136
		mq.refcnt++
Jeromy's avatar
Jeromy committed
137
		return nil
138 139
	}

Jeromy's avatar
Jeromy committed
140
	mq = pm.newMsgQueue(p)
141 142

	// new peer, we will want to give them our full wantlist
143
	fullwantlist := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
144 145 146 147
	for _, e := range pm.bcwl.Entries() {
		for k := range e.SesTrk {
			mq.wl.AddEntry(e, k)
		}
148
		fullwantlist.AddEntry(e.Cid, e.Priority)
149 150 151
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
152 153

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
154
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
155
	return mq
156 157
}

158
func (pm *WantManager) stopPeerHandler(p peer.ID) {
159 160 161 162 163 164
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
165 166 167 168 169
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

170 171 172 173
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
174
func (mq *msgQueue) runQueue(ctx context.Context) {
175 176 177
	for {
		select {
		case <-mq.work: // there is work to be done
178
			mq.doWork(ctx)
179
		case <-mq.done:
180 181 182
			if mq.sender != nil {
				mq.sender.Close()
			}
183
			return
Jeromy's avatar
Jeromy committed
184
		case <-ctx.Done():
185 186 187
			if mq.sender != nil {
				mq.sender.Reset()
			}
Jeromy's avatar
Jeromy committed
188
			return
189 190 191 192
		}
	}
}

193 194 195 196 197
func (mq *msgQueue) doWork(ctx context.Context) {
	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
198
		mq.outlk.Unlock()
199 200
		return
	}
Jeromy's avatar
Jeromy committed
201 202
	mq.out = nil
	mq.outlk.Unlock()
203

204 205 206 207 208 209 210 211 212 213
	// NB: only open a stream if we actually have data to send
	if mq.sender == nil {
		err := mq.openSender(ctx)
		if err != nil {
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
			// TODO: cant connect, what now?
			return
		}
	}

214
	// send wantlist updates
215
	for { // try to send this message until we fail.
216
		err := mq.sender.SendMsg(ctx, wlm)
217 218 219 220
		if err == nil {
			return
		}

rht's avatar
rht committed
221
		log.Infof("bitswap send error: %s", err)
222
		mq.sender.Reset()
Jeromy's avatar
Jeromy committed
223
		mq.sender = nil
224 225 226 227 228 229 230 231 232 233 234 235 236

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
Jeromy's avatar
Jeromy committed
237
			log.Errorf("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
269
	}
270 271 272

	mq.sender = nsender
	return nil
273 274
}

275
func (pm *WantManager) Connected(p peer.ID) {
276
	select {
277
	case pm.connectEvent <- peerStatus{peer: p, connect: true}:
278 279
	case <-pm.ctx.Done():
	}
280 281
}

282
func (pm *WantManager) Disconnected(p peer.ID) {
283
	select {
284
	case pm.connectEvent <- peerStatus{peer: p, connect: false}:
285 286
	case <-pm.ctx.Done():
	}
287 288 289
}

// TODO: use goprocess here once i trust it
290
func (pm *WantManager) Run() {
291 292
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
293 294
	for {
		select {
295
		case ws := <-pm.incoming:
296

Jeromy's avatar
Jeromy committed
297 298 299
			// is this a broadcast or not?
			brdc := len(ws.targets) == 0

300
			// add changes to our wantlist
301
			for _, e := range ws.entries {
302
				if e.Cancel {
Jeromy's avatar
Jeromy committed
303 304 305 306
					if brdc {
						pm.bcwl.Remove(e.Cid, ws.from)
					}

Jeromy's avatar
Jeromy committed
307
					if pm.wl.Remove(e.Cid, ws.from) {
308
						pm.wantlistGauge.Dec()
309
					}
310
				} else {
Jeromy's avatar
Jeromy committed
311 312 313
					if brdc {
						pm.bcwl.AddEntry(e.Entry, ws.from)
					}
Jeromy's avatar
Jeromy committed
314
					if pm.wl.AddEntry(e.Entry, ws.from) {
315
						pm.wantlistGauge.Inc()
316
					}
317 318 319
				}
			}

320
			// broadcast those wantlist changes
321 322
			if len(ws.targets) == 0 {
				for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
323
					p.addMessage(ws.entries, ws.from)
324 325 326 327 328 329 330 331
				}
			} else {
				for _, t := range ws.targets {
					p, ok := pm.peers[t]
					if !ok {
						log.Warning("tried sending wantlist change to non-partner peer")
						continue
					}
Jeromy's avatar
Jeromy committed
332
					p.addMessage(ws.entries, ws.from)
333
				}
334 335
			}

336 337 338 339 340 341
		case p := <-pm.connectEvent:
			if p.connect {
				pm.startPeerHandler(p.peer)
			} else {
				pm.stopPeerHandler(p.peer)
			}
342 343 344 345 346 347
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
348
		case <-pm.ctx.Done():
349 350 351 352 353
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
354
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
355 356 357
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
358
		wl:      wantlist.NewThreadSafe(),
359 360 361 362
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
363 364
}

Jeromy's avatar
Jeromy committed
365
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) {
366
	var work bool
Jeromy's avatar
Jeromy committed
367
	mq.outlk.Lock()
368
	defer func() {
Jeromy's avatar
Jeromy committed
369
		mq.outlk.Unlock()
370 371 372
		if !work {
			return
		}
373 374 375 376 377 378
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

379
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
380
	if mq.out == nil {
381
		mq.out = bsmsg.New(false)
382 383 384
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
385 386
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
387
	for _, e := range entries {
388
		if e.Cancel {
Jeromy's avatar
Jeromy committed
389
			if mq.wl.Remove(e.Cid, ses) {
390 391 392
				work = true
				mq.out.Cancel(e.Cid)
			}
393
		} else {
Jeromy's avatar
Jeromy committed
394
			if mq.wl.Add(e.Cid, e.Priority, ses) {
395 396 397
				work = true
				mq.out.AddEntry(e.Cid, e.Priority)
			}
398 399 400
		}
	}
}