sessionmanager.go 2.37 KB
Newer Older
1 2 3
package sessionmanager

import (
4
	"context"
5 6
	"sync"

7 8 9 10 11 12
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"

	bsnet "github.com/ipfs/go-bitswap/network"
	bssession "github.com/ipfs/go-bitswap/session"
	bswm "github.com/ipfs/go-bitswap/wantmanager"
13
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
14
	peer "github.com/libp2p/go-libp2p-peer"
15 16
)

17 18
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
19
type SessionManager struct {
20 21 22
	wm      *bswm.WantManager
	network bsnet.BitSwapNetwork
	ctx     context.Context
23 24
	// Sessions
	sessLk   sync.Mutex
25
	sessions []*bssession.Session
26 27 28 29 30 31

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
}

32 33 34 35 36 37 38
// New creates a new SessionManager.
func New(ctx context.Context, wm *bswm.WantManager, network bsnet.BitSwapNetwork) *SessionManager {
	return &SessionManager{
		ctx:     ctx,
		wm:      wm,
		network: network,
	}
39 40
}

41 42 43 44 45 46 47
// NewSession initializes a session with the given context, and adds to the
// session manager.
func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
	id := sm.GetNextSessionID()
	sessionctx, cancel := context.WithCancel(ctx)

	session := bssession.New(sessionctx, id, sm.wm, sm.network)
48 49 50
	sm.sessLk.Lock()
	sm.sessions = append(sm.sessions, session)
	sm.sessLk.Unlock()
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	go func() {
		for {
			defer cancel()
			select {
			case <-sm.ctx.Done():
				sm.removeSession(session)
				return
			case <-ctx.Done():
				sm.removeSession(session)
				return
			}
		}
	}()

	return session
66 67
}

68
func (sm *SessionManager) removeSession(session exchange.Fetcher) {
69 70 71 72 73 74 75 76 77 78 79
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
	for i := 0; i < len(sm.sessions); i++ {
		if sm.sessions[i] == session {
			sm.sessions[i] = sm.sessions[len(sm.sessions)-1]
			sm.sessions = sm.sessions[:len(sm.sessions)-1]
			return
		}
	}
}

80
// GetNextSessionID returns the next sequentional identifier for a session.
81 82 83 84 85 86 87
func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
	sm.sessID++
	return sm.sessID
}

88 89 90
// ReceiveBlockFrom receives a block from a peer and dispatches to interested
// sessions.
func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
91 92 93
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

94 95
	k := blk.Cid()
	ks := []cid.Cid{k}
96
	for _, s := range sm.sessions {
97 98 99 100
		if s.InterestedIn(k) {
			s.ReceiveBlockFrom(from, blk)
			sm.wm.CancelWants(sm.ctx, ks, nil, s.ID())
		}
101 102
	}
}