wantmanager.go 6.6 KB
Newer Older
1 2 3 4
package bitswap

import (
	"sync"
5
	"time"
6

7
	key "github.com/ipfs/go-ipfs/blocks/key"
8 9 10
	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
11
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
12
	peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
13
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
14 15
)

16
type WantManager struct {
Jeromy's avatar
Jeromy committed
17 18
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
19 20 21
	connect    chan peer.ID        // notification channel for new peers connecting
	disconnect chan peer.ID        // notification channel for peers disconnecting
	peerReqs   chan chan []peer.ID // channel to request connected peers on
22

Jeromy's avatar
Jeromy committed
23
	// synchronized by Run loop, only touch inside there
24
	peers map[peer.ID]*msgQueue
25
	wl    *wantlist.ThreadSafe
26

27
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
28
	ctx     context.Context
29 30
}

31
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
32 33
	return &WantManager{
		incoming:   make(chan []*bsmsg.Entry, 10),
34 35
		connect:    make(chan peer.ID, 10),
		disconnect: make(chan peer.ID, 10),
36
		peerReqs:   make(chan chan []peer.ID),
37
		peers:      make(map[peer.ID]*msgQueue),
38
		wl:         wantlist.NewThreadSafe(),
39
		network:    network,
40
		ctx:        ctx,
41 42 43 44 45 46 47 48 49 50
	}
}

type msgPair struct {
	to  peer.ID
	msg bsmsg.BitSwapMessage
}

type cancellation struct {
	who peer.ID
51
	blk key.Key
52 53 54 55 56
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
57 58 59
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
60

Jeromy's avatar
Jeromy committed
61 62
	refcnt int

63 64 65 66
	work chan struct{}
	done chan struct{}
}

67
func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
Jeromy's avatar
Jeromy committed
68
	log.Infof("want blocks: %s", ks)
69
	pm.addEntries(ctx, ks, false)
70 71
}

72
func (pm *WantManager) CancelWants(ks []key.Key) {
73
	pm.addEntries(context.TODO(), ks, true)
74 75
}

76
func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) {
77 78 79 80 81 82 83
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
			Entry: wantlist.Entry{
				Key:      k,
				Priority: kMaxPriority - i,
84
				Ctx:      ctx,
85 86 87
			},
		})
	}
88 89 90 91
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
	}
92 93
}

94 95 96 97 98 99
func (pm *WantManager) ConnectedPeers() []peer.ID {
	resp := make(chan []peer.ID)
	pm.peerReqs <- resp
	return <-resp
}

100
func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
101 102 103 104
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

105
	msg := bsmsg.New(false)
106
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
107
	log.Infof("Sending block %s to %s", env.Peer, env.Block)
Jeromy's avatar
Jeromy committed
108
	err := pm.network.SendMessage(ctx, env.Peer, msg)
109
	if err != nil {
rht's avatar
rht committed
110
		log.Infof("sendblock error: %s", err)
111 112 113
	}
}

114
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
Jeromy's avatar
Jeromy committed
115
	mq, ok := pm.peers[p]
116
	if ok {
Jeromy's avatar
Jeromy committed
117
		mq.refcnt++
Jeromy's avatar
Jeromy committed
118
		return nil
119 120
	}

Jeromy's avatar
Jeromy committed
121
	mq = pm.newMsgQueue(p)
122 123

	// new peer, we will want to give them our full wantlist
124
	fullwantlist := bsmsg.New(true)
125 126 127 128 129
	for _, e := range pm.wl.Entries() {
		fullwantlist.AddEntry(e.Key, e.Priority)
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
130 131

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
132
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
133
	return mq
134 135
}

