wantmanager.go 4.75 KB
Newer Older
1 2 3 4 5 6 7 8 9
package bitswap

import (
	"sync"

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
	bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
10
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
11 12 13 14
	peer "github.com/ipfs/go-ipfs/p2p/peer"
	u "github.com/ipfs/go-ipfs/util"
)

15
type WantManager struct {
16 17
	receiver bsnet.Receiver

18 19 20 21 22 23
	incoming chan []*bsmsg.Entry

	// notification channel for new peers connecting
	connect chan peer.ID

	// notification channel for peers disconnecting
24 25 26 27
	disconnect chan peer.ID

	peers map[peer.ID]*msgQueue

28 29
	wl *wantlist.Wantlist

30
	network bsnet.BitSwapNetwork
31 32

	ctx context.Context
33 34
}

35
func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
36 37
	return &WantManager{
		incoming:   make(chan []*bsmsg.Entry, 10),
38 39 40
		connect:    make(chan peer.ID, 10),
		disconnect: make(chan peer.ID, 10),
		peers:      make(map[peer.ID]*msgQueue),
41
		wl:         wantlist.New(),
42
		network:    network,
43
		ctx:        ctx,
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	}
}

// msgPair couples a bitswap message with a peer ID.
// NOTE(review): "to" is presumably the destination peer; nothing in the
// reviewed section of this file uses this type, so confirm against its users.
type msgPair struct {
	to  peer.ID
	msg bsmsg.BitSwapMessage
}

// cancellation pairs a peer ID with a block key.
// NOTE(review): presumably "who" is the peer a cancel for block "blk" relates
// to; nothing in the reviewed section of this file uses this type, so confirm
// against its users.
type cancellation struct {
	who peer.ID
	blk u.Key
}

type msgQueue struct {
	p peer.ID

Jeromy's avatar
Jeromy committed
60 61
	outlk sync.Mutex
	out   bsmsg.BitSwapMessage
62 63 64 65 66

	work chan struct{}
	done chan struct{}
}

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
// WantBlocks queues a want entry for every key in ks.
func (pm *WantManager) WantBlocks(ks []u.Key) {
	const cancel = false
	pm.addEntries(ks, cancel)
}

// CancelWants queues a cancel entry for every key in ks.
func (pm *WantManager) CancelWants(ks []u.Key) {
	const cancel = true
	pm.addEntries(ks, cancel)
}

func (pm *WantManager) addEntries(ks []u.Key, cancel bool) {
	var entries []*bsmsg.Entry
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
			Entry: wantlist.Entry{
				Key:      k,
				Priority: kMaxPriority - i,
			},
		})
	}
86 87 88 89
	select {
	case pm.incoming <- entries:
	case <-pm.ctx.Done():
	}
90 91 92
}

func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
93 94 95 96 97 98
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

	msg := bsmsg.New()
	msg.AddBlock(env.Block)
99
	msg.SetFull(false)
Jeromy's avatar
Jeromy committed
100
	err := pm.network.SendMessage(ctx, env.Peer, msg)
101 102 103 104 105
	if err != nil {
		log.Error(err)
	}
}

106
func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
107 108 109
	_, ok := pm.peers[p]
	if ok {
		// TODO: log an error?
Jeromy's avatar
Jeromy committed
110
		return nil
111 112
	}

113 114 115 116 117 118 119 120 121 122
	mq := newMsgQueue(p)

	// new peer, we will want to give them our full wantlist
	fullwantlist := bsmsg.New()
	for _, e := range pm.wl.Entries() {
		fullwantlist.AddEntry(e.Key, e.Priority)
	}
	fullwantlist.SetFull(true)
	mq.out = fullwantlist
	mq.work <- struct{}{}
123 124

	pm.peers[p] = mq
125
	go pm.runQueue(mq)
Jeromy's avatar
Jeromy committed
126
	return mq
127 128
}

