wantmanager.go 6.98 KB
Newer Older
1 2 3 4
package bitswap

import (
	"sync"
5
	"time"
6 7 8 9

	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
10
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
Jeromy's avatar
Jeromy committed
11
	peer "gx/ipfs/QmWtbQU15LaB5B1JC2F7TV9P4K88vD3PpA4AJrwfCjhML8/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
12
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
George Antoniadis's avatar
George Antoniadis committed
13
	key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
14 15
)

16
type WantManager struct {
Jeromy's avatar
Jeromy committed
17 18
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
19 20 21
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
22

Jeromy's avatar
Jeromy committed
23
	// synchronized by Run loop, only touch inside there
24
	peers map[peer.ID]*msgQueue
25
	wl    *wantlist.ThreadSafe
26

27
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
28
	ctx     context.Context
Jeromy's avatar
Jeromy committed
29
	cancel  func()
30 31
}

32
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
Jeromy's avatar
Jeromy committed
33
	ctx, cancel := context.WithCancel(ctx)
34 35
	return &WantManager{
		incoming:   make(chan []*bsmsg.Entry, 10),
36 37
		connect:    make(chan peer.ID, 10),
		disconnect: make(chan peer.ID, 10),
38
		peerReqs:   make(chan chan []peer.ID),
39
		peers:      make(map[peer.ID]*msgQueue),
40
		wl:         wantlist.NewThreadSafe(),
41
		network:    network,
42
		ctx:        ctx,
Jeromy's avatar
Jeromy committed
43
		cancel:     cancel,
44 45 46 47 48 49 50 51 52 53
	}
}

// msgPair pairs a bitswap message with a peer ID.
// NOTE(review): not referenced anywhere in the visible part of this file;
// confirm it is still used elsewhere in the package.
type msgPair struct {
	// to is presumably the peer the message is addressed to.
	to  peer.ID
	// msg is the bitswap message itself.
	msg bsmsg.BitSwapMessage
}

type cancellation struct {
	who peer.ID
54
	blk key.Key
55 56 57 58 59
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
60 61 62
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
63

Jeromy's avatar
Jeromy committed
64 65
	sender bsnet.MessageSender

Jeromy's avatar
Jeromy committed
66 67
	refcnt int

68 69 70 71
	work chan struct{}
	done chan struct{}
}

72
func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
Jeromy's avatar
Jeromy committed
73
	log.Infof("want blocks: %s", ks)
74
	pm.addEntries(ctx, ks, false)
75 76
}

77
func (pm *WantManager) CancelWants(ks []key.Key) {
78
	pm.addEntries(context.TODO(), ks, true)
79 80
}

81
func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) {
82 83 84 85 86 87 88
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
			Entry: wantlist.Entry{
				Key:      k,
				Priority: kMaxPriority - i,
89
				Ctx:      ctx,
90 91 92
			},
		})
	}
93 94 95 96
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
	}
97 98
}

99 100 101 102 103 104
// ConnectedPeers asks the Run loop for the IDs of the peers it currently
// tracks and returns them. It returns nil if the manager's context is done
// before the Run loop accepts the request, instead of blocking forever on a
// loop that has already exited.
func (pm *WantManager) ConnectedPeers() []peer.ID {
	// One slot of buffer lets the Run loop post its reply without waiting
	// for this goroutine to be scheduled.
	resp := make(chan []peer.ID, 1)

	select {
	case pm.peerReqs <- resp:
	case <-pm.ctx.Done():
		return nil
	}

	// The Run loop replies in the same select case that received the
	// request, so once the request is accepted a reply always follows.
	return <-resp
}

105
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
106 107 108 109
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

110
	msg := bsmsg.New(false)
111
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
112
	log.Infof("Sending block %s to %s", env.Block, env.Peer)
Jeromy's avatar
Jeromy committed
113
	err := pm.network.SendMessage(ctx, env.Peer, msg)
114
	if err != nil {
rht's avatar
rht committed
115
		log.Infof("sendblock error: %s", err)
116 117 118
	}
}

119
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
120
	mq, ok := pm.peers[p]
121
	if ok {
Jeromy's avatar
Jeromy committed
122
		mq.refcnt++
Jeromy's avatar
Jeromy committed
123
		return nil
124 125
	}

Jeromy's avatar
Jeromy committed
126
	mq = pm.newMsgQueue(p)
127 128

	// new peer, we will want to give them our full wantlist
129
	fullwantlist := bsmsg.New(true)
130 131 132 133 134
	for _, e := range pm.wl.Entries() {
		fullwantlist.AddEntry(e.Key, e.Priority)
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
135 136

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
137
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
138
	return mq
139 140
}

