// Package wantmanager tracks the set of blocks a node currently wants and
// forwards changes to that set to a PeerHandler.
package wantmanager

import (
	"context"
	"math"

	bsmsg "github.com/ipfs/go-bitswap/message"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"

	cid "github.com/ipfs/go-cid"
	metrics "github.com/ipfs/go-metrics-interface"
	peer "github.com/libp2p/go-libp2p-peer"
)

// log is the package-level logger, created with the name "bitswap".
var log = logging.Logger("bitswap")

const (
	// maxPriority is the max priority as defined by the bitswap protocol
	maxPriority = math.MaxInt32
)

23
// PeerHandler sends changes out to the network as they get added to the wantlist
24
// managed by the WantManager.
25 26
type PeerHandler interface {
	Disconnected(p peer.ID)
27
	Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist)
28
	SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
29 30
}

31 32
type wantMessage interface {
	handle(wm *WantManager)
33 34 35 36
}

// WantManager manages a global want list. It tracks two seperate want lists -
// one for all wants, and one for wants that are specifically broadcast to the
37
// internet.
38
type WantManager struct {
39 40 41
	// channel requests to the run loop
	// to get predictable behavior while running this in a go routine
	// having only one channel is neccesary, so requests are processed serially
42
	wantMessages chan wantMessage
43 44

	// synchronized by Run loop, only touch inside there
45 46
	wl   *wantlist.SessionTrackedWantlist
	bcwl *wantlist.SessionTrackedWantlist
47

48 49
	ctx    context.Context
	cancel func()
50

51
	peerHandler   PeerHandler
52 53 54
	wantlistGauge metrics.Gauge
}

55
// New initializes a new WantManager for a given context.
56
func New(ctx context.Context) *WantManager {
57 58 59 60
	ctx, cancel := context.WithCancel(ctx)
	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
		"Number of items in wantlist.").Gauge()
	return &WantManager{
61
		wantMessages:  make(chan wantMessage, 10),
62 63
		wl:            wantlist.NewSessionTrackedWantlist(),
		bcwl:          wantlist.NewSessionTrackedWantlist(),
64 65 66 67 68 69
		ctx:           ctx,
		cancel:        cancel,
		wantlistGauge: wantlistGauge,
	}
}

70
// SetDelegate specifies who will send want changes out to the internet.
71 72
func (wm *WantManager) SetDelegate(peerHandler PeerHandler) {
	wm.peerHandler = peerHandler
73 74
}

75
// WantBlocks adds the given cids to the wantlist, tracked by the given session.
76 77 78 79 80
func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
	log.Infof("want blocks: %s", ks)
	wm.addEntries(ctx, ks, peers, false, ses)
}

81
// CancelWants removes the given cids from the wantlist, tracked by the given session.
82 83 84 85
func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
	wm.addEntries(context.Background(), ks, peers, true, ses)
}

86
// IsWanted returns whether a CID is currently wanted.
87
func (wm *WantManager) IsWanted(c cid.Cid) bool {
88 89 90 91 92 93 94 95 96 97 98 99
	resp := make(chan bool, 1)
	select {
	case wm.wantMessages <- &isWantedMessage{c, resp}:
	case <-wm.ctx.Done():
		return false
	}
	select {
	case wanted := <-resp:
		return wanted
	case <-wm.ctx.Done():
		return false
	}
100 101
}

102
// CurrentWants returns the list of current wants.
103
func (wm *WantManager) CurrentWants() []*wantlist.Entry {
104 105 106 107 108 109 110 111 112 113 114 115
	resp := make(chan []*wantlist.Entry, 1)
	select {
	case wm.wantMessages <- &currentWantsMessage{resp}:
	case <-wm.ctx.Done():
		return nil
	}
	select {
	case wantlist := <-resp:
		return wantlist
	case <-wm.ctx.Done():
		return nil
	}
116 117
}

118
// CurrentBroadcastWants returns the current list of wants that are broadcasts.
119
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
120 121 122 123 124 125 126 127 128 129 130 131
	resp := make(chan []*wantlist.Entry, 1)
	select {
	case wm.wantMessages <- &currentBroadcastWantsMessage{resp}:
	case <-wm.ctx.Done():
		return nil
	}
	select {
	case wl := <-resp:
		return wl
	case <-wm.ctx.Done():
		return nil
	}
132 133
}

