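// Package sessionmanager creates bitswap sessions, keeps track of the live
// ones, and dispatches received blocks to the sessions interested in them.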
package sessionmanager

import (
	"context"
	"sync"

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"

	bssession "github.com/ipfs/go-bitswap/session"
	bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
	peer "github.com/libp2p/go-libp2p-peer"
)

16 17 18 19 20
type sesTrk struct {
	session *bssession.Session
	pm      *bsspm.SessionPeerManager
}

21 22
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
23
type SessionManager struct {
24 25
	wm      bssession.WantManager
	network bsspm.PeerNetwork
26
	ctx     context.Context
27 28
	// Sessions
	sessLk   sync.Mutex
29
	sessions []sesTrk
30 31 32 33 34 35

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
}

36
// New creates a new SessionManager.
37
func New(ctx context.Context, wm bssession.WantManager, network bsspm.PeerNetwork) *SessionManager {
38 39 40 41 42
	return &SessionManager{
		ctx:     ctx,
		wm:      wm,
		network: network,
	}
43 44
}

45 46 47 48 49 50
// NewSession initializes a session with the given context, and adds to the
// session manager.
func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
	id := sm.GetNextSessionID()
	sessionctx, cancel := context.WithCancel(ctx)

51 52 53
	pm := bsspm.New(sessionctx, id, sm.network)
	session := bssession.New(sessionctx, id, sm.wm, pm)
	tracked := sesTrk{session, pm}
54
	sm.sessLk.Lock()
55
	sm.sessions = append(sm.sessions, tracked)
56
	sm.sessLk.Unlock()
57
	go func() {
hannahhoward's avatar
hannahhoward committed
58 59 60
		defer cancel()
		select {
		case <-sm.ctx.Done():
61
			sm.removeSession(tracked)
hannahhoward's avatar
hannahhoward committed
62
		case <-ctx.Done():
63
			sm.removeSession(tracked)
64 65 66 67
		}
	}()

	return session
68 69
}

70
func (sm *SessionManager) removeSession(session sesTrk) {
71 72 73 74 75 76 77 78 79 80 81
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
	for i := 0; i < len(sm.sessions); i++ {
		if sm.sessions[i] == session {
			sm.sessions[i] = sm.sessions[len(sm.sessions)-1]
			sm.sessions = sm.sessions[:len(sm.sessions)-1]
			return
		}
	}
}

82
// GetNextSessionID returns the next sequentional identifier for a session.
83 84 85 86 87 88 89
func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
	sm.sessID++
	return sm.sessID
}

90 91 92
// ReceiveBlockFrom receives a block from a peer and dispatches to interested
// sessions.
func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
93 94 95
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

96 97
	k := blk.Cid()
	ks := []cid.Cid{k}
98
	for _, s := range sm.sessions {
99 100 101
		if s.session.InterestedIn(k) {
			s.session.ReceiveBlockFrom(from, blk)
			sm.wm.CancelWants(sm.ctx, ks, nil, s.session.ID())
102
		}
103 104
	}
}