package bitswap

import (
	"context"
	"sync"
	"time"

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"

	cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
	peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)

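// WantManager keeps the local wantlist and relays wantlist changes to every
// connected peer through a per-peer message queue. Its mutable state (the
// peer map and the wantlist) is owned by the Run loop.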
type WantManager struct {
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on

	// synchronized by Run loop, only touch inside there
	peers map[peer.ID]*msgQueue
	wl    *wantlist.ThreadSafe

	network bsnet.BitSwapNetwork
	ctx     context.Context
	cancel  func()
}

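// NewWantManager returns a WantManager bound to the given network. It must be
// started with Run (typically in its own goroutine) before it will process
// want or cancel requests.
//
// A minimal usage sketch (the network and keys values here are illustrative):
//
//	wm := NewWantManager(ctx, network)
//	go wm.Run()
//	wm.WantBlocks(ctx, keys)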
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
	ctx, cancel := context.WithCancel(ctx)
	return &WantManager{
		incoming:   make(chan []*bsmsg.Entry, 10),
		connect:    make(chan peer.ID, 10),
		disconnect: make(chan peer.ID, 10),
		peerReqs:   make(chan chan []peer.ID),
		peers:      make(map[peer.ID]*msgQueue),
		wl:         wantlist.NewThreadSafe(),
		network:    network,
		ctx:        ctx,
		cancel:     cancel,
	}
}

type msgPair struct {
	to  peer.ID
	msg bsmsg.BitSwapMessage
}

type cancellation struct {
	who peer.ID
	blk *cid.Cid
}

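// msgQueue buffers outgoing wantlist updates for a single peer and sends them
// from its own runQueue goroutine.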
type msgQueue struct {
	p peer.ID

	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork

	sender bsnet.MessageSender

	refcnt int

	work chan struct{}
	done chan struct{}
}

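// WantBlocks adds the given cids to the wantlist and notifies all connected
// peers of the change.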
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) {
	log.Infof("want blocks: %s", ks)
	pm.addEntries(ctx, ks, false)
}

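// CancelWants removes the given cids from the wantlist and sends cancel
// notifications to all connected peers.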
func (pm *WantManager) CancelWants(ks []*cid.Cid) {
	log.Infof("cancel wants: %s", ks)
	pm.addEntries(context.TODO(), ks, true)
}

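// addEntries converts the given cids into wantlist entries (earlier keys get
// higher priority) and hands them to the Run loop.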
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) {
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
			Entry: &wantlist.Entry{
				Cid:      k,
				Priority: kMaxPriority - i,
				RefCnt:   1,
			},
		})
	}
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
	case <-ctx.Done():
	}
}

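// ConnectedPeers asks the Run loop for the set of peers that currently have a
// message queue.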
func (pm *WantManager) ConnectedPeers() []peer.ID {
	resp := make(chan []peer.ID)
	pm.peerReqs <- resp
	return <-resp
}

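// SendBlock sends the block in the given envelope to the envelope's target
// peer, marking the envelope as sent when finished.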
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

	msg := bsmsg.New(false)
	msg.AddBlock(env.Block)
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
	err := pm.network.SendMessage(ctx, env.Peer, msg)
	if err != nil {
		log.Infof("sendblock error: %s", err)
	}
}

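// startPeerHandler ensures the given peer has a message queue: an existing
// queue gains a reference, a new queue is seeded with our full wantlist and
// started.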
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
	mq, ok := pm.peers[p]
	if ok {
		mq.refcnt++
		return nil
	}

	mq = pm.newMsgQueue(p)

	// new peer, we will want to give them our full wantlist
	fullwantlist := bsmsg.New(true)
	for _, e := range pm.wl.Entries() {
		fullwantlist.AddEntry(e.Cid, e.Priority)
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}

	pm.peers[p] = mq
	go mq.runQueue(pm.ctx)
	return mq
}

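// stopPeerHandler releases one reference to the peer's message queue and
// tears the queue down once no references remain.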
func (pm *WantManager) stopPeerHandler(p peer.ID) {
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

	close(pq.done)
	delete(pm.peers, p)
}

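// runQueue is the per-peer send loop: it flushes queued messages until the
// queue is closed or the context is cancelled.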
func (mq *msgQueue) runQueue(ctx context.Context) {
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
	for {
		select {
		case <-mq.work: // there is work to be done
			mq.doWork(ctx)
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		}
	}
}

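// doWork connects to the peer and opens a message sender if needed, then
// sends whatever wantlist message is currently queued.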
func (mq *msgQueue) doWork(ctx context.Context) {
	// allow ten minutes for connections; this includes looking the peer
	// up in the DHT, dialing it, and handshaking
	if mq.sender == nil {
		conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
		defer cancel()

		err := mq.network.ConnectTo(conctx, mq.p)
		if err != nil {
			log.Infof("cant connect to peer %s: %s", mq.p, err)
			// TODO: can't connect, what now?
			return
		}

		nsender, err := mq.network.NewMessageSender(ctx, mq.p)
		if err != nil {
			log.Infof("cant open new stream to peer %s: %s", mq.p, err)
			// TODO: can't open stream, what now?
			return
		}

		mq.sender = nsender
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
		mq.outlk.Unlock()
		return
	}
	mq.out = nil
	mq.outlk.Unlock()

	// send wantlist updates
	err := mq.sender.SendMsg(wlm)
	if err != nil {
		log.Infof("bitswap send error: %s", err)
		mq.sender.Close()
		mq.sender = nil
		// TODO: what do we do if this fails?
		return
	}
}

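// Connected notifies the Run loop that a new peer has connected.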
func (pm *WantManager) Connected(p peer.ID) {
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
}

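// Disconnected notifies the Run loop that a peer has disconnected.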
func (pm *WantManager) Disconnected(p peer.ID) {
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
}

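// Run is the manager's event loop: it applies incoming wantlist changes,
// broadcasts them to peers, periodically rebroadcasts the full wantlist, and
// handles peer connect/disconnect and peer-list requests.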
// TODO: use goprocess here once I trust it
func (pm *WantManager) Run() {
	tock := time.NewTicker(rebroadcastDelay.Get())
	defer tock.Stop()
	for {
		select {
		case entries := <-pm.incoming:

			// add changes to our wantlist
			var filtered []*bsmsg.Entry
			for _, e := range entries {
				if e.Cancel {
					if pm.wl.Remove(e.Cid) {
						filtered = append(filtered, e)
					}
				} else {
					if pm.wl.AddEntry(e.Entry) {
						filtered = append(filtered, e)
					}
				}
			}

			// broadcast those wantlist changes
			for _, p := range pm.peers {
				p.addMessage(filtered)
			}

		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDN'T BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}

			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
		case p := <-pm.connect:
			pm.startPeerHandler(p)
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
		case <-pm.ctx.Done():
			return
		}
	}
}

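// newMsgQueue allocates a message queue for the given peer with an initial
// reference count of one.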
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
	mq := new(msgQueue)
	mq.done = make(chan struct{})
	mq.work = make(chan struct{}, 1)
	mq.network = wm.network
	mq.p = p
	mq.refcnt = 1

	return mq
}

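// addMessage merges the given entries into the peer's pending outgoing
// message and signals runQueue that there is work to do.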
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
	mq.outlk.Lock()
	defer func() {
		mq.outlk.Unlock()
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

	// if we have no message held, create a new one to merge the entries into
	if mq.out == nil {
		mq.out = bsmsg.New(false)
	}

	// TODO: add a msg.Combine(...) method
	// otherwise, combine the one we are holding with the
	// one passed in
	for _, e := range entries {
		if e.Cancel {
			mq.out.Cancel(e.Cid)
		} else {
			mq.out.AddEntry(e.Cid, e.Priority)
		}
	}
}