sessionmanager.go 3.22 KB
Newer Older
1 2 3
package sessionmanager

import (
4
	"context"
5 6
	"sync"

7 8 9 10
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"

	bssession "github.com/ipfs/go-bitswap/session"
11
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
12
	peer "github.com/libp2p/go-libp2p-peer"
13 14
)

15 16 17 18 19
// Session is a session that is managed by the session manager
type Session interface {
	exchange.Fetcher
	InterestedIn(cid.Cid) bool
	ReceiveBlockFrom(peer.ID, blocks.Block)
20
	UpdateReceiveCounters(blocks.Block)
21 22
}

23
type sesTrk struct {
24 25
	session Session
	pm      bssession.PeerManager
26 27
}

28 29 30 31 32 33
// SessionFactory generates a new session for the SessionManager to track.
// NewSession calls it with a cancellable context derived from the caller's
// context, the newly allocated session ID, and the peer manager that the
// PeerManagerFactory produced for that same ID.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager) Session

// PeerManagerFactory generates a new peer manager for a session. NewSession
// calls it with the session's context and ID before invoking the
// SessionFactory, and passes the result on to the new session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager

34 35
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
36
type SessionManager struct {
37 38 39
	ctx                context.Context
	sessionFactory     SessionFactory
	peerManagerFactory PeerManagerFactory
40 41
	// Sessions
	sessLk   sync.Mutex
42
	sessions []sesTrk
43 44 45 46 47 48

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
}

49
// New creates a new SessionManager.
50
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory) *SessionManager {
51
	return &SessionManager{
52 53 54
		ctx:                ctx,
		sessionFactory:     sessionFactory,
		peerManagerFactory: peerManagerFactory,
55
	}
56 57
}

58 59 60 61 62 63
// NewSession initializes a session with the given context, and adds to the
// session manager.
func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
	id := sm.GetNextSessionID()
	sessionctx, cancel := context.WithCancel(ctx)

64 65
	pm := sm.peerManagerFactory(sessionctx, id)
	session := sm.sessionFactory(sessionctx, id, pm)
66
	tracked := sesTrk{session, pm}
67
	sm.sessLk.Lock()
68
	sm.sessions = append(sm.sessions, tracked)
69
	sm.sessLk.Unlock()
70
	go func() {
hannahhoward's avatar
hannahhoward committed
71 72 73
		defer cancel()
		select {
		case <-sm.ctx.Done():
74
			sm.removeSession(tracked)
hannahhoward's avatar
hannahhoward committed
75
		case <-ctx.Done():
76
			sm.removeSession(tracked)
77 78 79 80
		}
	}()

	return session
81 82
}

83
func (sm *SessionManager) removeSession(session sesTrk) {
84 85 86 87 88 89 90 91 92 93 94
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
	for i := 0; i < len(sm.sessions); i++ {
		if sm.sessions[i] == session {
			sm.sessions[i] = sm.sessions[len(sm.sessions)-1]
			sm.sessions = sm.sessions[:len(sm.sessions)-1]
			return
		}
	}
}

95
// GetNextSessionID returns the next sequentional identifier for a session.
96 97 98 99 100 101 102
func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
	sm.sessID++
	return sm.sessID
}

103 104 105
// ReceiveBlockFrom receives a block from a peer and dispatches to interested
// sessions.
func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
106 107 108
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

109
	k := blk.Cid()
110
	for _, s := range sm.sessions {
111 112
		if s.session.InterestedIn(k) {
			s.session.ReceiveBlockFrom(from, blk)
113
		}
114 115
	}
}
116 117 118 119 120 121 122 123 124 125 126

// UpdateReceiveCounters notifies every tracked session that blk was received,
// regardless of whether the session is interested in it, so that sessions can
// track duplicates. The session list lock is held while sessions are notified.
func (sm *SessionManager) UpdateReceiveCounters(blk blocks.Block) {
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

	tracked := sm.sessions
	for i := range tracked {
		tracked[i].session.UpdateReceiveCounters(blk)
	}
}