wantmanager.go 8.2 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12

13
	cid "gx/ipfs/QmNw61A6sJoXMeP37mJRtQZdNhj5e3FdjoTN3v4FyE96Gk/go-cid"
14
	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
15
	peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
16 17
)

18
type WantManager struct {
Jeromy's avatar
Jeromy committed
19 20
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
21 22 23
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
24

Jeromy's avatar
Jeromy committed
25
	// synchronized by Run loop, only touch inside there
26
	peers map[peer.ID]*msgQueue
27
	wl    *wantlist.ThreadSafe
28

29
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
30
	ctx     context.Context
Jeromy's avatar
Jeromy committed
31
	cancel  func()
32

33 34
	wantlistGauge metrics.Gauge
	sentHistogram metrics.Histogram
35 36
}

37
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
38
	ctx, cancel := context.WithCancel(ctx)
39
	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
40
		"Number of items in wantlist.").Gauge()
41 42
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)
43
	return &WantManager{
44 45 46 47 48 49 50 51 52 53 54
		incoming:      make(chan []*bsmsg.Entry, 10),
		connect:       make(chan peer.ID, 10),
		disconnect:    make(chan peer.ID, 10),
		peerReqs:      make(chan chan []peer.ID),
		peers:         make(map[peer.ID]*msgQueue),
		wl:            wantlist.NewThreadSafe(),
		network:       network,
		ctx:           ctx,
		cancel:        cancel,
		wantlistGauge: wantlistGauge,
		sentHistogram: sentHistogram,
55 56 57 58 59 60
	}
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
61 62 63
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
64

Jeromy's avatar
Jeromy committed
65 66
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
67 68
	refcnt int

69 70 71 72
	work chan struct{}
	done chan struct{}
}

73
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
Jeromy's avatar
Jeromy committed
74
	log.Infof("want blocks: %s", ks)
75
	pm.addEntries(ctx, ks, false)
76 77
}

78
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
79
	log.Infof("cancel wants: %s", ks)
80
	pm.addEntries(context.TODO(), ks, true)
81 82
}

83
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
84 85 86 87
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
88
			Entry: &wantlist.Entry{
89
				Cid:      k,
90
				Priority: kMaxPriority - i,
91
				RefCnt:   1,
92 93 94
			},
		})
	}
95 96 97
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
98
	case <-ctx.Done():
99
	}
100 101
}

102 103 104 105 106 107
func (pm *WantManager) ConnectedPeers() []peer.ID {
	resp := make(chan []peer.ID)
	pm.peerReqs <- resp
	return <-resp
}

108
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
109 110 111 112
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

113 114
	pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

115
	msg := bsmsg.New(false)
116
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
117
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
118
	err := pm.network.SendMessage(ctx, env.Peer, msg)
119
	if err != nil {
rht's avatar
rht committed
120
		log.Infof("sendblock error: %s", err)
121 122 123
	}
}

124
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
125
	mq, ok := pm.peers[p]
126
	if ok {
Jeromy's avatar
Jeromy committed
127
		mq.refcnt++
Jeromy's avatar
Jeromy committed
128
		return nil
129 130
	}

Jeromy's avatar
Jeromy committed
131
	mq = pm.newMsgQueue(p)
132 133

	// new peer, we will want to give them our full wantlist
134
	fullwantlist := bsmsg.New(true)
135
	for _, e := range pm.wl.Entries() {
136
		fullwantlist.AddEntry(e.Cid, e.Priority)
137 138 139
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
140 141

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
142
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
143
	return mq
144 145
}

146
func (pm *WantManager) stopPeerHandler(p peer.ID) {
147 148 149 150 151 152
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
153 154 155 156 157
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

158 159 160 161
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
162
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
163 164 165 166 167
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
168 169 170
	for {
		select {
		case <-mq.work: // there is work to be done
171
			mq.doWork(ctx)
172 173
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
174 175
		case <-ctx.Done():
			return
176 177 178 179
		}
	}
}

180
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
181
	if mq.sender == nil {
182
		err := mq.openSender(ctx)
Jeromy's avatar
Jeromy committed
183
		if err != nil {
184
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
Jeromy's avatar
Jeromy committed
185 186 187
			// TODO: cant connect, what now?
			return
		}
188 189 190 191 192 193
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
194
		mq.outlk.Unlock()
195 196
		return
	}
Jeromy's avatar
Jeromy committed
197 198
	mq.out = nil
	mq.outlk.Unlock()
199 200

	// send wantlist updates
201
	for { // try to send this message until we fail.
202
		err := mq.sender.SendMsg(ctx, wlm)
203 204 205 206
		if err == nil {
			return
		}

rht's avatar
rht committed
207
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
208 209
		mq.sender.Close()
		mq.sender = nil
210 211 212 213 214 215 216 217 218 219 220 221 222

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
Jeromy's avatar
Jeromy committed
223
			log.Errorf("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
255
	}
256 257 258

	mq.sender = nsender
	return nil
259 260
}

261
func (pm *WantManager) Connected(p peer.ID) {
262 263 264 265
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
266 267
}

268
func (pm *WantManager) Disconnected(p peer.ID) {
269 270 271 272
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
273 274 275
}

// TODO: use goprocess here once i trust it
276
func (pm *WantManager) Run() {
277
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
278
	defer tock.Stop()
279 280
	for {
		select {
281 282 283
		case entries := <-pm.incoming:

			// add changes to our wantlist
284
			var filtered []*bsmsg.Entry
285 286
			for _, e := range entries {
				if e.Cancel {
287
					if pm.wl.Remove(e.Cid) {
288
						pm.wantlistGauge.Dec()
289 290
						filtered = append(filtered, e)
					}
291
				} else {
292
					if pm.wl.AddEntry(e.Entry) {
293
						pm.wantlistGauge.Inc()
294 295
						filtered = append(filtered, e)
					}
296 297 298
				}
			}

299 300
			// broadcast those wantlist changes
			for _, p := range pm.peers {
301
				p.addMessage(filtered)
302 303
			}

304 305 306 307 308 309
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}
310

311 312 313 314 315 316 317
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
318
		case p := <-pm.connect:
319
			pm.startPeerHandler(p)
320 321
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
322 323 324 325 326 327
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
328
		case <-pm.ctx.Done():
329 330 331 332 333
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
334
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
335 336 337 338 339 340 341
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
342 343
}

Jeromy's avatar
Jeromy committed
344
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
345
	mq.outlk.Lock()
346
	defer func() {
Jeromy's avatar
Jeromy committed
347
		mq.outlk.Unlock()
348 349 350 351 352 353
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

354
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
355
	if mq.out == nil {
356
		mq.out = bsmsg.New(false)
357 358 359
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
360 361
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
362
	for _, e := range entries {
363
		if e.Cancel {
364
			mq.out.Cancel(e.Cid)
365
		} else {
366
			mq.out.AddEntry(e.Cid, e.Priority)
367 368 369
		}
	}
}