wantmanager.go 5.86 KB
Newer Older
1 2 3 4
package bitswap

import (
	"sync"
5
	"time"
6 7

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
8
	key "github.com/ipfs/go-ipfs/blocks/key"
9 10 11
	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
12
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
13 14 15
	peer "github.com/ipfs/go-ipfs/p2p/peer"
)

16
type WantManager struct {
Jeromy's avatar
Jeromy committed
17 18 19 20
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
	connect    chan peer.ID // notification channel for new peers connecting
	disconnect chan peer.ID // notification channel for peers disconnecting
21

Jeromy's avatar
Jeromy committed
22
	// synchronized by Run loop, only touch inside there
23
	peers map[peer.ID]*msgQueue
24
	wl    *wantlist.ThreadSafe
25

26
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
27
	ctx     context.Context
28 29
}

30
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
31 32
	return &WantManager{
		incoming:   make(chan []*bsmsg.Entry, 10),
33 34 35
		connect:    make(chan peer.ID, 10),
		disconnect: make(chan peer.ID, 10),
		peers:      make(map[peer.ID]*msgQueue),
36
		wl:         wantlist.NewThreadSafe(),
37
		network:    network,
38
		ctx:        ctx,
39 40 41 42 43 44 45 46 47 48
	}
}

// msgPair couples a bitswap message with the peer it is addressed to.
// NOTE(review): nothing in the visible part of this file references this
// type — confirm it is still used elsewhere in the package.
type msgPair struct {
	// to is the destination peer.
	to  peer.ID
	// msg is the message for that peer.
	msg bsmsg.BitSwapMessage
}

type cancellation struct {
	who peer.ID
49
	blk key.Key
50 51 52 53 54
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
55 56 57
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
58 59 60 61 62

	work chan struct{}
	done chan struct{}
}

63
func (pm *WantManager) WantBlocks(ks []key.Key) {
Jeromy's avatar
Jeromy committed
64
	log.Infof("want blocks: %s", ks)
65 66 67
	pm.addEntries(ks, false)
}

68
func (pm *WantManager) CancelWants(ks []key.Key) {
69 70 71
	pm.addEntries(ks, true)
}

72
func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
73 74 75 76 77 78 79 80 81 82
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
			Entry: wantlist.Entry{
				Key:      k,
				Priority: kMaxPriority - i,
			},
		})
	}
83 84 85 86
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
	}
87 88 89
}

func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
90 91 92 93
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

94
	msg := bsmsg.New(false)
95
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
96
	log.Infof("Sending block %s to %s", env.Peer, env.Block)
Jeromy's avatar
Jeromy committed
97
	err := pm.network.SendMessage(ctx, env.Peer, msg)
98
	if err != nil {
rht's avatar
rht committed
99
		log.Infof("sendblock error: %s", err)
100 101 102
	}
}

103
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
104 105 106
	_, ok := pm.peers[p]
	if ok {
		// TODO: log an error?
Jeromy's avatar
Jeromy committed
107
		return nil
108 109
	}

Jeromy's avatar
Jeromy committed
110
	mq := pm.newMsgQueue(p)
111 112

	// new peer, we will want to give them our full wantlist
113
	fullwantlist := bsmsg.New(true)
114 115 116 117 118
	for _, e := range pm.wl.Entries() {
		fullwantlist.AddEntry(e.Key, e.Priority)
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
119 120

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
121
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
122
	return mq
123 124
}

125
func (pm *WantManager) stopPeerHandler(p peer.ID) {
126 127 128 129 130 131 132 133 134 135
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
136
func (mq *msgQueue) runQueue(ctx context.Context) {
137 138 139
	for {
		select {
		case <-mq.work: // there is work to be done
140
			mq.doWork(ctx)
141 142
		case <-mq.done:
			return
Jeromy's avatar
Jeromy committed
143 144
		case <-ctx.Done():
			return
145 146 147 148
		}
	}
}

149
func (mq *msgQueue) doWork(ctx context.Context) {
Jeromy's avatar
Jeromy committed
150
	// allow ten minutes for connections
151 152
	// this includes looking them up in the dht
	// dialing them, and handshaking
Jeromy's avatar
Jeromy committed
153
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
154 155 156 157
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
rht's avatar
rht committed
158
		log.Infof("cant connect to peer %s: %s", mq.p, err)
159 160 161 162 163 164 165 166
		// TODO: cant connect, what now?
		return
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
Jeromy's avatar
Jeromy committed
167
		mq.outlk.Unlock()
168 169
		return
	}
Jeromy's avatar
Jeromy committed
170 171
	mq.out = nil
	mq.outlk.Unlock()
172

Jeromy's avatar
Jeromy committed
173
	sendctx, cancel := context.WithTimeout(ctx, time.Minute*5)
174 175 176 177 178
	defer cancel()

	// send wantlist updates
	err = mq.network.SendMessage(sendctx, mq.p, wlm)
	if err != nil {
rht's avatar
rht committed
179
		log.Infof("bitswap send error: %s", err)
180 181 182 183 184
		// TODO: what do we do if this fails?
		return
	}
}

185
func (pm *WantManager) Connected(p peer.ID) {
186 187 188 189
	select {
	case pm.connect <- p:
	case <-pm.ctx.Done():
	}
190 191
}

192
func (pm *WantManager) Disconnected(p peer.ID) {
193 194 195 196
	select {
	case pm.disconnect <- p:
	case <-pm.ctx.Done():
	}
197 198 199
}

// TODO: use goprocess here once i trust it
200
func (pm *WantManager) Run() {
201
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
202
	defer tock.Stop()
203 204
	for {
		select {
205 206 207 208 209 210 211 212
		case entries := <-pm.incoming:

			// add changes to our wantlist
			for _, e := range entries {
				if e.Cancel {
					pm.wl.Remove(e.Key)
				} else {
					pm.wl.Add(e.Key, e.Priority)
213 214 215
				}
			}

216 217
			// broadcast those wantlist changes
			for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
218
				p.addMessage(entries)
219 220
			}

221 222 223 224 225 226 227 228 229 230 231 232 233
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
234
		case p := <-pm.connect:
235
			pm.startPeerHandler(p)
236 237
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
238
		case <-pm.ctx.Done():
239 240 241 242 243
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
244
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
245 246 247
	mq := new(msgQueue)
	mq.done = make(chan struct{})
	mq.work = make(chan struct{}, 1)
Jeromy's avatar
Jeromy committed
248
	mq.network = wm.network
249 250 251 252 253
	mq.p = p

	return mq
}

Jeromy's avatar
Jeromy committed
254
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
255
	mq.outlk.Lock()
256
	defer func() {
Jeromy's avatar
Jeromy committed
257
		mq.outlk.Unlock()
258 259 260 261 262 263
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

Jeromy's avatar
Jeromy committed
264 265
	// if we have no message held, or the one we are given is full
	// overwrite the one we are holding
Jeromy's avatar
Jeromy committed
266
	if mq.out == nil {
267
		mq.out = bsmsg.New(false)
268 269 270
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
271 272
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
273
	for _, e := range entries {
274
		if e.Cancel {
Jeromy's avatar
Jeromy committed
275
			mq.out.Cancel(e.Key)
276
		} else {
Jeromy's avatar
Jeromy committed
277
			mq.out.AddEntry(e.Key, e.Priority)
278 279 280
		}
	}
}