wantmanager.go 8.32 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12 13

	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
14
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
15
	peer "gx/ipfs/QmWUswjn261LSyVxWAEpMVtPdy8zmKBJJfBpG3Qdpa8ZsE/go-libp2p-peer"
16 17
)

18
type WantManager struct {
Jeromy's avatar
Jeromy committed
19 20
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
21 22 23
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
24

Jeromy's avatar
Jeromy committed
25
	// synchronized by Run loop, only touch inside there
26
	peers map[peer.ID]*msgQueue
27
	wl    *wantlist.ThreadSafe
28

29
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
30
	ctx     context.Context
Jeromy's avatar
Jeromy committed
31
	cancel  func()
32

33 34
	wantlistGauge metrics.Gauge
	sentHistogram metrics.Histogram
35 36
}

37
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
38
	ctx, cancel := context.WithCancel(ctx)
39
	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
40
		"Number of items in wantlist.").Gauge()
41 42
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)
43
	return &WantManager{
44 45 46 47 48 49 50 51 52 53 54
		incoming:      make(chan []*bsmsg.Entry, 10),
		connect:       make(chan peer.ID, 10),
		disconnect:    make(chan peer.ID, 10),
		peerReqs:      make(chan chan []peer.ID),
		peers:         make(map[peer.ID]*msgQueue),
		wl:            wantlist.NewThreadSafe(),
		network:       network,
		ctx:           ctx,
		cancel:        cancel,
		wantlistGauge: wantlistGauge,
		sentHistogram: sentHistogram,
55 56 57 58 59 60 61 62 63 64
	}
}

// msgPair couples a bitswap message with the peer it is addressed to.
// NOTE(review): not referenced in the visible part of this file; confirm it
// is still used elsewhere in the package.
type msgPair struct {
	to  peer.ID
	msg bsmsg.BitSwapMessage
}

type cancellation struct {
	who peer.ID
65
	blk *cid.Cid
66 67 68 69 70
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
71 72 73
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
74

Jeromy's avatar
Jeromy committed
75 76
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
77 78
	refcnt int

79 80 81 82
	work chan struct{}
	done chan struct{}
}

83
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
Jeromy's avatar
Jeromy committed
84
	log.Infof("want blocks: %s", ks)
85
	pm.addEntries(ctx, ks, false)
86 87
}

88
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
89
	log.Infof("cancel wants: %s", ks)
90
	pm.addEntries(context.TODO(), ks, true)
91 92
}

93
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
94 95 96 97
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
98
			Entry: &wantlist.Entry{
99
				Cid:      k,
100
				Priority: kMaxPriority - i,
101
				RefCnt:   1,
102 103 104
			},
		})
	}
105 106 107
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
108
	case <-ctx.Done():
109
	}
110 111
}

112 113 114 115 116 117
// ConnectedPeers asks the Run loop for the IDs of all peers that currently
// have a message queue and returns them.
//
// It returns nil if the manager's context is cancelled before the request is
// accepted or answered; without that guard the call would block forever once
// the Run loop has exited.
func (pm *WantManager) ConnectedPeers() []peer.ID {
	// Buffered so the Run loop can always deliver its reply without blocking,
	// even if this caller has already given up because of cancellation.
	resp := make(chan []peer.ID, 1)

	select {
	case pm.peerReqs <- resp:
	case <-pm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-pm.ctx.Done():
		return nil
	}
}

118
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
119 120 121 122
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

123 124
	pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

125
	msg := bsmsg.New(false)
126
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
127
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
128
	err := pm.network.SendMessage(ctx, env.Peer, msg)
129
	if err != nil {
rht's avatar
rht committed
130
		log.Infof("sendblock error: %s", err)
131 132 133
	}
}

134
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
135
	mq, ok := pm.peers[p]
136
	if ok {
Jeromy's avatar
Jeromy committed
137
		mq.refcnt++
Jeromy's avatar
Jeromy committed
138
		return nil
139 140
	}

