wantmanager.go 4 KB
Newer Older
1 2 3 4 5
package wantmanager

import (
	"context"

6 7 8 9
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
	"github.com/ipfs/go-bitswap/internal/sessionmanager"
	bsswl "github.com/ipfs/go-bitswap/internal/sessionwantlist"
Dirk McCormick's avatar
Dirk McCormick committed
10
	logging "github.com/ipfs/go-log"
11 12

	cid "github.com/ipfs/go-cid"
Raúl Kripalani's avatar
Raúl Kripalani committed
13
	peer "github.com/libp2p/go-libp2p-core/peer"
14 15
)

Dirk McCormick's avatar
Dirk McCormick committed
16 17
var log = logging.Logger("bitswap")

dirkmc's avatar
dirkmc committed
18
// PeerHandler sends wants / cancels to other peers
19
type PeerHandler interface {
dirkmc's avatar
dirkmc committed
20 21 22 23
	// Connected is called when a peer connects, with any initial want-haves
	// that have been broadcast to all peers (as part of session discovery)
	Connected(p peer.ID, initialWants []cid.Cid)
	// Disconnected is called when a peer disconnects
24
	Disconnected(p peer.ID)
dirkmc's avatar
dirkmc committed
25 26 27 28 29
	// BroadcastWantHaves sends want-haves to all connected peers
	BroadcastWantHaves(ctx context.Context, wantHaves []cid.Cid)
	// SendCancels sends cancels to all peers that had previously been sent
	// a want-block or want-have for the given key
	SendCancels(context.Context, []cid.Cid)
30 31
}

dirkmc's avatar
dirkmc committed
32 33 34
// SessionManager receives incoming messages and distributes them to sessions
type SessionManager interface {
	ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []sessionmanager.Session
35 36
}

dirkmc's avatar
dirkmc committed
37 38 39 40 41 42
// WantManager
// - informs the SessionManager and BlockPresenceManager of incoming information
//   and cancelled sessions
// - informs the PeerManager of connects and disconnects
// - manages the list of want-haves that are broadcast to the internet
//   (as opposed to being sent to specific peers)
43
type WantManager struct {
dirkmc's avatar
dirkmc committed
44
	bcwl *bsswl.SessionWantlist
45

dirkmc's avatar
dirkmc committed
46 47 48 49
	peerHandler PeerHandler
	sim         *bssim.SessionInterestManager
	bpm         *bsbpm.BlockPresenceManager
	sm          SessionManager
50 51
}

52
// New initializes a new WantManager for a given context.
dirkmc's avatar
dirkmc committed
53
func New(ctx context.Context, peerHandler PeerHandler, sim *bssim.SessionInterestManager, bpm *bsbpm.BlockPresenceManager) *WantManager {
54
	return &WantManager{
dirkmc's avatar
dirkmc committed
55 56 57 58
		bcwl:        bsswl.NewSessionWantlist(),
		peerHandler: peerHandler,
		sim:         sim,
		bpm:         bpm,
59 60 61
	}
}

dirkmc's avatar
dirkmc committed
62 63
func (wm *WantManager) SetSessionManager(sm SessionManager) {
	wm.sm = sm
64 65
}

dirkmc's avatar
dirkmc committed
66 67 68 69 70 71 72 73 74 75
// ReceiveFrom is called when a new message is received
func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	// Record block presence for HAVE / DONT_HAVE
	wm.bpm.ReceiveFrom(p, haves, dontHaves)
	// Inform interested sessions
	wm.sm.ReceiveFrom(p, blks, haves, dontHaves)
	// Remove received blocks from broadcast wantlist
	wm.bcwl.RemoveKeys(blks)
	// Send CANCEL to all peers with want-have / want-block
	wm.peerHandler.SendCancels(ctx, blks)
76 77
}

dirkmc's avatar
dirkmc committed
78 79 80
// BroadcastWantHaves is called when want-haves should be broadcast to all
// connected peers (as part of session discovery)
func (wm *WantManager) BroadcastWantHaves(ctx context.Context, ses uint64, wantHaves []cid.Cid) {
Dirk McCormick's avatar
Dirk McCormick committed
81
	log.Debugf("BroadcastWantHaves session%d: %s", ses, wantHaves)
82

dirkmc's avatar
dirkmc committed
83 84
	// Record broadcast wants
	wm.bcwl.Add(wantHaves, ses)
85

dirkmc's avatar
dirkmc committed
86 87
	// Send want-haves to all peers
	wm.peerHandler.BroadcastWantHaves(ctx, wantHaves)
88 89
}

dirkmc's avatar
dirkmc committed
90 91
// RemoveSession is called when the session is shut down
func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
92
	// Remove session's interest in the given blocks.
dirkmc's avatar
dirkmc committed
93
	cancelKs := wm.sim.RemoveSessionInterest(ses)
94

dirkmc's avatar
dirkmc committed
95 96
	// Remove broadcast want-haves for session
	wm.bcwl.RemoveSession(ses)
97

dirkmc's avatar
dirkmc committed
98 99 100
	// Free up block presence tracking for keys that no session is interested
	// in anymore
	wm.bpm.RemoveKeys(cancelKs)
101

dirkmc's avatar
dirkmc committed
102 103
	// Send CANCEL to all peers for blocks that no session is interested in anymore
	wm.peerHandler.SendCancels(ctx, cancelKs)
104 105
}

dirkmc's avatar
dirkmc committed
106 107 108 109 110
// Connected is called when a new peer connects
func (wm *WantManager) Connected(p peer.ID) {
	// Tell the peer handler that there is a new connection and give it the
	// list of outstanding broadcast wants
	wm.peerHandler.Connected(p, wm.bcwl.Keys())
111 112
}

dirkmc's avatar
dirkmc committed
113 114 115
// Disconnected is called when a peer disconnects
func (wm *WantManager) Disconnected(p peer.ID) {
	wm.peerHandler.Disconnected(p)
116
}