wantmanager.go 8.73 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12 13

	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
14
	cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid"
15
	peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
16 17
)

18
type WantManager struct {
Jeromy's avatar
Jeromy committed
19
	// sync channels for Run loop
20
	incoming   chan *wantSet
21 22 23
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
24

Jeromy's avatar
Jeromy committed
25
	// synchronized by Run loop, only touch inside there
26
	peers map[peer.ID]*msgQueue
27
	wl    *wantlist.ThreadSafe
28

29
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
30
	ctx     context.Context
Jeromy's avatar
Jeromy committed
31
	cancel  func()
32

33 34
	wantlistGauge metrics.Gauge
	sentHistogram metrics.Histogram
35 36
}

37
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
38
	ctx, cancel := context.WithCancel(ctx)
39
	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
40
		"Number of items in wantlist.").Gauge()
41 42
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)
43
	return &WantManager{
44
		incoming:      make(chan *wantSet, 10),
45 46 47 48 49 50 51 52 53 54
		connect:       make(chan peer.ID, 10),
		disconnect:    make(chan peer.ID, 10),
		peerReqs:      make(chan chan []peer.ID),
		peers:         make(map[peer.ID]*msgQueue),
		wl:            wantlist.NewThreadSafe(),
		network:       network,
		ctx:           ctx,
		cancel:        cancel,
		wantlistGauge: wantlistGauge,
		sentHistogram: sentHistogram,
55 56 57 58 59 60
	}
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
61 62 63
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
64
	wl      *wantlist.Wantlist
65

Jeromy's avatar
Jeromy committed
66 67
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
68 69
	refcnt int

70 71 72 73
	work chan struct{}
	done chan struct{}
}

Jeromy's avatar
Jeromy committed
74
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID) {
Jeromy's avatar
Jeromy committed
75
	log.Infof("want blocks: %s", ks)
Jeromy's avatar
Jeromy committed
76
	pm.addEntries(ctx, ks, peers, false)
77 78
}

Jeromy's avatar
Jeromy committed
79 80
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID) {
	pm.addEntries(context.Background(), ks, peers, true)
81 82 83 84 85
}

type wantSet struct {
	entries []*bsmsg.Entry
	targets []peer.ID
86 87
}

Jeromy's avatar
Jeromy committed
88
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool) {
89 90 91 92
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
93
			Entry: &wantlist.Entry{
94
				Cid:      k,
95
				Priority: kMaxPriority - i,
96
				RefCnt:   1,
97 98 99
			},
		})
	}
100
	select {
Jeromy's avatar
Jeromy committed
101
	case pm.incoming <- &wantSet{entries: entries, targets: targets}:
102
	case <-pm.ctx.Done():
103
	case <-ctx.Done():
104
	}
105 106
}

107 108 109 110 111 112
// ConnectedPeers asks the Run loop for the IDs of all peers it currently
// tracks and returns them.
//
// It returns nil if the manager's context is cancelled before the Run loop
// accepts the request or delivers the answer, so callers cannot hang forever
// once the loop has exited.
func (pm *WantManager) ConnectedPeers() []peer.ID {
	// Buffered so the Run loop's reply never blocks, even if this call has
	// already given up because the context was cancelled.
	resp := make(chan []peer.ID, 1)

	select {
	case pm.peerReqs <- resp:
	case <-pm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-pm.ctx.Done():
		return nil
	}
}

113
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
114 115 116 117
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

118 119
	pm.sentHistogram.Observe(float64(len(env.Block.RawData())))

120
	msg := bsmsg.New(false)
121
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
122
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
123
	err := pm.network.SendMessage(ctx, env.Peer, msg)
124
	if err != nil {
rht's avatar
rht committed
125
		log.Infof("sendblock error: %s", err)
126 127 128
	}
}

129
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
130
	mq, ok := pm.peers[p]
131
	if ok {
Jeromy's avatar
Jeromy committed
132
		mq.refcnt++
Jeromy's avatar
Jeromy committed
133
		return nil
134 135
	}

Jeromy's avatar
Jeromy committed
136
	mq = pm.newMsgQueue(p)
137 138

	// new peer, we will want to give them our full wantlist
139
	fullwantlist := bsmsg.New(true)
