sessionmanager.go 3.71 KB
Newer Older
1 2 3
package sessionmanager

import (
4
	"context"
5
	"sync"
6
	"time"
7

8 9
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
10
	delay "github.com/ipfs/go-ipfs-delay"
11 12

	bssession "github.com/ipfs/go-bitswap/session"
13
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
Raúl Kripalani's avatar
Raúl Kripalani committed
14
	peer "github.com/libp2p/go-libp2p-core/peer"
15 16
)

17 18 19 20
// Session is a session that is managed by the session manager
type Session interface {
	exchange.Fetcher
	InterestedIn(cid.Cid) bool
21
	ReceiveBlocksFrom(peer.ID, []blocks.Block)
22 23
}

24
type sesTrk struct {
25 26
	session Session
	pm      bssession.PeerManager
27
	srs     bssession.RequestSplitter
28 29
}

30
// SessionFactory generates a new session for the SessionManager to track.
31
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session
32 33 34

// RequestSplitterFactory generates a new request splitter for a session.
// NewSession calls it once per session, passing the session-scoped context
// it derives from the caller's context.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
35 36 37 38

// PeerManagerFactory generates a new peer manager for a session.
// NewSession calls it once per session, passing the session-scoped context
// it derives from the caller's context and the id allocated for the session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager

39 40
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
41
type SessionManager struct {
42 43 44 45 46
	ctx                    context.Context
	sessionFactory         SessionFactory
	peerManagerFactory     PeerManagerFactory
	requestSplitterFactory RequestSplitterFactory

47 48
	// Sessions
	sessLk   sync.Mutex
49
	sessions []sesTrk
50 51 52 53 54 55

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
}

56
// New creates a new SessionManager.
57
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory, requestSplitterFactory RequestSplitterFactory) *SessionManager {
58
	return &SessionManager{
59 60 61 62
		ctx:                    ctx,
		sessionFactory:         sessionFactory,
		peerManagerFactory:     peerManagerFactory,
		requestSplitterFactory: requestSplitterFactory,
63
	}
64 65
}

66 67
// NewSession initializes a session with the given context, and adds to the
// session manager.
68 69 70
func (sm *SessionManager) NewSession(ctx context.Context,
	provSearchDelay time.Duration,
	rebroadcastDelay delay.D) exchange.Fetcher {
71 72 73
	id := sm.GetNextSessionID()
	sessionctx, cancel := context.WithCancel(ctx)

74
	pm := sm.peerManagerFactory(sessionctx, id)
75
	srs := sm.requestSplitterFactory(sessionctx)
76
	session := sm.sessionFactory(sessionctx, id, pm, srs, provSearchDelay, rebroadcastDelay)
77
	tracked := sesTrk{session, pm, srs}
78
	sm.sessLk.Lock()
79
	sm.sessions = append(sm.sessions, tracked)
80
	sm.sessLk.Unlock()
81
	go func() {
hannahhoward's avatar
hannahhoward committed
82 83 84
		defer cancel()
		select {
		case <-sm.ctx.Done():
85
			sm.removeSession(tracked)
hannahhoward's avatar
hannahhoward committed
86
		case <-ctx.Done():
87
			sm.removeSession(tracked)
88 89 90 91
		}
	}()

	return session
92 93
}

94
func (sm *SessionManager) removeSession(session sesTrk) {
95 96 97 98 99 100 101 102 103 104 105
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
	for i := 0; i < len(sm.sessions); i++ {
		if sm.sessions[i] == session {
			sm.sessions[i] = sm.sessions[len(sm.sessions)-1]
			sm.sessions = sm.sessions[:len(sm.sessions)-1]
			return
		}
	}
}

106
// GetNextSessionID returns the next sequentional identifier for a session.
107 108 109 110 111 112 113
func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
	sm.sessID++
	return sm.sessID
}

114
// ReceiveBlocksFrom receives blocks from a peer and dispatches to interested
115
// sessions.
116
func (sm *SessionManager) ReceiveBlocksFrom(from peer.ID, blks []blocks.Block) {
117 118 119
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

120
	// Only give each session the blocks / dups that it is interested in
121
	for _, s := range sm.sessions {
122 123 124 125 126
		sessBlks := make([]blocks.Block, 0, len(blks))
		for _, b := range blks {
			if s.session.InterestedIn(b.Cid()) {
				sessBlks = append(sessBlks, b)
			}
127
		}
128
		s.session.ReceiveBlocksFrom(from, sessBlks)
129 130
	}
}