wantmanager.go 9.11 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12

13
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
14
	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
15
	peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
16 17
)

18
type WantManager struct {
Jeromy's avatar
Jeromy committed
19
	// sync channels for Run loop
20 21 22
	incoming     chan *wantSet
	connectEvent chan peerStatus     // notification channel for peers connecting/disconnecting
	peerReqs     chan chan []peer.ID // channel to request connected peers on
23

Jeromy's avatar
Jeromy committed
24
	// synchronized by Run loop, only touch inside there
25
	peers map[peer.ID]*msgQueue
26
	wl    *wantlist.ThreadSafe
Jeromy's avatar
Jeromy committed
27
	bcwl  *wantlist.ThreadSafe
28

29
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
30
	ctx     context.Context
Jeromy's avatar
Jeromy committed
31
	cancel  func()
32

33 34
	wantlistGauge metrics.Gauge
	sentHistogram metrics.Histogram
35 36
}

37 38 39 40 41
// peerStatus is the event carried on WantManager.connectEvent; Connected and
// Disconnected each enqueue one for the Run loop to act on.
type peerStatus struct {
	connect bool    // true when sent by Connected, false when sent by Disconnected
	peer    peer.ID // the peer the event is about
}

42
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
43
	ctx, cancel := context.WithCancel(ctx)
44
	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
45
		"Number of items in wantlist.").Gauge()
46 47
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)
48
	return &WantManager{
49
		incoming:      make(chan *wantSet, 10),
50
		connectEvent:  make(chan peerStatus, 10),
51 52 53
		peerReqs:      make(chan chan []peer.ID),
		peers:         make(map[peer.ID]*msgQueue),
		wl:            wantlist.NewThreadSafe(),
Jeromy's avatar
Jeromy committed
54
		bcwl:          wantlist.NewThreadSafe(),
55 56 57 58 59
		network:       network,
		ctx:           ctx,
		cancel:        cancel,
		wantlistGauge: wantlistGauge,
		sentHistogram: sentHistogram,
60 61 62 63 64 65
	}
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
66 67 68
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
69
	wl      *wantlist.ThreadSafe
70

Jeromy's avatar
Jeromy committed
71 72
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
73 74
	refcnt int

75 76 77 78
	work chan struct{}
	done chan struct{}
}

Jeromy's avatar
Jeromy committed
79
// WantBlocks adds the given cids to the wantlist, tracked by the given session
Jeromy's avatar
Jeromy committed
80
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
Jeromy's avatar
Jeromy committed
81
	log.Infof("want blocks: %s", ks)
Jeromy's avatar
Jeromy committed
82
	pm.addEntries(ctx, ks, peers, false, ses)
83 84
}

Jeromy's avatar
Jeromy committed
85
// CancelWants removes the given cids from the wantlist, tracked by the given session
Jeromy's avatar
Jeromy committed
86 87
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
	pm.addEntries(context.Background(), ks, peers, true, ses)
88 89 90 91 92
}

type wantSet struct {
	entries []*bsmsg.Entry
	targets []peer.ID
Jeromy's avatar
Jeromy committed
93
	from    uint64
94 95
}

Jeromy's avatar
Jeromy committed
96
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
97 98 99 100
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
101
			Entry:  wantlist.NewRefEntry(k, kMaxPriority-i),
102 103
		})
	}
104
	select {
Jeromy's avatar
Jeromy committed
105
	case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
106
	case <-pm.ctx.Done():
107
	case <-ctx.Done():
108
	}
109 110
}

111 112 113 114 115 116
// ConnectedPeers returns the IDs of the peers that currently have a message
// queue, as reported by the Run loop.
//
// The request is answered by Run. Once the manager's context has been
// cancelled Run exits and nobody is left to reply, so in that case nil is
// returned instead of blocking the caller forever.
func (pm *WantManager) ConnectedPeers() []peer.ID {
	// Buffered so Run can always deliver its answer without blocking, even
	// if this call has already given up because of a shutdown.
	resp := make(chan []peer.ID, 1)

	select {
	case pm.peerReqs <- resp:
	case <-pm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-pm.ctx.Done():
		return nil
	}
}

117
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
118 119 120 121
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

122 123
	pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

124
	msg := bsmsg.New(false)
125
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
126
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
127
	err := pm.network.SendMessage(ctx, env.Peer, msg)
128
	if err != nil {
rht's avatar
rht committed
129
		log.Infof("sendblock error: %s", err)
130 131 132
	}
}

133
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
134
	mq, ok := pm.peers[p]
135
	if ok {
Jeromy's avatar
Jeromy committed
136
		mq.refcnt++
Jeromy's avatar
Jeromy committed
137
		return nil
138 139
	}

Jeromy's avatar
Jeromy committed
140
	mq = pm.newMsgQueue(p)
141 142

	// new peer, we will want to give them our full wantlist
143
	fullwantlist := bsmsg.New(true)
Jeromy's avatar
Jeromy committed
144 145 146 147
	for _, e := range pm.bcwl.Entries() {
		for k := range e.SesTrk {
			mq.wl.AddEntry(e, k)
		}
148
		fullwantlist.AddEntry(e.Cid, e.Priority)
149 150 151
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
152 153

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
154
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
155
	return mq
156 157
}