140
	for _, e := range pm.wl.Entries() {
141 142
		ne := *e
		mq.wl.AddEntry(&ne)
143
		fullwantlist.AddEntry(e.Cid, e.Priority)
144 145 146
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
147 148

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
149
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
150
	return mq
151 152
}

153
func (pm *WantManager) stopPeerHandler(p peer.ID) {
154 155 156 157 158 159
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
160 161 162 163 164
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

165 166 167 168
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
169
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
170 171 172 173 174
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
175 176 177
	for {
		select {
		case <-mq.work: // there is work to be done
178
			mq.doWork(ctx)
179 180
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
181 182
		case <-ctx.Done():
			return
183 184 185 186
		}
	}
}

187
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
188
	if mq.sender == nil {
189
		err := mq.openSender(ctx)
Jeromy's avatar
Jeromy committed
190
		if err != nil {
191
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
Jeromy's avatar
Jeromy committed
192 193 194
			// TODO: cant connect, what now?
			return
		}
195 196 197 198 199 200
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
201
		mq.outlk.Unlock()
202 203
		return
	}
Jeromy's avatar
Jeromy committed
204 205
	mq.out = nil
	mq.outlk.Unlock()
206 207

	// send wantlist updates
208
	for { // try to send this message until we fail.
209
		err := mq.sender.SendMsg(ctx, wlm)
210 211 212 213
		if err == nil {
			return
		}

rht's avatar
rht committed
214
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
215 216
		mq.sender.Close()
		mq.sender = nil
217 218 219 220 221 222 223 224 225 226 227 228 229

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
Jeromy's avatar
Jeromy committed
230
			log.Errorf("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
262
	}
263 264 265

	mq.sender = nsender
	return nil
266 267
}

268
func (pm *WantManager) Connected(p peer.ID) {
269 270 271 272
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
273 274
}

275
func (pm *WantManager) Disconnected(p peer.ID) {
276 277 278 279
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
280 281 282
}

// TODO: use goprocess here once i trust it
283
func (pm *WantManager) Run() {
284
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
285
	defer tock.Stop()
286 287
	for {
		select {
288
		case ws := <-pm.incoming:
289 290

			// add changes to our wantlist
291
			for _, e := range ws.entries {
292
				if e.Cancel {
293
					if pm.wl.Remove(e.Cid) {
294
						pm.wantlistGauge.Dec()
295
					}
296
				} else {
297
					if pm.wl.AddEntry(e.Entry) {
298
						pm.wantlistGauge.Inc()
299
					}
300 301 302
				}
			}

303
			// broadcast those wantlist changes
304 305 306 307 308 309 310 311 312 313 314 315 316
			if len(ws.targets) == 0 {
				for _, p := range pm.peers {
					p.addMessage(ws.entries)
				}
			} else {
				for _, t := range ws.targets {
					p, ok := pm.peers[t]
					if !ok {
						log.Warning("tried sending wantlist change to non-partner peer")
						continue
					}
					p.addMessage(ws.entries)
				}
317 318
			}

319 320 321 322 323 324
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}
325

326 327 328 329 330 331 332
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
333
		case p := <-pm.connect:
334
			pm.startPeerHandler(p)
335 336
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
337 338 339 340 341 342
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
343
		case <-pm.ctx.Done():
344 345 346 347 348
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
349
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
350 351 352
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
353
		wl:      wantlist.New(),
354 355 356 357
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
358 359
}

Jeromy's avatar
Jeromy committed
360
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
361
	var work bool
Jeromy's avatar
Jeromy committed
362
	mq.outlk.Lock()
363
	defer func() {
Jeromy's avatar
Jeromy committed
364
		mq.outlk.Unlock()
365 366 367
		if !work {
			return
		}
368 369 370 371 372 373
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

374
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
375
	if mq.out == nil {
376
		mq.out = bsmsg.New(false)
377 378 379
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
380 381
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
382
	for _, e := range entries {
383
		if e.Cancel {
384 385 386 387
			if mq.wl.Remove(e.Cid) {
				work = true
				mq.out.Cancel(e.Cid)
			}
388
		} else {
389 390 391 392
			if mq.wl.Add(e.Cid, e.Priority) {
				work = true
				mq.out.AddEntry(e.Cid, e.Priority)
			}
393 394 395
		}
	}
}