129
func (pm *WantManager) stopPeerHandler(p peer.ID) {
130 131 132 133 134 135 136 137 138 139
	pq, ok := pm.peers[p]
	if !ok {
		// TODO: log error?
		return
	}

	close(pq.done)
	delete(pm.peers, p)
}

140
func (pm *WantManager) runQueue(mq *msgQueue) {
141 142 143 144
	for {
		select {
		case <-mq.work: // there is work to be done

145
			err := pm.network.ConnectTo(pm.ctx, mq.p)
146 147 148 149 150
			if err != nil {
				log.Error(err)
				// TODO: cant connect, what now?
			}

151
			// grab outgoing message
Jeromy's avatar
Jeromy committed
152 153
			mq.outlk.Lock()
			wlm := mq.out
Jeromy's avatar
Jeromy committed
154 155
			if wlm == nil || wlm.Empty() {
				mq.outlk.Unlock()
156 157
				continue
			}
Jeromy's avatar
Jeromy committed
158 159
			mq.out = nil
			mq.outlk.Unlock()
160 161

			// send wantlist updates
162
			err = pm.network.SendMessage(pm.ctx, mq.p, wlm)
163 164 165
			if err != nil {
				log.Error("bitswap send error: ", err)
				// TODO: what do we do if this fails?
166 167 168 169 170 171 172
			}
		case <-mq.done:
			return
		}
	}
}

173
func (pm *WantManager) Connected(p peer.ID) {
174 175 176
	pm.connect <- p
}

177
func (pm *WantManager) Disconnected(p peer.ID) {
178 179 180 181
	pm.disconnect <- p
}

// TODO: use goprocess here once i trust it
182
func (pm *WantManager) Run() {
183 184
	for {
		select {
185 186 187 188 189 190 191 192
		case entries := <-pm.incoming:

			// add changes to our wantlist
			for _, e := range entries {
				if e.Cancel {
					pm.wl.Remove(e.Key)
				} else {
					pm.wl.Add(e.Key, e.Priority)
193 194 195
				}
			}

196 197
			// broadcast those wantlist changes
			for _, p := range pm.peers {
Jeromy's avatar
Jeromy committed
198
				p.addMessage(entries)
199 200 201
			}

		case p := <-pm.connect:
202
			pm.startPeerHandler(p)
203 204
		case p := <-pm.disconnect:
			pm.stopPeerHandler(p)
205
		case <-pm.ctx.Done():
206 207 208 209 210
			return
		}
	}
}

211 212 213 214 215 216 217 218 219
// newMsgQueue returns an idle queue for peer p: no pending message, an
// unbuffered done channel and a work channel with room for a single signal.
func newMsgQueue(p peer.ID) *msgQueue {
	return &msgQueue{
		p:    p,
		work: make(chan struct{}, 1),
		done: make(chan struct{}),
	}
}

Jeromy's avatar
Jeromy committed
220
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
Jeromy's avatar
Jeromy committed
221
	mq.outlk.Lock()
222
	defer func() {
Jeromy's avatar
Jeromy committed
223
		mq.outlk.Unlock()
224 225 226 227 228 229
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

Jeromy's avatar
Jeromy committed
230 231
	// if we have no message held, or the one we are given is full
	// overwrite the one we are holding
Jeromy's avatar
Jeromy committed
232 233
	if mq.out == nil {
		mq.out = bsmsg.New()
234 235 236
	}

	// TODO: add a msg.Combine(...) method
Jeromy's avatar
Jeromy committed
237 238
	// otherwise, combine the one we are holding with the
	// one passed in
Jeromy's avatar
Jeromy committed
239
	for _, e := range entries {
240
		if e.Cancel {
Jeromy's avatar
Jeromy committed
241
			mq.out.Cancel(e.Key)
242
		} else {
Jeromy's avatar
Jeromy committed
243
			mq.out.AddEntry(e.Key, e.Priority)
244 245 246
		}
	}
}