158
func (pm *WantManager) stopPeerHandler(p peer.ID) {
159 160 161 162 163 164
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
165 166 167 168 169
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

170 171 172 173
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
174
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
175 176 177 178 179
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
180 181 182
	for {
		select {
		case <-mq.work: // there is work to be done
183
			mq.doWork(ctx)
184 185
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
186 187
		case <-ctx.Done():
			return
188 189 190 191
		}
	}
}

192 193 194 195 196
func (mq *msgQueue) doWork(ctx context.Context) {
	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
197
		mq.outlk.Unlock()
198 199
		return
	}
Jeromy's avatar
Jeromy committed
200 201
	mq.out = nil
	mq.outlk.Unlock()
202

203 204 205 206 207 208 209 210 211 212
	// NB: only open a stream if we actually have data to send
	if mq.sender == nil {
		err := mq.openSender(ctx)
		if err != nil {
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
			// TODO: cant connect, what now?
			return
		}
	}

213
	// send wantlist updates
214
	for { // try to send this message until we fail.
215
		err := mq.sender.SendMsg(ctx, wlm)
216 217 218 219
		if err == nil {
			return
		}

rht's avatar
rht committed
220
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
221 222
		mq.sender.Close()
		mq.sender = nil
223 224 225 226 227 228 229 230 231 232 233 234 235

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
Jeromy's avatar
Jeromy committed
236
			log.Errorf("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
268
	}
269 270 271

	mq.sender = nsender
	return nil
272 273
}

274
func (pm *WantManager) Connected(p peer.ID) {
275
	select {
276
	case pm.connectEvent <- peerStatus{peer: p, connect: true}:
277 278
	case <-pm.ctx.Done():
	}
279 280
}

281
func (pm *WantManager) Disconnected(p peer.ID) {
282
	select {
283
	case pm.connectEvent <- peerStatus{peer: p, connect: false}:
284 285
	case <-pm.ctx.Done():
	}
286 287 288
}

// TODO: use goprocess here once i trust it
289
func (pm *WantManager) Run() {
290 291
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
292 293
	for {
		select {
294
		case ws := <-pm.incoming:
295

Jeromy's avatar
Jeromy committed
296 297 298
			// is this a broadcast or not?
			brdc := len(ws.targets) == 0

299
			// add changes to our wantlist
300
			for _, e := range ws.entries {
301
				if e.Cancel {
Jeromy's avatar
Jeromy committed
302 303 304 305
					if brdc {
						pm.bcwl.Remove(e.Cid, ws.from)
					}

Jeromy's avatar
Jeromy committed
306
					if pm.wl.Remove(e.Cid, ws.from) {
307
						pm.wantlistGauge.Dec()
308
					}
309
				} else {
Jeromy's avatar
Jeromy committed
310 311 312
					if brdc {
						pm.bcwl.AddEntry(e.Entry, ws.from)
					}
Jeromy's avatar
Jeromy committed
313
					if pm.wl.AddEntry(e.Entry, ws.from) {
314
						pm.wantlistGauge.Inc()
315
					}
316 317 318
				}
			}

319
			// broadcast those wantlist changes
320 321
			if len(ws.targets) == 0 {
				for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
322
					p.addMessage(ws.entries, ws.from)
323 324 325 326 327 328 329 330
				}
			} else {
				for _, t := range ws.targets {
					p, ok := pm.peers[t]
					if !ok {
						log.Warning("tried sending wantlist change to non-partner peer")
						continue
					}
Jeromy's avatar
Jeromy committed
331
					p.addMessage(ws.entries, ws.from)
332
				}
333 334
			}

335 336 337 338 339 340
		case p := <-pm.connectEvent:
			if p.connect {
				pm.startPeerHandler(p.peer)
			} else {
				pm.stopPeerHandler(p.peer)
			}
341 342 343 344 345 346
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
347
		case <-pm.ctx.Done():
348 349 350 351 352
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
353
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
354 355 356
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
Jeromy's avatar
Jeromy committed
357
		wl:      wantlist.NewThreadSafe(),
358 359 360 361
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
362 363
}

Jeromy's avatar
Jeromy committed
364
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) {
365
	var work bool
Jeromy's avatar
Jeromy committed
366
	mq.outlk.Lock()
367
	defer func() {
Jeromy's avatar
Jeromy committed
368
		mq.outlk.Unlock()
369 370 371
		if !work {
			return
		}
372 373 374 375 376 377
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

378
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
379
	if mq.out == nil {
380
		mq.out = bsmsg.New(false)
381 382 383
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
384 385
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
386
	for _, e := range entries {
387
		if e.Cancel {
Jeromy's avatar
Jeromy committed
388
			if mq.wl.Remove(e.Cid, ses) {
389 390 391
				work = true
				mq.out.Cancel(e.Cid)
			}
392
		} else {
Jeromy's avatar
Jeromy committed
393
			if mq.wl.Add(e.Cid, e.Priority, ses) {
394 395 396
				work = true
				mq.out.AddEntry(e.Cid, e.Priority)
			}
397 398 399
		}
	}
}