134
// WantCount returns the total count of wants.
135
func (wm *WantManager) WantCount() int {
136 137 138 139 140 141 142 143 144 145 146 147
	resp := make(chan int, 1)
	select {
	case wm.wantMessages <- &wantCountMessage{resp}:
	case <-wm.ctx.Done():
		return 0
	}
	select {
	case count := <-resp:
		return count
	case <-wm.ctx.Done():
		return 0
	}
148 149
}

150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
// Connected is called when a new peer is connected
func (wm *WantManager) Connected(p peer.ID) {
	select {
	case wm.wantMessages <- &connectedMessage{p}:
	case <-wm.ctx.Done():
	}
}

// Disconnected notifies the run loop that peer p has gone away. The
// notification is dropped if the manager's context is done before it can be
// queued.
func (wm *WantManager) Disconnected(p peer.ID) {
	var msg wantMessage = &disconnectedMessage{p}
	select {
	case <-wm.ctx.Done():
	case wm.wantMessages <- msg:
	}
}

166
// Startup starts processing for the WantManager.
167 168
func (wm *WantManager) Startup() {
	go wm.run()
169 170
}

171
// Shutdown ends processing for the want manager.
172 173
func (wm *WantManager) Shutdown() {
	wm.cancel()
174 175
}

176
func (wm *WantManager) run() {
177 178 179 180
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	for {
		select {
181 182
		case message := <-wm.wantMessages:
			message.handle(wm)
183 184 185 186 187 188
		case <-wm.ctx.Done():
			return
		}
	}
}

189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
	entries := make([]*bsmsg.Entry, 0, len(ks))
	for i, k := range ks {
		entries = append(entries, &bsmsg.Entry{
			Cancel: cancel,
			Entry:  wantlist.NewRefEntry(k, maxPriority-i),
		})
	}
	select {
	case wm.wantMessages <- &wantSet{entries: entries, targets: targets, from: ses}:
	case <-wm.ctx.Done():
	case <-ctx.Done():
	}
}

type wantSet struct {
	entries []*bsmsg.Entry
	targets []peer.ID
	from    uint64
}

func (ws *wantSet) handle(wm *WantManager) {
	// is this a broadcast or not?
	brdc := len(ws.targets) == 0

	// add changes to our wantlist
	for _, e := range ws.entries {
		if e.Cancel {
			if brdc {
				wm.bcwl.Remove(e.Cid, ws.from)
219
			}
220

221 222 223 224 225 226 227 228 229 230 231
			if wm.wl.Remove(e.Cid, ws.from) {
				wm.wantlistGauge.Dec()
			}
		} else {
			if brdc {
				wm.bcwl.AddEntry(e.Entry, ws.from)
			}
			if wm.wl.AddEntry(e.Entry, ws.from) {
				wm.wantlistGauge.Inc()
			}
		}
232
	}
233 234

	// broadcast those wantlist changes
235
	wm.peerHandler.SendMessage(ws.entries, ws.targets, ws.from)
236 237
}

238 239 240
type isWantedMessage struct {
	c    cid.Cid
	resp chan<- bool
241 242
}

243 244 245
func (iwm *isWantedMessage) handle(wm *WantManager) {
	_, isWanted := wm.wl.Contains(iwm.c)
	iwm.resp <- isWanted
246 247
}

248 249
type currentWantsMessage struct {
	resp chan<- []*wantlist.Entry
250 251
}

252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
func (cwm *currentWantsMessage) handle(wm *WantManager) {
	cwm.resp <- wm.wl.Entries()
}

// currentBroadcastWantsMessage asks the run loop for the entries of the
// broadcast wantlist; the result is delivered on resp.
type currentBroadcastWantsMessage struct {
	resp chan<- []*wantlist.Entry
}

// handle replies on resp with the entries currently held in wm.bcwl.
func (msg *currentBroadcastWantsMessage) handle(wm *WantManager) {
	broadcastEntries := wm.bcwl.Entries()
	msg.resp <- broadcastEntries
}

type wantCountMessage struct {
	resp chan<- int
}

func (wcm *wantCountMessage) handle(wm *WantManager) {
	wcm.resp <- wm.wl.Len()
270
}
271 272 273 274 275 276

type connectedMessage struct {
	p peer.ID
}

func (cm *connectedMessage) handle(wm *WantManager) {
277
	wm.peerHandler.Connected(cm.p, wm.bcwl)
278 279 280 281 282 283 284 285 286
}

// disconnectedMessage tells the run loop that peer p has disconnected.
type disconnectedMessage struct {
	p peer.ID
}

// handle reports the departed peer to the manager's PeerHandler.
func (msg *disconnectedMessage) handle(wm *WantManager) {
	handler := wm.peerHandler
	handler.Disconnected(msg.p)
}