sessionmanager.go 4.2 KB
Newer Older
1 2 3
package sessionmanager

import (
4
	"context"
5
	"sync"
6
	"time"
7

8
	cid "github.com/ipfs/go-cid"
9
	delay "github.com/ipfs/go-ipfs-delay"
10

11 12 13 14
	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
	notifications "github.com/ipfs/go-bitswap/internal/notifications"
	bssession "github.com/ipfs/go-bitswap/internal/session"
	bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
15
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
Raúl Kripalani's avatar
Raúl Kripalani committed
16
	peer "github.com/libp2p/go-libp2p-core/peer"
17 18
)

19 20 21
// Session is a session that is managed by the session manager
type Session interface {
	exchange.Fetcher
dirkmc's avatar
dirkmc committed
22 23
	ID() uint64
	ReceiveFrom(peer.ID, []cid.Cid, []cid.Cid, []cid.Cid)
24 25
}

26
// SessionFactory generates a new session for the SessionManager to track.
dirkmc's avatar
dirkmc committed
27
type SessionFactory func(ctx context.Context, id uint64, sprm bssession.SessionPeerManager, sim *bssim.SessionInterestManager, pm bssession.PeerManager, bpm *bsbpm.BlockPresenceManager, notif notifications.PubSub, provSearchDelay time.Duration, rebroadcastDelay delay.D, self peer.ID) Session
28 29

// PeerManagerFactory generates a new peer manager for a session.
dirkmc's avatar
dirkmc committed
30
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.SessionPeerManager
31

32 33
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
34
type SessionManager struct {
35 36
	ctx                    context.Context
	sessionFactory         SessionFactory
dirkmc's avatar
dirkmc committed
37
	sessionInterestManager *bssim.SessionInterestManager
38
	peerManagerFactory     PeerManagerFactory
dirkmc's avatar
dirkmc committed
39 40
	blockPresenceManager   *bsbpm.BlockPresenceManager
	peerManager            bssession.PeerManager
41
	notif                  notifications.PubSub
42

43
	// Sessions
44
	sessLk   sync.RWMutex
dirkmc's avatar
dirkmc committed
45
	sessions map[uint64]Session
46 47 48 49

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
dirkmc's avatar
dirkmc committed
50 51

	self peer.ID
52 53
}

54
// New creates a new SessionManager.
dirkmc's avatar
dirkmc committed
55 56
func New(ctx context.Context, sessionFactory SessionFactory, sessionInterestManager *bssim.SessionInterestManager, peerManagerFactory PeerManagerFactory,
	blockPresenceManager *bsbpm.BlockPresenceManager, peerManager bssession.PeerManager, notif notifications.PubSub, self peer.ID) *SessionManager {
57
	return &SessionManager{
58 59
		ctx:                    ctx,
		sessionFactory:         sessionFactory,
dirkmc's avatar
dirkmc committed
60
		sessionInterestManager: sessionInterestManager,
61
		peerManagerFactory:     peerManagerFactory,
dirkmc's avatar
dirkmc committed
62 63
		blockPresenceManager:   blockPresenceManager,
		peerManager:            peerManager,
64
		notif:                  notif,
dirkmc's avatar
dirkmc committed
65 66
		sessions:               make(map[uint64]Session),
		self:                   self,
67
	}
68 69
}

70 71
// NewSession initializes a session with the given context, and adds to the
// session manager.
72 73 74
func (sm *SessionManager) NewSession(ctx context.Context,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) exchange.Fetcher {
75 76 77
	id := sm.GetNextSessionID()
	sessionctx, cancel := context.WithCancel(ctx)

78
	pm := sm.peerManagerFactory(sessionctx, id)
dirkmc's avatar
dirkmc committed
79
	session := sm.sessionFactory(sessionctx, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self)
80
	sm.sessLk.Lock()
dirkmc's avatar
dirkmc committed
81
	sm.sessions[id] = session
82
	sm.sessLk.Unlock()
83
	go func() {
hannahhoward's avatar
hannahhoward committed
84 85 86
		defer cancel()
		select {
		case <-sm.ctx.Done():
dirkmc's avatar
dirkmc committed
87
			sm.removeSession(id)
hannahhoward's avatar
hannahhoward committed
88
		case <-ctx.Done():
dirkmc's avatar
dirkmc committed
89
			sm.removeSession(id)
90 91 92 93
		}
	}()

	return session
94 95
}

dirkmc's avatar
dirkmc committed
96
func (sm *SessionManager) removeSession(sesid uint64) {
97 98
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
dirkmc's avatar
dirkmc committed
99 100

	delete(sm.sessions, sesid)
101 102
}

dirkmc's avatar
dirkmc committed
103
// GetNextSessionID returns the next sequential identifier for a session.
104 105 106
func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
dirkmc's avatar
dirkmc committed
107

108 109 110 111
	sm.sessID++
	return sm.sessID
}

Dirk McCormick's avatar
Dirk McCormick committed
112 113 114 115
// ReceiveFrom is called when a new message is received
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	// Record block presence for HAVE / DONT_HAVE
	sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)
116

dirkmc's avatar
dirkmc committed
117 118 119 120 121
	// Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
	for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
		sm.sessLk.RLock()
		sess, ok := sm.sessions[id]
		sm.sessLk.RUnlock()
122

dirkmc's avatar
dirkmc committed
123 124
		if ok {
			sess.ReceiveFrom(p, blks, haves, dontHaves)
125 126
		}
	}
dirkmc's avatar
dirkmc committed
127

Dirk McCormick's avatar
Dirk McCormick committed
128 129
	// Send CANCEL to all peers with want-have / want-block
	sm.peerManager.SendCancels(ctx, blks)
130
}