// Package sessionmanager creates sessions, keeps track of them, and
// dispatches received block CIDs to the sessions interested in them.
package sessionmanager

import (
	"context"
	"sync"
	"time"

	cid "github.com/ipfs/go-cid"
	delay "github.com/ipfs/go-ipfs-delay"

	notifications "github.com/ipfs/go-bitswap/notifications"
	bssession "github.com/ipfs/go-bitswap/session"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

17 18 19 20
// Session is a session that is managed by the session manager
type Session interface {
	exchange.Fetcher
	InterestedIn(cid.Cid) bool
21
	ReceiveFrom(peer.ID, []cid.Cid)
22
	IsWanted(cid.Cid) bool
23 24
}

25
type sesTrk struct {
26 27
	session Session
	pm      bssession.PeerManager
28
	srs     bssession.RequestSplitter
29 30
}

31
// SessionFactory generates a new session for the SessionManager to track.
32
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, notif notifications.PubSub, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session
33 34 35

// RequestSplitterFactory generates a new request splitter for a session.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
36 37 38 39

// PeerManagerFactory builds the peer manager for a single session, given
// that session's context and ID.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager

40 41
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
42
type SessionManager struct {
43 44 45 46
	ctx                    context.Context
	sessionFactory         SessionFactory
	peerManagerFactory     PeerManagerFactory
	requestSplitterFactory RequestSplitterFactory
47
	notif                  notifications.PubSub
48

49 50
	// Sessions
	sessLk   sync.Mutex
51
	sessions []sesTrk
52 53 54 55 56 57

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
}

58
// New creates a new SessionManager.
59 60
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory,
	requestSplitterFactory RequestSplitterFactory, notif notifications.PubSub) *SessionManager {
61
	return &SessionManager{
62 63 64 65
		ctx:                    ctx,
		sessionFactory:         sessionFactory,
		peerManagerFactory:     peerManagerFactory,
		requestSplitterFactory: requestSplitterFactory,
66
		notif:                  notif,
67
	}
68 69
}

70 71
// NewSession initializes a session with the given context, and adds to the
// session manager.
72 73 74
func (sm *SessionManager) NewSession(ctx context.Context,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) exchange.Fetcher {
75 76 77
	id := sm.GetNextSessionID()
	sessionctx, cancel := context.WithCancel(ctx)

78
	pm := sm.peerManagerFactory(sessionctx, id)
79
	srs := sm.requestSplitterFactory(sessionctx)
80
	session := sm.sessionFactory(sessionctx, id, pm, srs, sm.notif, provSearchDelay, rebroadcastDelay)
81
	tracked := sesTrk{session, pm, srs}
82
	sm.sessLk.Lock()
83
	sm.sessions = append(sm.sessions, tracked)
84
	sm.sessLk.Unlock()
85
	go func() {
hannahhoward's avatar
hannahhoward committed
86 87 88
		defer cancel()
		select {
		case <-sm.ctx.Done():
89
			sm.removeSession(tracked)
hannahhoward's avatar
hannahhoward committed
90
		case <-ctx.Done():
91
			sm.removeSession(tracked)
92 93 94 95
		}
	}()

	return session
96 97
}

98
func (sm *SessionManager) removeSession(session sesTrk) {
99 100 101 102 103 104 105 106 107 108 109
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
	for i := 0; i < len(sm.sessions); i++ {
		if sm.sessions[i] == session {
			sm.sessions[i] = sm.sessions[len(sm.sessions)-1]
			sm.sessions = sm.sessions[:len(sm.sessions)-1]
			return
		}
	}
}

110
// GetNextSessionID returns the next sequentional identifier for a session.
111 112 113 114 115 116 117
func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
	sm.sessID++
	return sm.sessID
}

118
// ReceiveFrom receives blocks from a peer and dispatches to interested
119
// sessions.
120
func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
121 122 123
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

124
	// Only give each session the blocks / dups that it is interested in
125
	for _, s := range sm.sessions {
126 127 128 129
		sessKs := make([]cid.Cid, 0, len(ks))
		for _, k := range ks {
			if s.session.InterestedIn(k) {
				sessKs = append(sessKs, k)
130
			}
131
		}
132
		s.session.ReceiveFrom(from, sessKs)
133 134
	}
}
135

136
// IsWanted indicates whether any of the sessions are waiting to receive
137
// the block with the given CID.
138
func (sm *SessionManager) IsWanted(cid cid.Cid) bool {
139 140 141 142
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

	for _, s := range sm.sessions {
143
		if s.session.IsWanted(cid) {
144 145 146 147 148
			return true
		}
	}
	return false
}