136
func (pm *WantManager) stopPeerHandler(p peer.ID) {
137 138 139 140 141 142
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

Jeromy's avatar
Jeromy committed
143 144 145 146 147
	pq.refcnt--
	if pq.refcnt > 0 {
		return
	}

148 149 150 151
	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
152
func (mq *msgQueue) runQueue(ctx context.Context) {
153 154 155
	for {
		select {
		case <-mq.work: // there is work to be done
156
			mq.doWork(ctx)
157 158
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
159 160
		case <-ctx.Done():
			return
161 162 163 164
		}
	}
}

165
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
166
	// allow ten minutes for connections
167 168
	// this includes looking them up in the dht
	// dialing them, and handshaking
Jeromy's avatar
Jeromy committed
169
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
170 171 172 173
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
rht's avatar
rht committed
174
		log.Infof("cant connect to peer %s: %s", mq.p, err)
175 176 177 178 179 180 181 182
		// TODO: cant connect, what now?
		return
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
183
		mq.outlk.Unlock()
184 185
		return
	}
Jeromy's avatar
Jeromy committed
186 187
	mq.out = nil
	mq.outlk.Unlock()
188

Jeromy's avatar
Jeromy committed
189
	sendctx, cancel := context.WithTimeout(ctx, time.Minute*5)
190 191 192 193 194
	defer cancel()

	// send wantlist updates
	err = mq.network.SendMessage(sendctx, mq.p, wlm)
	if err != nil {
rht's avatar
rht committed
195
		log.Infof("bitswap send error: %s", err)
196 197 198 199 200
		// TODO: what do we do if this fails?
		return
	}
}

201
func (pm *WantManager) Connected(p peer.ID) {
202 203 204 205
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
206 207
}

208
func (pm *WantManager) Disconnected(p peer.ID) {
209 210 211 212
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
213 214 215
}

// TODO: use goprocess here once i trust it
216
func (pm *WantManager) Run() {
217
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
218
	defer tock.Stop()
219 220
	for {
		select {
221 222 223 224 225 226 227
		case entries := <-pm.incoming:

			// add changes to our wantlist
			for _, e := range entries {
				if e.Cancel {
					pm.wl.Remove(e.Key)
				} else {
228
					pm.wl.AddEntry(e.Entry)
229 230 231
				}
			}

232 233
			// broadcast those wantlist changes
			for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
234
				p.addMessage(entries)
235 236
			}

237 238 239 240
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
241 242 243 244 245 246 247 248
				select {
				case <-e.Ctx.Done():
					// entry has been cancelled
					// simply continue, the entry will be removed from the
					// wantlist soon enough
					continue
				default:
				}
249 250 251 252 253 254 255 256 257
				es = append(es, &bsmsg.Entry{Entry: e})
			}
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
258
		case p := <-pm.connect:
259
			pm.startPeerHandler(p)
260 261
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
262 263 264 265 266 267
		case req := <-pm.peerReqs:
			var peers []peer.ID
			for p := range pm.peers {
				peers = append(peers, p)
			}
			req <- peers
268
		case <-pm.ctx.Done():
269 270 271 272 273
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
274
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
275 276 277
	mq := new(msgQueue)
	mq.done = make(chan struct{})
	mq.work = make(chan struct{}, 1)
Jeromy's avatar
Jeromy committed
278
	mq.network = wm.network
279
	mq.p = p
Jeromy's avatar
Jeromy committed
280
	mq.refcnt = 1
281 282 283 284

	return mq
}

Jeromy's avatar
Jeromy committed
285
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
286
	mq.outlk.Lock()
287
	defer func() {
Jeromy's avatar
Jeromy committed
288
		mq.outlk.Unlock()
289 290 291 292 293 294
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

Jeromy's avatar
Jeromy committed
295 296
	// if we have no message held, or the one we are given is full
	// overwrite the one we are holding
Jeromy's avatar
Jeromy committed
297
	if mq.out == nil {
298
		mq.out = bsmsg.New(false)
299 300 301
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
302 303
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
304
	for _, e := range entries {
305
		if e.Cancel {
Jeromy's avatar
Jeromy committed
306
			mq.out.Cancel(e.Key)
307
		} else {
Jeromy's avatar
Jeromy committed
308
			mq.out.AddEntry(e.Key, e.Priority)
309 310 311
		}
	}
}