wantmanager.go 8.07 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12 13

	metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
14 15
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
	peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
16 17
)

18
type WantManager struct {
Jeromy's avatar
Jeromy committed
19 20
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
21 22 23
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
24

Jeromy's avatar
Jeromy committed
25
	// synchronized by Run loop, only touch inside there
26
	peers map[peer.ID]*msgQueue
27
	wl    *wantlist.ThreadSafe
28

29
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
30
	ctx     context.Context
Jeromy's avatar
Jeromy committed
31
	cancel  func()
32 33

	metricWantlist metrics.Gauge
34 35
}

36
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
37
	ctx, cancel := context.WithCancel(ctx)
38 39
	wantlistGauge := metrics.NewCtx(ctx, "wanlist_total",
		"Number of items in wantlist.").Gauge()
40
	return &WantManager{
41 42 43 44 45 46 47 48 49 50
		incoming:       make(chan []*bsmsg.Entry, 10),
		connect:        make(chan peer.ID, 10),
		disconnect:     make(chan peer.ID, 10),
		peerReqs:       make(chan chan []peer.ID),
		peers:          make(map[peer.ID]*msgQueue),
		wl:             wantlist.NewThreadSafe(),
		network:        network,
		ctx:            ctx,
		cancel:         cancel,
		metricWantlist: wantlistGauge,
51 52 53 54 55 56 57 58 59 60
	}
}

// msgPair bundles a bitswap message with a peer ID (field name suggests the
// destination). NOTE(review): not referenced anywhere else in this file —
// confirm it is still needed.
type msgPair struct {
	to  peer.ID
	msg bsmsg.BitSwapMessage
}

type cancellation struct {
	who peer.ID
61
	blk *cid.Cid
62 63 64 65 66
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
67 68 69
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
70

Jeromy's avatar
Jeromy committed
71 72
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
73 74
	refcnt int

75 76 77 78
	work chan struct{}
	done chan struct{}
}

79
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
Jeromy's avatar
Jeromy committed
80
	log.Infof("want blocks: %s", ks)
81
	pm.addEntries(ctx, ks, false)
82 83
}

84
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
85
	log.Infof("cancel wants: %s", ks)
86
	pm.addEntries(context.TODO(), ks, true)
87 88
}

89
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
90 91 92 93
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
94
			Entry: &wantlist.Entry{
95
				Cid:      k,
96
				Priority: kMaxPriority - i,
97
				RefCnt:   1,
98 99 100
			},
		})
	}
101 102 103
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
104
	case <-ctx.Done():
105
	}
106 107
}

108 109 110 111 112 113
// ConnectedPeers returns the peers that currently have a message queue, as
// seen by the Run loop.
//
// If the manager's context is cancelled before the Run loop accepts the
// request, it returns nil instead of blocking forever (previously the
// unbuffered send to peerReqs hung once Run had exited).
func (pm *WantManager) ConnectedPeers() []peer.ID {
	// Buffered so the Run loop can always deliver its reply without blocking.
	resp := make(chan []peer.ID, 1)
	select {
	case pm.peerReqs <- resp:
	case <-pm.ctx.Done():
		return nil
	}
	// Run replies to every request it has received, so this cannot hang.
	return <-resp
}

114
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
115 116 117 118
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

119
	msg := bsmsg.New(false)
120
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
121
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
122
	err := pm.network.SendMessage(ctx, env.Peer, msg)
123
	if err != nil {
rht's avatar
rht committed
124
		log.Infof("sendblock error: %s", err)
125 126 127
	}
}

128
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
129
	mq, ok := pm.peers[p]
130
	if ok {
Jeromy's avatar
Jeromy committed
131
		mq.refcnt++
Jeromy's avatar
Jeromy committed
132
		return nil
133 134
	}

Jeromy's avatar
Jeromy committed
135
	mq = pm.newMsgQueue(p)
136 137

	// new peer, we will want to give them our full wantlist
138
	fullwantlist := bsmsg.New(true)
139
	for _, e := range pm.wl.Entries() {
140
		fullwantlist.AddEntry(e.Cid, e.Priority)
141 142 143
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
144 145

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
146
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
147
	return mq
148 149
}

150
func (pm *WantManager) stopPeerHandler(p peer.ID) {
151 152 153 154 155 156
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
157 158 159 160 161
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

162 163 164 165
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
166
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
167 168 169 170 171
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
172 173 174
	for {
		select {
		case <-mq.work: // there is work to be done
175
			mq.doWork(ctx)
176 177
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
178 179
		case <-ctx.Done():
			return
180 181 182 183
		}
	}
}

184
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
185
	if mq.sender == nil {
186
		err := mq.openSender(ctx)
Jeromy's avatar
Jeromy committed
187
		if err != nil {
188
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
Jeromy's avatar
Jeromy committed
189 190 191
			// TODO: cant connect, what now?
			return
		}
192 193 194 195 196 197
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
198
		mq.outlk.Unlock()
199 200
		return
	}
Jeromy's avatar
Jeromy committed
201 202
	mq.out = nil
	mq.outlk.Unlock()
203 204

	// send wantlist updates
205
	for { // try to send this message until we fail.
206
		err := mq.sender.SendMsg(ctx, wlm)
207 208 209 210
		if err == nil {
			return
		}

rht's avatar
rht committed
211
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
212 213
		mq.sender.Close()
		mq.sender = nil
214 215 216 217 218 219 220 221 222 223 224 225 226

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
Jeromy's avatar
Jeromy committed
227
			log.Errorf("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
259
	}
260 261 262

	mq.sender = nsender
	return nil
263 264
}

265
func (pm *WantManager) Connected(p peer.ID) {
266 267 268 269
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
270 271
}

272
func (pm *WantManager) Disconnected(p peer.ID) {
273 274 275 276
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
277 278 279
}

// TODO: use goprocess here once i trust it
280
func (pm *WantManager) Run() {
281
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
282
	defer tock.Stop()
283 284
	for {
		select {
285 286 287
		case entries := <-pm.incoming:

			// add changes to our wantlist
288
			var filtered []*bsmsg.Entry
289 290
			for _, e := range entries {
				if e.Cancel {
291
					if pm.wl.Remove(e.Cid) {
292
						pm.metricWantlist.Dec()
293 294
						filtered = append(filtered, e)
					}
295
				} else {
296
					if pm.wl.AddEntry(e.Entry) {
297
						pm.metricWantlist.Inc()
298 299
						filtered = append(filtered, e)
					}
300 301 302
				}
			}

303 304
			// broadcast those wantlist changes
			for _, p := range pm.peers {
305
				p.addMessage(filtered)
306 307
			}

308 309 310 311 312 313
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}
314

315 316 317 318 319 320 321
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
322
		case p := <-pm.connect:
323
			pm.startPeerHandler(p)
324 325
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
326 327 328 329 330 331
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
332
		case <-pm.ctx.Done():
333 334 335 336 337
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
338
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
339 340 341 342 343 344 345
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
346 347
}

Jeromy's avatar
Jeromy committed
348
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
349
	mq.outlk.Lock()
350
	defer func() {
Jeromy's avatar
Jeromy committed
351
		mq.outlk.Unlock()
352 353 354 355 356 357
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

358
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
359
	if mq.out == nil {
360
		mq.out = bsmsg.New(false)
361 362 363
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
364 365
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
366
	for _, e := range entries {
367
		if e.Cancel {
368
			mq.out.Cancel(e.Cid)
369
		} else {
370
			mq.out.AddEntry(e.Cid, e.Priority)
371 372 373
		}
	}
}