// wantmanager.go — wantlist manager for the bitswap exchange.
package bitswap

import (
	"sync"
	"time"

	key "github.com/ipfs/go-ipfs/blocks/key"
	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/ipfs/go-ipfs/p2p/peer"

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)

16
type WantManager struct {
Jeromy's avatar
Jeromy committed
17 18 19 20
	// sync channels for Run loop
	incoming   chan []*bsmsg.Entry
	connect    chan peer.ID // notification channel for new peers connecting
	disconnect chan peer.ID // notification channel for peers disconnecting
21

Jeromy's avatar
Jeromy committed
22
	// synchronized by Run loop, only touch inside there
23
	peers map[peer.ID]*msgQueue
24
	wl    *wantlist.ThreadSafe
25

26
	network bsnet.BitSwapNetwork
Jeromy's avatar
Jeromy committed
27
	ctx     context.Context
28 29
}

30
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
31 32
	return &WantManager{
		incoming:   make(chan []*bsmsg.Entry, 10),
33 34 35
		connect:    make(chan peer.ID, 10),
		disconnect: make(chan peer.ID, 10),
		peers:      make(map[peer.ID]*msgQueue),
36
		wl:         wantlist.NewThreadSafe(),
37
		network:    network,
38
		ctx:        ctx,
39 40 41 42 43 44 45 46 47 48
	}
}

type msgPair struct {
	to  peer.ID
	msg bsmsg.BitSwapMessage
}

type cancellation struct {
	who peer.ID
49
	blk key.Key
50 51 52 53 54
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
55 56 57
	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
58 59 60 61 62

	work chan struct{}
	done chan struct{}
}

63
func (pm *WantManager) WantBlocks(ks []key.Key) {
Jeromy's avatar
Jeromy committed
64
	log.Infof("want blocks: %s", ks)
65 66 67
	pm.addEntries(ks, false)
}

68
func (pm *WantManager) CancelWants(ks []key.Key) {
69 70 71
	pm.addEntries(ks, true)
}

72
func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
73 74 75 76 77 78 79 80 81 82
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
			Entry: wantlist.Entry{
				Key:      k,
				Priority: kMaxPriority - i,
			},
		})
	}
83 84 85 86
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
	}
87 88 89
}

func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
90 91 92 93
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

94
	msg := bsmsg.New(false)
95
	msg.AddBlock(env.Block)
Jeromy's avatar
Jeromy committed
96
	log.Infof("Sending block %s to %s", env.Peer, env.Block)
Jeromy's avatar
Jeromy committed
97
	err := pm.network.SendMessage(ctx, env.Peer, msg)
98
	if err != nil {
Jeromy's avatar
Jeromy committed
99
		log.Noticef("sendblock error: %s", err)
100 101 102
	}
}

103
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
104 105 106
	_, ok := pm.peers[p]
	if ok {
		// TODO: log an error?
Jeromy's avatar
Jeromy committed
107
		return nil
108 109
	}

Jeromy's avatar
Jeromy committed
110
	mq := pm.newMsgQueue(p)
111 112

	// new peer, we will want to give them our full wantlist
113
	fullwantlist := bsmsg.New(true)
114 115 116 117 118
	for _, e := range pm.wl.Entries() {
		fullwantlist.AddEntry(e.Key, e.Priority)
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}
119 120

	pm.peers[p] = mq
Jeromy's avatar
Jeromy committed
121
	go mq.runQueue(pm.ctx)
Jeromy's avatar
Jeromy committed
122
	return mq
123 124
}

125
func (pm *WantManager) stopPeerHandler(p peer.ID) {
126 127 128 129 130 131 132 133 134 135
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

	close(pq.done)
	delete(pm.peers, p)
}

Jeromy's avatar
Jeromy committed
136
func (mq *msgQueue) runQueue(ctx context.Context) {
137 138 139
	for {
		select {
		case <-mq.work: // there is work to be done
140
			mq.doWork(ctx)
141 142 143 144 145 146
		case <-mq.done:
			return
		}
	}
}

147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
func (mq *msgQueue) doWork(ctx context.Context) {
	// allow a minute for connections
	// this includes looking them up in the dht
	// dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		log.Noticef("cant connect to peer %s: %s", mq.p, err)
		// TODO: cant connect, what now?
		return
	}

	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	mq.out = nil
	mq.outlk.Unlock()

	if wlm == nil || wlm.Empty() {
		return
	}

	sendctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	// send wantlist updates
	err = mq.network.SendMessage(sendctx, mq.p, wlm)
	if err != nil {
		log.Noticef("bitswap send error: %s", err)
		// TODO: what do we do if this fails?
		return
	}
}

183
func (pm *WantManager) Connected(p peer.ID) {
184 185 186
	pm.connect <- p
}

187
func (pm *WantManager) Disconnected(p peer.ID) {
188 189 190 191
	pm.disconnect <- p
}

// TODO: use goprocess here once i trust it
192
func (pm *WantManager) Run() {
193
	tock := time.NewTicker(rebroadcastDelay.Get())
Jeromy's avatar
Jeromy committed
194
	defer tock.Stop()
195 196
	for {
		select {
197 198 199 200 201 202 203 204
		case entries := <-pm.incoming:

			// add changes to our wantlist
			for _, e := range entries {
				if e.Cancel {
					pm.wl.Remove(e.Key)
				} else {
					pm.wl.Add(e.Key, e.Priority)
205 206 207
				}
			}

208 209
			// broadcast those wantlist changes
			for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
210
				p.addMessage(entries)
211 212
			}

213 214 215 216 217 218 219 220 221 222 223 224 225
		case <-tock.C:
			// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
			var es []*bsmsg.Entry
			for _, e := range pm.wl.Entries() {
				es = append(es, &bsmsg.Entry{Entry: e})
			}
			for _, p := range pm.peers {
				p.outlk.Lock()
				p.out = bsmsg.New(true)
				p.outlk.Unlock()

				p.addMessage(es)
			}
226
		case p := <-pm.connect:
227
			pm.startPeerHandler(p)
228 229
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
230
		case <-pm.ctx.Done():
231 232 233 234 235
			return
		}
	}
}

Jeromy's avatar
Jeromy committed
236
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
237 238 239
	mq := new(msgQueue)
	mq.done = make(chan struct{})
	mq.work = make(chan struct{}, 1)
Jeromy's avatar
Jeromy committed
240
	mq.network = wm.network
241 242 243 244 245
	mq.p = p

	return mq
}

Jeromy's avatar
Jeromy committed
246
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
247
	mq.outlk.Lock()
248
	defer func() {
Jeromy's avatar
Jeromy committed
249
		mq.outlk.Unlock()
250 251 252 253 254 255
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

Jeromy's avatar
Jeromy committed
256 257
	// if we have no message held, or the one we are given is full
	// overwrite the one we are holding
Jeromy's avatar
Jeromy committed
258
	if mq.out == nil {
259
		mq.out = bsmsg.New(false)
260 261 262
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
263 264
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
265
	for _, e := range entries {
266
		if e.Cancel {
Jeromy's avatar
Jeromy committed
267
			mq.out.Cancel(e.Key)
268
		} else {
Jeromy's avatar
Jeromy committed
269
			mq.out.AddEntry(e.Key, e.Priority)
270 271 272
		}
	}
}