wantmanager.go 7.73 KB
Newer Older
1 2 3
package bitswap

import (
4
	"context"
5
	"sync"
6
	"time"
7 8 9 10

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
Jeromy's avatar
Jeromy committed
12
	cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
13
	peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
14 15
)

16
type WantManager struct {
Jeromy's avatar
Jeromy committed
17 18
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
19 20 21
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
22

Jeromy's avatar
Jeromy committed
23
	// synchronized by Run loop, only touch inside there
24
	peers map[peer.ID]*msgQueue
25
	wl    *wantlist.ThreadSafe
26

27
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
28
	ctx     context.Context
Jeromy's avatar
Jeromy committed
29
	cancel  func()
30 31
}

32
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
33
	ctx, cancel := context.WithCancel(ctx)
34 35
	return &WantManager{
		incoming:   make(chan []*bsmsg.Entry, 10),
36 37
		connect:    make(chan peer.ID, 10),
		disconnect: make(chan peer.ID, 10),
38
		peerReqs:   make(chan chan []peer.ID),
39
		peers:      make(map[peer.ID]*msgQueue),
40
		wl:         wantlist.NewThreadSafe(),
41
		network:    network,
42
		ctx:        ctx,
Jeromy's avatar
Jeromy committed
43
		cancel:     cancel,
44 45 46 47 48 49 50 51 52 53
	}
}

// msgPair couples a bitswap message with the peer it is addressed to.
type msgPair struct {
	to  peer.ID               // destination peer
	msg bsmsg.BitSwapMessage // message for that peer
}

type cancellation struct {
	who peer.ID
54
	blk *cid.Cid
55 56 57 58 59
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
60 61 62
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
63

Jeromy's avatar
Jeromy committed
64 65
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
66 67
	refcnt int

68 69 70 71
	work chan struct{}
	done chan struct{}
}

72
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
Jeromy's avatar
Jeromy committed
73
	log.Infof("want blocks: %s", ks)
74
	pm.addEntries(ctx, ks, false)
75 76
}

77
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
78
	log.Infof("cancel wants: %s", ks)
79
	pm.addEntries(context.TODO(), ks, true)
80 81
}

82
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
83 84 85 86
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
87
			Entry: &wantlist.Entry{
88
				Cid:      k,
89
				Priority: kMaxPriority - i,
90
				RefCnt:   1,
91 92 93
			},
		})
	}
94 95 96
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
97
	case <-ctx.Done():
98
	}
99 100
}

101 102 103 104 105 106
// ConnectedPeers asks the Run loop for the IDs of all peers that currently
// have a message queue and returns them. It returns nil if the manager's
// context is cancelled before the Run loop answers, instead of blocking
// forever on a loop that has already exited.
func (pm *WantManager) ConnectedPeers() []peer.ID {
	// Buffered so the Run loop can always deliver its reply without blocking,
	// even if this caller has already returned because of shutdown.
	resp := make(chan []peer.ID, 1)

	select {
	case pm.peerReqs <- resp:
	case <-pm.ctx.Done():
		return nil
	}

	select {
	case peers := <-resp:
		return peers
	case <-pm.ctx.Done():
		return nil
	}
}

107
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
108 109 110 111
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

112
	msg := bsmsg.New(false)
113
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
114
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
115
	err := pm.network.SendMessage(ctx, env.Peer, msg)
116
	if err != nil {
rht's avatar
rht committed
117
		log.Infof("sendblock error: %s", err)
118 119 120
	}
}

121
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
122
	mq, ok := pm.peers[p]
123
	if ok {
Jeromy's avatar
Jeromy committed
124
		mq.refcnt++
Jeromy's avatar
Jeromy committed
125
		return nil
126 127
	}

Jeromy's avatar
Jeromy committed
128
	mq = pm.newMsgQueue(p)
129 130

	// new peer, we will want to give them our full wantlist
131
	fullwantlist := bsmsg.New(true)
132
	for _, e := range pm.wl.Entries() {
133
		fullwantlist.AddEntry(e.Cid, e.Priority)
134 135 136
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
137 138

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
139
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
140
	return mq
141 142
}

143
func (pm *WantManager) stopPeerHandler(p peer.ID) {
144 145 146 147 148 149
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
150 151 152 153 154
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

155 156 157 158
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
159
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
160 161 162 163 164
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
165 166 167
	for {
		select {
		case <-mq.work: // there is work to be done
168
			mq.doWork(ctx)
169 170
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
171 172
		case <-ctx.Done():
			return
173 174 175 176
		}
	}
}

177
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
178
	if mq.sender == nil {
179
		err := mq.openSender(ctx)
Jeromy's avatar
Jeromy committed
180
		if err != nil {
181
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
Jeromy's avatar
Jeromy committed
182 183 184
			// TODO: cant connect, what now?
			return
		}
185 186 187 188 189 190
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
191
		mq.outlk.Unlock()
192 193
		return
	}
Jeromy's avatar
Jeromy committed
194 195
	mq.out = nil
	mq.outlk.Unlock()
196 197

	// send wantlist updates
198
	for { // try to send this message until we fail.
199
		err := mq.sender.SendMsg(ctx, wlm)
200 201 202 203
		if err == nil {
			return
		}

rht's avatar
rht committed
204
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
205 206
		mq.sender.Close()
		mq.sender = nil
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propogating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
			log.Error("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If its not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
252
	}
253 254 255

	mq.sender = nsender
	return nil
256 257
}

258
func (pm *WantManager) Connected(p peer.ID) {
259 260 261 262
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
263 264
}

265
func (pm *WantManager) Disconnected(p peer.ID) {
266 267 268 269
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
270 271 272
}

// TODO: use goprocess here once i trust it
273
func (pm *WantManager) Run() {
274
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
275
	defer tock.Stop()
276 277
	for {
		select {
278 279 280
		case entries := <-pm.incoming:

			// add changes to our wantlist
281
			var filtered []*bsmsg.Entry
282 283
			for _, e := range entries {
				if e.Cancel {
284
					if pm.wl.Remove(e.Cid) {
285 286
						filtered = append(filtered, e)
					}
287
				} else {
288 289 290
					if pm.wl.AddEntry(e.Entry) {
						filtered = append(filtered, e)
					}
291 292 293
				}
			}

294 295
			// broadcast those wantlist changes
			for _, p := range pm.peers {
296
				p.addMessage(filtered)
297 298
			}

299 300 301 302 303 304
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}
305

306 307 308 309 310 311 312
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
313
		case p := <-pm.connect:
314
			pm.startPeerHandler(p)
315 316
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
317 318 319 320 321 322
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
323
		case <-pm.ctx.Done():
324 325 326 327 328
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
329
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
330 331 332 333 334 335 336
	return &msgQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
		network: wm.network,
		p:       p,
		refcnt:  1,
	}
337 338
}

Jeromy's avatar
Jeromy committed
339
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
340
	mq.outlk.Lock()
341
	defer func() {
Jeromy's avatar
Jeromy committed
342
		mq.outlk.Unlock()
343 344 345 346 347 348
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

349
	// if we have no message held allocate a new one
Jeromy's avatar
Jeromy committed
350
	if mq.out == nil {
351
		mq.out = bsmsg.New(false)
352 353 354
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
355 356
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
357
	for _, e := range entries {
358
		if e.Cancel {
359
			mq.out.Cancel(e.Cid)
360
		} else {
361
			mq.out.AddEntry(e.Cid, e.Priority)
362 363 364
		}
	}
}