sessionmanager.go 5.74 KB
Newer Older
1 2 3
package sessionmanager

import (
4
	"context"
5
	"sync"
6
	"time"
7

8 9
	cid "gitlab.dms3.io/dms3/go-cid"
	delay "gitlab.dms3.io/dms3/go-dms3-delay"
10

11 12 13 14 15 16
	bsbpm "gitlab.dms3.io/dms3/go-bitswap/internal/blockpresencemanager"
	notifications "gitlab.dms3.io/dms3/go-bitswap/internal/notifications"
	bssession "gitlab.dms3.io/dms3/go-bitswap/internal/session"
	bssim "gitlab.dms3.io/dms3/go-bitswap/internal/sessioninterestmanager"
	exchange "gitlab.dms3.io/dms3/go-dms3-exchange-interface"
	peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
17 18
)

19 20 21
// Session is a session that is managed by the session manager
type Session interface {
	exchange.Fetcher
dirkmc's avatar
dirkmc committed
22 23
	ID() uint64
	ReceiveFrom(peer.ID, []cid.Cid, []cid.Cid, []cid.Cid)
24
	Shutdown()
25 26
}

27
// SessionFactory generates a new session for the SessionManager to track.
28 29 30 31 32 33 34 35 36 37 38 39
type SessionFactory func(
	ctx context.Context,
	sm bssession.SessionManager,
	id uint64,
	sprm bssession.SessionPeerManager,
	sim *bssim.SessionInterestManager,
	pm bssession.PeerManager,
	bpm *bsbpm.BlockPresenceManager,
	notif notifications.PubSub,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D,
	self peer.ID) Session
40 41

// PeerManagerFactory generates a new peer manager for a session.
dirkmc's avatar
dirkmc committed
42
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.SessionPeerManager
43

44 45
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
46
type SessionManager struct {
47 48
	ctx                    context.Context
	sessionFactory         SessionFactory
dirkmc's avatar
dirkmc committed
49
	sessionInterestManager *bssim.SessionInterestManager
50
	peerManagerFactory     PeerManagerFactory
dirkmc's avatar
dirkmc committed
51 52
	blockPresenceManager   *bsbpm.BlockPresenceManager
	peerManager            bssession.PeerManager
53
	notif                  notifications.PubSub
54

55
	// Sessions
56
	sessLk   sync.RWMutex
dirkmc's avatar
dirkmc committed
57
	sessions map[uint64]Session
58 59 60 61

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
dirkmc's avatar
dirkmc committed
62 63

	self peer.ID
64 65
}

66
// New creates a new SessionManager.
dirkmc's avatar
dirkmc committed
67 68
func New(ctx context.Context, sessionFactory SessionFactory, sessionInterestManager *bssim.SessionInterestManager, peerManagerFactory PeerManagerFactory,
	blockPresenceManager *bsbpm.BlockPresenceManager, peerManager bssession.PeerManager, notif notifications.PubSub, self peer.ID) *SessionManager {
69

70
	return &SessionManager{
71 72
		ctx:                    ctx,
		sessionFactory:         sessionFactory,
dirkmc's avatar
dirkmc committed
73
		sessionInterestManager: sessionInterestManager,
74
		peerManagerFactory:     peerManagerFactory,
dirkmc's avatar
dirkmc committed
75 76
		blockPresenceManager:   blockPresenceManager,
		peerManager:            peerManager,
77
		notif:                  notif,
dirkmc's avatar
dirkmc committed
78 79
		sessions:               make(map[uint64]Session),
		self:                   self,
80
	}
81 82
}

83 84
// NewSession initializes a session with the given context, and adds to the
// session manager.
85 86 87
func (sm *SessionManager) NewSession(ctx context.Context,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) exchange.Fetcher {
88 89
	id := sm.GetNextSessionID()

90 91 92
	pm := sm.peerManagerFactory(ctx, id)
	session := sm.sessionFactory(ctx, sm, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self)

93
	sm.sessLk.Lock()
94 95 96
	if sm.sessions != nil { // check if SessionManager was shutdown
		sm.sessions[id] = session
	}
97
	sm.sessLk.Unlock()
98 99

	return session
100 101
}

102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
func (sm *SessionManager) Shutdown() {
	sm.sessLk.Lock()

	sessions := make([]Session, 0, len(sm.sessions))
	for _, ses := range sm.sessions {
		sessions = append(sessions, ses)
	}

	// Ensure that if Shutdown() is called twice we only shut down
	// the sessions once
	sm.sessions = nil

	sm.sessLk.Unlock()

	for _, ses := range sessions {
		ses.Shutdown()
	}
}

func (sm *SessionManager) RemoveSession(sesid uint64) {
	// Remove session from SessionInterestManager - returns the keys that no
	// session is interested in anymore.
	cancelKs := sm.sessionInterestManager.RemoveSession(sesid)

	// Cancel keys that no session is interested in anymore
	sm.cancelWants(cancelKs)

129 130
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
dirkmc's avatar
dirkmc committed
131

132 133 134 135
	// Clean up session
	if sm.sessions != nil { // check if SessionManager was shutdown
		delete(sm.sessions, sesid)
	}
136 137
}

dirkmc's avatar
dirkmc committed
138
// GetNextSessionID returns the next sequential identifier for a session.
139 140 141
func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
dirkmc's avatar
dirkmc committed
142

143 144 145 146
	sm.sessID++
	return sm.sessID
}

Dirk McCormick's avatar
Dirk McCormick committed
147 148 149 150
// ReceiveFrom is called when a new message is received
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	// Record block presence for HAVE / DONT_HAVE
	sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)
151

dirkmc's avatar
dirkmc committed
152 153 154
	// Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
	for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
		sm.sessLk.RLock()
155 156 157 158
		if sm.sessions == nil { // check if SessionManager was shutdown
			sm.sessLk.RUnlock()
			return
		}
dirkmc's avatar
dirkmc committed
159 160
		sess, ok := sm.sessions[id]
		sm.sessLk.RUnlock()
161

dirkmc's avatar
dirkmc committed
162 163
		if ok {
			sess.ReceiveFrom(p, blks, haves, dontHaves)
164 165
		}
	}
dirkmc's avatar
dirkmc committed
166

Dirk McCormick's avatar
Dirk McCormick committed
167 168
	// Send CANCEL to all peers with want-have / want-block
	sm.peerManager.SendCancels(ctx, blks)
169
}
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189

// CancelSessionWants is called when a session cancels wants because a call to
// GetBlocks() is cancelled
func (sm *SessionManager) CancelSessionWants(sesid uint64, wants []cid.Cid) {
	// Remove session's interest in the given blocks - returns the keys that no
	// session is interested in anymore.
	cancelKs := sm.sessionInterestManager.RemoveSessionInterested(sesid, wants)
	sm.cancelWants(cancelKs)
}

func (sm *SessionManager) cancelWants(wants []cid.Cid) {
	// Free up block presence tracking for keys that no session is interested
	// in anymore
	sm.blockPresenceManager.RemoveKeys(wants)

	// Send CANCEL to all peers for blocks that no session is interested in
	// anymore.
	// Note: use bitswap context because session context may already be Done.
	sm.peerManager.SendCancels(sm.ctx, wants)
}