Jeromy's avatar
Jeromy committed
141
	mq = pm.newMsgQueue(p)
142 143

	// new peer, we will want to give them our full wantlist
144
	fullwantlist := bsmsg.New(true)
145
	for _, e := range pm.wl.Entries() {
146
		fullwantlist.AddEntry(e.Cid, e.Priority)
147 148 149
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
150 151

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
152
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
153
	return mq
154 155
}

156
func (pm *WantManager) stopPeerHandler(p peer.ID) {
157 158 159 160 161 162
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
163 164 165 166 167
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

168 169 170 171
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
172
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
173 174 175 176 177
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
178 179 180
	for {
		select {
		case <-mq.work: // there is work to be done
181
			mq.doWork(ctx)
182 183
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
184 185
		case <-ctx.Done():
			return
186 187 188 189
		}
	}
}

190
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
191
	if mq.sender == nil {
192
		err := mq.openSender(ctx)
Jeromy's avatar
Jeromy committed
193
		if err != nil {
194
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
Jeromy's avatar
Jeromy committed
195 196 197
			// TODO: cant connect, what now?
			return
		}
198 199 200 201 202 203
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
204
		mq.outlk.Unlock()
205 206
		return
	}
Jeromy's avatar
Jeromy committed
207 208
	mq.out = nil
	mq.outlk.Unlock()
209 210

	// send wantlist updates
211
	for { // try to send this message until we fail.
212
		err := mq.sender.SendMsg(ctx, wlm)
213 214 215 216
		if err == nil {
			return
		}

rht's avatar
rht committed
217
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
218 219
		mq.sender.Close()
		mq.sender = nil
220 221 222 223 224 225 226 227 228 229 230 231 232

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
Jeromy's avatar
Jeromy committed
233
			log.Errorf("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
265
	}
266 267 268

	mq.sender = nsender
	return nil
269 270
}

271
func (pm *WantManager) Connected(p peer.ID) {
272 273 274 275
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
276 277
}

278
func (pm *WantManager) Disconnected(p peer.ID) {
279 280 281 282
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
283 284 285
}

// TODO: use goprocess here once i trust it
286
func (pm *WantManager) Run() {
287
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
288
	defer tock.Stop()
289 290
	for {
		select {
291 292 293
		case entries := <-pm.incoming:

			// add changes to our wantlist
294
			var filtered []*bsmsg.Entry
295 296
			for _, e := range entries {
				if e.Cancel {
297
					if pm.wl.Remove(e.Cid) {
298
						pm.wantlistGauge.Dec()
299 300
						filtered = append(filtered, e)
					}
301
				} else {
302
					if pm.wl.AddEntry(e.Entry) {
303
						pm.wantlistGauge.Inc()
304 305
						filtered = append(filtered, e)
					}
306 307 308
				}
			}

309 310
			// broadcast those wantlist changes
			for _, p := range pm.peers {
311
				p.addMessage(filtered)
312 313
			}

314 315 316 317 318 319
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}
320

321 322 323 324 325 326 327
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
328
		case p := <-pm.connect:
329
			pm.startPeerHandler(p)
330 331
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
332 333 334 335 336 337
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
338
		case <-pm.ctx.Done():
339 340 341 342 343
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
344
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
345 346 347 348 349 350 351
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
352 353
}

Jeromy's avatar
Jeromy committed
354
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
355
	mq.outlk.Lock()
356
	defer func() {
Jeromy's avatar
Jeromy committed
357
		mq.outlk.Unlock()
358 359 360 361 362 363
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

364
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
365
	if mq.out == nil {
366
		mq.out = bsmsg.New(false)
367 368 369
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
370 371
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
372
	for _, e := range entries {
373
		if e.Cancel {
374
			mq.out.Cancel(e.Cid)
375
		} else {
376
			mq.out.AddEntry(e.Cid, e.Priority)
377 378 379
		}
	}
}