141
func (pm *WantManager) stopPeerHandler(p peer.ID) {
142 143 144 145 146 147
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
148 149 150 151 152
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

153 154 155 156
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
157
func (mq *msgQueue) runQueue(ctx context.Context) {
Jeromy's avatar
Jeromy committed
158 159 160 161 162
	defer func() {
		if mq.sender != nil {
			mq.sender.Close()
		}
	}()
163 164 165
	for {
		select {
		case <-mq.work: // there is work to be done
166
			mq.doWork(ctx)
167 168
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
169 170
		case <-ctx.Done():
			return
171 172 173 174
		}
	}
}

175
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
176
	// allow ten minutes for connections
177 178
	// this includes looking them up in the dht
	// dialing them, and handshaking
Jeromy's avatar
Jeromy committed
179 180 181 182 183 184 185 186 187 188
	if mq.sender == nil {
		conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
		defer cancel()

		err := mq.network.ConnectTo(conctx, mq.p)
		if err != nil {
			log.Infof("cant connect to peer %s: %s", mq.p, err)
			// TODO: cant connect, what now?
			return
		}
189

Jeromy's avatar
Jeromy committed
190 191 192 193 194 195 196 197
		nsender, err := mq.network.NewMessageSender(ctx, mq.p)
		if err != nil {
			log.Infof("cant open new stream to peer %s: %s", mq.p, err)
			// TODO: cant open stream, what now?
			return
		}

		mq.sender = nsender
198 199 200 201 202 203
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
204
		mq.outlk.Unlock()
205 206
		return
	}
Jeromy's avatar
Jeromy committed
207 208
	mq.out = nil
	mq.outlk.Unlock()
209 210

	// send wantlist updates
Jeromy's avatar
Jeromy committed
211
	err := mq.sender.SendMsg(wlm)
212
	if err != nil {
rht's avatar
rht committed
213
		log.Infof("bitswap send error: %s", err)
Jeromy's avatar
Jeromy committed
214 215
		mq.sender.Close()
		mq.sender = nil
216 217 218 219 220
		// TODO: what do we do if this fails?
		return
	}
}

221
func (pm *WantManager) Connected(p peer.ID) {
222 223 224 225
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
226 227
}

228
func (pm *WantManager) Disconnected(p peer.ID) {
229 230 231 232
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
233 234 235
}

// TODO: use goprocess here once i trust it
236
func (pm *WantManager) Run() {
237
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
238
	defer tock.Stop()
239 240
	for {
		select {
241 242 243 244 245 246 247
		case entries := <-pm.incoming:

			// add changes to our wantlist
			for _, e := range entries {
				if e.Cancel {
					pm.wl.Remove(e.Key)
				} else {
248
					pm.wl.AddEntry(e.Entry)
249 250 251
				}
			}

252 253
			// broadcast those wantlist changes
			for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
254
				p.addMessage(entries)
255 256
			}

257 258 259 260
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
261 262 263 264 265 266 267 268
				select {
				case <-e.Ctx.Done():
					// entry has been cancelled
					// simply continue, the entry will be removed from the
					// wantlist soon enough
					continue
				default:
				}
269 270 271 272 273 274 275 276 277
				es = append(es, &bsmsg.Entry{Entry: e})
			}
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
278
		case p := <-pm.connect:
279
			pm.startPeerHandler(p)
280 281
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
282 283 284 285 286 287
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
288
		case <-pm.ctx.Done():
289 290 291 292 293
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
294
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
295 296 297
	mq := new(msgQueue)
	mq.done = make(chan struct{})
	mq.work = make(chan struct{}, 1)
Jeromy's avatar
Jeromy committed
298
	mq.network = wm.network
299
	mq.p = p
Jeromy's avatar
Jeromy committed
300
	mq.refcnt = 1
301 302 303 304

	return mq
}

Jeromy's avatar
Jeromy committed
305
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
306
	mq.outlk.Lock()
307
	defer func() {
Jeromy's avatar
Jeromy committed
308
		mq.outlk.Unlock()
309 310 311 312 313 314
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

Jeromy's avatar
Jeromy committed
315 316
	// if we have no message held, or the one we are given is full
	// overwrite the one we are holding
Jeromy's avatar
Jeromy committed
317
	if mq.out == nil {
318
		mq.out = bsmsg.New(false)
319 320 321
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
322 323
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
324
	for _, e := range entries {
325
		if e.Cancel {
Jeromy's avatar
Jeromy committed
326
			mq.out.Cancel(e.Key)
327
		} else {
Jeromy's avatar
Jeromy committed
328
			mq.out.AddEntry(e.Key, e.Priority)
329 330 331
		}
	}
}