Commit 40aa1fb8 authored by hannahhoward's avatar hannahhoward

refactor(sessions): extract sessions to package

- moved sessions out of main bitswap package
- modified session manager to manage all sessions
- moved get functions to their own package so sessions can directly

BREAKING CHANGE: SessionsForBlock, while not used outside of Bitswap, has been removed, and was an
exported function
parent c9b0a134
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"time" "time"
decision "github.com/ipfs/go-bitswap/decision" decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message" bsmsg "github.com/ipfs/go-bitswap/message"
bsmq "github.com/ipfs/go-bitswap/messagequeue" bsmq "github.com/ipfs/go-bitswap/messagequeue"
bsnet "github.com/ipfs/go-bitswap/network" bsnet "github.com/ipfs/go-bitswap/network"
...@@ -100,6 +101,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, ...@@ -100,6 +101,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(p, network) return bsmq.New(p, network)
} }
wm := bswm.New(ctx)
bs := &Bitswap{ bs := &Bitswap{
blockstore: bstore, blockstore: bstore,
notifications: notif, notifications: notif,
...@@ -109,9 +111,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, ...@@ -109,9 +111,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px, process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize), newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize), provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: bswm.New(ctx), wm: wm,
pm: bspm.New(ctx, peerQueueFactory), pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(), sm: bssm.New(ctx, wm, network),
counters: new(counters), counters: new(counters),
dupMetric: dupHist, dupMetric: dupHist,
allMetric: allHist, allMetric: allHist,
...@@ -202,7 +204,7 @@ type blockRequest struct { ...@@ -202,7 +204,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context. // deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) { func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, bs.GetBlocks) return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
} }
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid { func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
...@@ -307,7 +309,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks ...@@ -307,7 +309,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return out, nil return out, nil
} }
// CancelWant removes a given key from the wantlist. // CancelWants removes a given key from the wantlist.
func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) { func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) {
if len(cids) == 0 { if len(cids) == 0 {
return return
...@@ -345,12 +347,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { ...@@ -345,12 +347,7 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
// it now as it requires more thought and isnt causing immediate problems. // it now as it requires more thought and isnt causing immediate problems.
bs.notifications.Publish(blk) bs.notifications.Publish(blk)
k := blk.Cid() bs.sm.ReceiveBlockFrom(from, blk)
ks := []cid.Cid{k}
for _, s := range bs.SessionsForBlock(k) {
s.receiveBlockFrom(from, blk)
bs.CancelWants(ks, s.id)
}
bs.engine.AddBlock(blk) bs.engine.AddBlock(blk)
...@@ -363,18 +360,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error { ...@@ -363,18 +360,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
return nil return nil
} }
// SessionsForBlock returns a slice of all sessions that may be interested in the given cid.
func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session {
var out []*Session
bs.sm.IterateSessions(func(session exchange.Fetcher) {
s := session.(*Session)
if s.interestedIn(c) {
out = append(out, s)
}
})
return out
}
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
atomic.AddUint64(&bs.counters.messagesRecvd, 1) atomic.AddUint64(&bs.counters.messagesRecvd, 1)
...@@ -477,3 +462,7 @@ func (bs *Bitswap) GetWantlist() []cid.Cid { ...@@ -477,3 +462,7 @@ func (bs *Bitswap) GetWantlist() []cid.Cid {
func (bs *Bitswap) IsOnline() bool { func (bs *Bitswap) IsOnline() bool {
return true return true
} }
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
}
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"testing" "testing"
"time" "time"
bssession "github.com/ipfs/go-bitswap/session"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil" blocksutil "github.com/ipfs/go-ipfs-blocksutil"
...@@ -132,8 +133,8 @@ func TestSessionSplitFetch(t *testing.T) { ...@@ -132,8 +133,8 @@ func TestSessionSplitFetch(t *testing.T) {
cids = append(cids, blk.Cid()) cids = append(cids, blk.Cid())
} }
ses := inst[10].Exchange.NewSession(ctx).(*Session) ses := inst[10].Exchange.NewSession(ctx).(*bssession.Session)
ses.baseTickDelay = time.Millisecond * 10 ses.SetBaseTickDelay(time.Millisecond * 10)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10]) ch, err := ses.GetBlocks(ctx, cids[i*10:(i+1)*10])
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
tn "github.com/ipfs/go-bitswap/testnet" tn "github.com/ipfs/go-bitswap/testnet"
bssession "github.com/ipfs/go-bitswap/session"
"github.com/ipfs/go-block-format" "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil" blocksutil "github.com/ipfs/go-ipfs-blocksutil"
...@@ -248,14 +249,14 @@ func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) { ...@@ -248,14 +249,14 @@ func onePeerPerBlock(b *testing.B, provs []Instance, blks []blocks.Block) {
} }
func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) { func oneAtATime(b *testing.B, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background()).(*Session) ses := bs.NewSession(context.Background()).(*bssession.Session)
for _, c := range ks { for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c) _, err := ses.GetBlock(context.Background(), c)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
} }
b.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt)) b.Logf("Session fetch latency: %s", ses.GetAverageLatency())
} }
// fetch data in batches, 10 at a time // fetch data in batches, 10 at a time
......
package bitswap package getter
import ( import (
"context" "context"
"errors" "errors"
notifications "github.com/ipfs/go-bitswap/notifications" notifications "github.com/ipfs/go-bitswap/notifications"
logging "github.com/ipfs/go-log"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
) )
type getBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error) var log = logging.Logger("bitswap")
func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, error) { // GetBlocksFunc is any function that can take an array of CIDs and return a
// channel of incoming blocks.
type GetBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)
// SyncGetBlock takes a block cid and an async function for getting several
// blocks that returns a channel, and uses that function to return the
// block syncronously.
func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block, error) {
if !k.Defined() { if !k.Defined() {
log.Error("undefined cid in GetBlock") log.Error("undefined cid in GetBlock")
return nil, blockstore.ErrNotFound return nil, blockstore.ErrNotFound
...@@ -49,9 +57,13 @@ func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, err ...@@ -49,9 +57,13 @@ func getBlock(p context.Context, k cid.Cid, gb getBlocksFunc) (blocks.Block, err
} }
} }
type wantFunc func(context.Context, []cid.Cid) // WantFunc is any function that can express a want for set of blocks.
type WantFunc func(context.Context, []cid.Cid)
func getBlocksImpl(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want wantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) { // AsyncGetBlocks take a set of block cids, a pubsub channel for incoming
// blocks, a want function, and a close function,
// and returns a channel of incoming blocks.
func AsyncGetBlocks(ctx context.Context, keys []cid.Cid, notif notifications.PubSub, want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
if len(keys) == 0 { if len(keys) == 0 {
out := make(chan blocks.Block) out := make(chan blocks.Block)
close(out) close(out)
......
package bitswap package session
import ( import (
"context" "context"
"fmt" "fmt"
"time" "time"
notifications "github.com/ipfs/go-bitswap/notifications"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
loggables "github.com/libp2p/go-libp2p-loggables" loggables "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
...@@ -18,41 +18,61 @@ import ( ...@@ -18,41 +18,61 @@ import (
const activeWantsLimit = 16 const activeWantsLimit = 16
// SessionWantmanager is an interface that can be used to request blocks
// from given peers.
type SessionWantManager interface {
WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}
type interestReq struct {
c cid.Cid
resp chan bool
}
type blkRecv struct {
from peer.ID
blk blocks.Block
}
// Session holds state for an individual bitswap transfer operation. // Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist // This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from. // info to, and who to request blocks from.
type Session struct { type Session struct {
ctx context.Context // dependencies
ctx context.Context
wm SessionWantManager
network bsnet.BitSwapNetwork
// channels
incoming chan blkRecv
newReqs chan []cid.Cid
cancelKeys chan []cid.Cid
interestReqs chan interestReq
latencyReqs chan chan time.Duration
tickDelayReqs chan time.Duration
// do not touch outside run loop
tofetch *cidQueue tofetch *cidQueue
activePeers map[peer.ID]struct{} activePeers map[peer.ID]struct{}
activePeersArr []peer.ID activePeersArr []peer.ID
interest *lru.Cache
bs *Bitswap liveWants map[cid.Cid]time.Time
incoming chan blkRecv tick *time.Timer
newReqs chan []cid.Cid baseTickDelay time.Duration
cancelKeys chan []cid.Cid latTotal time.Duration
interestReqs chan interestReq fetchcnt int
interest *lru.Cache // identifiers
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
notif notifications.PubSub notif notifications.PubSub
uuid logging.Loggable
uuid logging.Loggable id uint64
tag string
id uint64
tag string
} }
// NewSession creates a new bitswap session whose lifetime is bounded by the // New creates a new bitswap session whose lifetime is bounded by the
// given context. // given context.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.BitSwapNetwork) *Session {
s := &Session{ s := &Session{
activePeers: make(map[peer.ID]struct{}), activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[cid.Cid]time.Time), liveWants: make(map[cid.Cid]time.Time),
...@@ -60,13 +80,16 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { ...@@ -60,13 +80,16 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
cancelKeys: make(chan []cid.Cid), cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(), tofetch: newCidQueue(),
interestReqs: make(chan interestReq), interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx, ctx: ctx,
bs: bs, wm: wm,
network: network,
incoming: make(chan blkRecv), incoming: make(chan blkRecv),
notif: notifications.New(), notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"), uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500, baseTickDelay: time.Millisecond * 500,
id: bs.sm.GetNextSessionID(), id: id,
} }
s.tag = fmt.Sprint("bs-ses-", s.id) s.tag = fmt.Sprint("bs-ses-", s.id)
...@@ -74,39 +97,63 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { ...@@ -74,39 +97,63 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
cache, _ := lru.New(2048) cache, _ := lru.New(2048)
s.interest = cache s.interest = cache
bs.sm.AddSession(s)
go s.run(ctx) go s.run(ctx)
return s return s
} }
func (bs *Bitswap) removeSession(s *Session) { // ReceiveBlockFrom receives an incoming block from the given peer.
s.notif.Shutdown() func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
select {
live := make([]cid.Cid, 0, len(s.liveWants)) case s.incoming <- blkRecv{from: from, blk: blk}:
for c := range s.liveWants { case <-s.ctx.Done():
live = append(live, c)
} }
bs.CancelWants(live, s.id) }
bs.sm.RemoveSession(s) // InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
return s.interest.Contains(c) || s.isLiveWant(c)
} }
type blkRecv struct { // GetBlock fetches a single block.
from peer.ID func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
blk blocks.Block return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}
// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid)
return bsgetter.AsyncGetBlocks(ctx, keys, s.notif, s.fetch, s.cancel)
}
// ID returns the sessions identifier.
func (s *Session) ID() uint64 {
return s.id
} }
func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) { func (s *Session) GetAverageLatency() time.Duration {
resp := make(chan time.Duration)
select { select {
case s.incoming <- blkRecv{from: from, blk: blk}: case s.latencyReqs <- resp:
case <-s.ctx.Done():
return -1 * time.Millisecond
}
select {
case latency := <-resp:
return latency
case <-s.ctx.Done(): case <-s.ctx.Done():
return -1 * time.Millisecond
} }
} }
type interestReq struct { func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
c cid.Cid select {
resp chan bool case s.tickDelayReqs <- baseTickDelay:
case <-s.ctx.Done():
}
} }
// TODO: PERF: this is using a channel to guard a map access against race // TODO: PERF: this is using a channel to guard a map access against race
...@@ -135,114 +182,147 @@ func (s *Session) isLiveWant(c cid.Cid) bool { ...@@ -135,114 +182,147 @@ func (s *Session) isLiveWant(c cid.Cid) bool {
} }
} }
func (s *Session) interestedIn(c cid.Cid) bool { func (s *Session) fetch(ctx context.Context, keys []cid.Cid) {
return s.interest.Contains(c) || s.isLiveWant(c) select {
} case s.newReqs <- keys:
case <-ctx.Done():
const provSearchDelay = time.Second * 10 case <-s.ctx.Done():
func (s *Session) addActivePeer(p peer.ID) {
if _, ok := s.activePeers[p]; !ok {
s.activePeers[p] = struct{}{}
s.activePeersArr = append(s.activePeersArr, p)
cmgr := s.bs.network.ConnectionManager()
cmgr.TagPeer(p, s.tag, 10)
} }
} }
func (s *Session) resetTick() { func (s *Session) cancel(keys []cid.Cid) {
if s.latTotal == 0 { select {
s.tick.Reset(provSearchDelay) case s.cancelKeys <- keys:
} else { case <-s.ctx.Done():
avLat := s.latTotal / time.Duration(s.fetchcnt)
s.tick.Reset(s.baseTickDelay + (3 * avLat))
} }
} }
const provSearchDelay = time.Second * 10
// Session run loop -- everything function below here should not be called
// of this loop
func (s *Session) run(ctx context.Context) { func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay) s.tick = time.NewTimer(provSearchDelay)
newpeers := make(chan peer.ID, 16) newpeers := make(chan peer.ID, 16)
for { for {
select { select {
case blk := <-s.incoming: case blk := <-s.incoming:
s.tick.Stop() s.handleIncomingBlock(ctx, blk)
if blk.from != "" {
s.addActivePeer(blk.from)
}
s.receiveBlock(ctx, blk.blk)
s.resetTick()
case keys := <-s.newReqs: case keys := <-s.newReqs:
for _, k := range keys { s.handleNewRequest(ctx, keys)
s.interest.Add(k, nil)
}
if len(s.liveWants) < activeWantsLimit {
toadd := activeWantsLimit - len(s.liveWants)
if toadd > len(keys) {
toadd = len(keys)
}
now := keys[:toadd]
keys = keys[toadd:]
s.wantBlocks(ctx, now)
}
for _, k := range keys {
s.tofetch.Push(k)
}
case keys := <-s.cancelKeys: case keys := <-s.cancelKeys:
s.cancel(keys) s.handleCancel(keys)
case <-s.tick.C: case <-s.tick.C:
live := make([]cid.Cid, 0, len(s.liveWants)) s.handleTick(ctx, newpeers)
now := time.Now()
for c := range s.liveWants {
live = append(live, c)
s.liveWants[c] = now
}
// Broadcast these keys to everyone we're connected to
s.bs.wm.WantBlocks(ctx, live, nil, s.id)
if len(live) > 0 {
go func(k cid.Cid) {
// TODO: have a task queue setup for this to:
// - rate limit
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range s.bs.network.FindProvidersAsync(ctx, k, 10) {
newpeers <- p
}
}(live[0])
}
s.resetTick()
case p := <-newpeers: case p := <-newpeers:
s.addActivePeer(p) s.addActivePeer(p)
case lwchk := <-s.interestReqs: case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c) lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
resp <- s.averageLatency()
case baseTickDelay := <-s.tickDelayReqs:
s.baseTickDelay = baseTickDelay
case <-ctx.Done(): case <-ctx.Done():
s.tick.Stop() s.handleShutdown()
s.bs.removeSession(s)
cmgr := s.bs.network.ConnectionManager()
for _, p := range s.activePeersArr {
cmgr.UntagPeer(p, s.tag)
}
return return
} }
} }
} }
func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
s.tick.Stop()
if blk.from != "" {
s.addActivePeer(blk.from)
}
s.receiveBlock(ctx, blk.blk)
s.resetTick()
}
func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
for _, k := range keys {
s.interest.Add(k, nil)
}
if len(s.liveWants) < activeWantsLimit {
toadd := activeWantsLimit - len(s.liveWants)
if toadd > len(keys) {
toadd = len(keys)
}
now := keys[:toadd]
keys = keys[toadd:]
s.wantBlocks(ctx, now)
}
for _, k := range keys {
s.tofetch.Push(k)
}
}
func (s *Session) handleCancel(keys []cid.Cid) {
for _, c := range keys {
s.tofetch.Remove(c)
}
}
func (s *Session) handleTick(ctx context.Context, newpeers chan<- peer.ID) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
live = append(live, c)
s.liveWants[c] = now
}
// Broadcast these keys to everyone we're connected to
s.wm.WantBlocks(ctx, live, nil, s.id)
if len(live) > 0 {
go func(k cid.Cid) {
// TODO: have a task queue setup for this to:
// - rate limit
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range s.network.FindProvidersAsync(ctx, k, 10) {
newpeers <- p
}
}(live[0])
}
s.resetTick()
}
func (s *Session) addActivePeer(p peer.ID) {
if _, ok := s.activePeers[p]; !ok {
s.activePeers[p] = struct{}{}
s.activePeersArr = append(s.activePeersArr, p)
cmgr := s.network.ConnectionManager()
cmgr.TagPeer(p, s.tag, 10)
}
}
func (s *Session) handleShutdown() {
s.tick.Stop()
s.notif.Shutdown()
live := make([]cid.Cid, 0, len(s.liveWants))
for c := range s.liveWants {
live = append(live, c)
}
s.wm.CancelWants(s.ctx, live, nil, s.id)
cmgr := s.network.ConnectionManager()
for _, p := range s.activePeersArr {
cmgr.UntagPeer(p, s.tag)
}
}
func (s *Session) cidIsWanted(c cid.Cid) bool { func (s *Session) cidIsWanted(c cid.Cid) bool {
_, ok := s.liveWants[c] _, ok := s.liveWants[c]
if !ok { if !ok {
ok = s.tofetch.Has(c) ok = s.tofetch.Has(c)
} }
return ok return ok
} }
...@@ -270,43 +350,21 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { ...@@ -270,43 +350,21 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
for _, c := range ks { for _, c := range ks {
s.liveWants[c] = now s.liveWants[c] = now
} }
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) s.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
} }
func (s *Session) cancel(keys []cid.Cid) { func (s *Session) averageLatency() time.Duration {
for _, c := range keys { return s.latTotal / time.Duration(s.fetchcnt)
s.tofetch.Remove(c)
}
}
func (s *Session) cancelWants(keys []cid.Cid) {
select {
case s.cancelKeys <- keys:
case <-s.ctx.Done():
}
} }
func (s *Session) resetTick() {
func (s *Session) fetch(ctx context.Context, keys []cid.Cid) { if s.latTotal == 0 {
select { s.tick.Reset(provSearchDelay)
case s.newReqs <- keys: } else {
case <-ctx.Done(): avLat := s.averageLatency()
case <-s.ctx.Done(): s.tick.Reset(s.baseTickDelay + (3 * avLat))
} }
} }
// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid)
return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
}
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, s.GetBlocks)
}
type cidQueue struct { type cidQueue struct {
elems []cid.Cid elems []cid.Cid
eset *cid.Set eset *cid.Set
......
package sessionmanager package sessionmanager
import ( import (
"context"
"sync" "sync"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bsnet "github.com/ipfs/go-bitswap/network"
bssession "github.com/ipfs/go-bitswap/session"
bswm "github.com/ipfs/go-bitswap/wantmanager"
exchange "github.com/ipfs/go-ipfs-exchange-interface" exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-peer"
) )
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
type SessionManager struct { type SessionManager struct {
wm *bswm.WantManager
network bsnet.BitSwapNetwork
ctx context.Context
// Sessions // Sessions
sessLk sync.Mutex sessLk sync.Mutex
sessions []exchange.Fetcher sessions []*bssession.Session
// Session Index // Session Index
sessIDLk sync.Mutex sessIDLk sync.Mutex
sessID uint64 sessID uint64
} }
func New() *SessionManager { // New creates a new SessionManager.
return &SessionManager{} func New(ctx context.Context, wm *bswm.WantManager, network bsnet.BitSwapNetwork) *SessionManager {
return &SessionManager{
ctx: ctx,
wm: wm,
network: network,
}
} }
func (sm *SessionManager) AddSession(session exchange.Fetcher) { // NewSession initializes a session with the given context, and adds to the
// session manager.
func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
id := sm.GetNextSessionID()
sessionctx, cancel := context.WithCancel(ctx)
session := bssession.New(sessionctx, id, sm.wm, sm.network)
sm.sessLk.Lock() sm.sessLk.Lock()
sm.sessions = append(sm.sessions, session) sm.sessions = append(sm.sessions, session)
sm.sessLk.Unlock() sm.sessLk.Unlock()
go func() {
for {
defer cancel()
select {
case <-sm.ctx.Done():
sm.removeSession(session)
return
case <-ctx.Done():
sm.removeSession(session)
return
}
}
}()
return session
} }
func (sm *SessionManager) RemoveSession(session exchange.Fetcher) { func (sm *SessionManager) removeSession(session exchange.Fetcher) {
sm.sessLk.Lock() sm.sessLk.Lock()
defer sm.sessLk.Unlock() defer sm.sessLk.Unlock()
for i := 0; i < len(sm.sessions); i++ { for i := 0; i < len(sm.sessions); i++ {
...@@ -38,6 +77,7 @@ func (sm *SessionManager) RemoveSession(session exchange.Fetcher) { ...@@ -38,6 +77,7 @@ func (sm *SessionManager) RemoveSession(session exchange.Fetcher) {
} }
} }
// GetNextSessionID returns the next sequentional identifier for a session.
func (sm *SessionManager) GetNextSessionID() uint64 { func (sm *SessionManager) GetNextSessionID() uint64 {
sm.sessIDLk.Lock() sm.sessIDLk.Lock()
defer sm.sessIDLk.Unlock() defer sm.sessIDLk.Unlock()
...@@ -45,15 +85,18 @@ func (sm *SessionManager) GetNextSessionID() uint64 { ...@@ -45,15 +85,18 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
return sm.sessID return sm.sessID
} }
type IterateSessionFunc func(session exchange.Fetcher) // ReceiveBlockFrom receives a block from a peer and dispatches to interested
// sessions.
// IterateSessions loops through all managed sessions and applies the given func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
// IterateSessionFunc.
func (sm *SessionManager) IterateSessions(iterate IterateSessionFunc) {
sm.sessLk.Lock() sm.sessLk.Lock()
defer sm.sessLk.Unlock() defer sm.sessLk.Unlock()
k := blk.Cid()
ks := []cid.Cid{k}
for _, s := range sm.sessions { for _, s := range sm.sessions {
iterate(s) if s.InterestedIn(k) {
s.ReceiveBlockFrom(from, blk)
sm.wm.CancelWants(sm.ctx, ks, nil, s.ID())
}
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment