Commit 69d063bf authored by hannahhoward

refactor(general): extract components to packages

Extract session manager from bitswap
Extract session manager & want manager to package
Move want manager message queue to separate file
Move Message Queue to subpackage
Respond to PR Comments
parent c5b071da
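
The net effect of the refactor: Bitswap no longer owns session bookkeeping or per-peer queues; it composes the new packages at construction time. A minimal sketch of the resulting wiring, assuming a bsnet.BitSwapNetwork implementation is available (the helper newComponents is hypothetical and only illustrative; the real wiring happens inside New, as the first diff below shows):

package bitswap

import (
	"context"

	bsnet "github.com/ipfs/go-bitswap/network"
	bssm "github.com/ipfs/go-bitswap/sessionmanager"
	bswm "github.com/ipfs/go-bitswap/wantmanager"
)

// newComponents is a hypothetical helper showing how the extracted
// managers are constructed; the diff below does the same inside New.
func newComponents(ctx context.Context, network bsnet.BitSwapNetwork) (*bswm.WantManager, *bssm.SessionManager) {
	wm := bswm.New(ctx, network) // owns the wantlist and per-peer message queues
	sm := bssm.New()             // owns session registration and ID allocation
	go wm.Run()                  // event loop; assumed started by the caller, as before the refactor
	return wm, sm
}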
@@ -5,7 +5,6 @@ package bitswap
 import (
 	"context"
 	"errors"
-	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -14,6 +13,8 @@ import (
 	bsmsg "github.com/ipfs/go-bitswap/message"
 	bsnet "github.com/ipfs/go-bitswap/network"
 	notifications "github.com/ipfs/go-bitswap/notifications"
+	bssm "github.com/ipfs/go-bitswap/sessionmanager"
+	bswm "github.com/ipfs/go-bitswap/wantmanager"

 	blocks "github.com/ipfs/go-block-format"
 	cid "github.com/ipfs/go-cid"
@@ -42,8 +43,6 @@ const (
 	providerRequestTimeout = time.Second * 10
 	provideTimeout         = time.Second * 15
 	sizeBatchRequestChan   = 32
-	// kMaxPriority is the max priority as defined by the bitswap protocol
-	kMaxPriority = math.MaxInt32
 )

 var (
@@ -101,7 +100,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 		process:       px,
 		newBlocks:     make(chan cid.Cid, HasBlockBufferSize),
 		provideKeys:   make(chan cid.Cid, provideKeysBufferSize),
-		wm:            NewWantManager(ctx, network),
+		wm:            bswm.New(ctx, network),
+		sm:            bssm.New(),
 		counters:      new(counters),
 		dupMetric:     dupHist,
@@ -128,7 +128,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
 type Bitswap struct {
 	// the peermanager manages sending messages to peers in a way that
 	// wont block bitswap operation
-	wm *WantManager
+	wm *bswm.WantManager

 	// the engine is the bit of logic that decides who to send which blocks to
 	engine *decision.Engine
@@ -163,12 +163,8 @@ type Bitswap struct {
 	dupMetric metrics.Histogram
 	allMetric metrics.Histogram

-	// Sessions
-	sessions []*Session
-	sessLk   sync.Mutex
-
-	sessID   uint64
-	sessIDLk sync.Mutex
+	// the sessionmanager manages tracking sessions
+	sm *bssm.SessionManager
 }

 type counters struct {
@@ -229,7 +225,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
 		log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
 	}

-	mses := bs.getNextSessionID()
+	mses := bs.sm.GetNextSessionID()

 	bs.wm.WantBlocks(ctx, keys, nil, mses)
@@ -294,13 +290,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
 	return out, nil
 }

-func (bs *Bitswap) getNextSessionID() uint64 {
-	bs.sessIDLk.Lock()
-	defer bs.sessIDLk.Unlock()
-	bs.sessID++
-	return bs.sessID
-}
-
 // CancelWant removes a given key from the wantlist
 func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) {
 	if len(cids) == 0 {
@@ -359,15 +348,13 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
 // SessionsForBlock returns a slice of all sessions that may be interested in the given cid
 func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session {
-	bs.sessLk.Lock()
-	defer bs.sessLk.Unlock()
-
 	var out []*Session
-	for _, s := range bs.sessions {
+	bs.sm.IterateSessions(func(session exchange.Fetcher) {
+		s := session.(*Session)
 		if s.interestedIn(c) {
 			out = append(out, s)
 		}
-	}
+	})
 	return out
 }
@@ -398,7 +385,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
 		log.Debugf("got block %s from %s", b, p)

 		// skip received blocks that are not in the wantlist
-		if _, contains := bs.wm.wl.Contains(b.Cid()); !contains {
+		if !bs.wm.IsWanted(b.Cid()) {
 			return
 		}
@@ -461,7 +448,7 @@ func (bs *Bitswap) Close() error {
 }

 func (bs *Bitswap) GetWantlist() []cid.Cid {
-	entries := bs.wm.wl.Entries()
+	entries := bs.wm.CurrentWants()
 	out := make([]cid.Cid, 0, len(entries))
 	for _, e := range entries {
 		out = append(out, e.Cid)
...
package messagequeue

import (
	"context"
	"sync"
	"time"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

type MessageQueue struct {
	p peer.ID

	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
	network bsnet.BitSwapNetwork
	wl      *wantlist.ThreadSafe

	sender bsnet.MessageSender

	refcnt int

	work chan struct{}
	done chan struct{}
}

func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue {
	return &MessageQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
		wl:      wantlist.NewThreadSafe(),
		network: network,
		p:       p,
		refcnt:  1,
	}
}

func (mq *MessageQueue) RefIncrement() {
	mq.refcnt++
}

func (mq *MessageQueue) RefDecrement() bool {
	mq.refcnt--
	return mq.refcnt > 0
}

func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
	var work bool
	mq.outlk.Lock()
	defer func() {
		mq.outlk.Unlock()
		if !work {
			return
		}
		select {
		case mq.work <- struct{}{}:
		default:
		}
	}()

	// if we have no message held, allocate a new one
	if mq.out == nil {
		mq.out = bsmsg.New(false)
	}

	// TODO: add a msg.Combine(...) method
	// otherwise, combine the one we are holding with the
	// one passed in
	for _, e := range entries {
		if e.Cancel {
			if mq.wl.Remove(e.Cid, ses) {
				work = true
				mq.out.Cancel(e.Cid)
			}
		} else {
			if mq.wl.Add(e.Cid, e.Priority, ses) {
				work = true
				mq.out.AddEntry(e.Cid, e.Priority)
			}
		}
	}
}

func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {
	// new peer, we will want to give them our full wantlist
	fullwantlist := bsmsg.New(true)
	for _, e := range initialEntries {
		for k := range e.SesTrk {
			mq.wl.AddEntry(e, k)
		}
		fullwantlist.AddEntry(e.Cid, e.Priority)
	}
	mq.out = fullwantlist
	mq.work <- struct{}{}

	go mq.runQueue(ctx)
}

func (mq *MessageQueue) Shutdown() {
	close(mq.done)
}

func (mq *MessageQueue) runQueue(ctx context.Context) {
	for {
		select {
		case <-mq.work: // there is work to be done
			mq.doWork(ctx)
		case <-mq.done:
			if mq.sender != nil {
				mq.sender.Close()
			}
			return
		case <-ctx.Done():
			if mq.sender != nil {
				mq.sender.Reset()
			}
			return
		}
	}
}

func (mq *MessageQueue) doWork(ctx context.Context) {
	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	if wlm == nil || wlm.Empty() {
		mq.outlk.Unlock()
		return
	}
	mq.out = nil
	mq.outlk.Unlock()

	// NB: only open a stream if we actually have data to send
	if mq.sender == nil {
		err := mq.openSender(ctx)
		if err != nil {
			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
			// TODO: can't connect, what now?
			return
		}
	}

	// send wantlist updates
	for { // try to send this message until we fail.
		err := mq.sender.SendMsg(ctx, wlm)
		if err == nil {
			return
		}

		log.Infof("bitswap send error: %s", err)
		mq.sender.Reset()
		mq.sender = nil

		select {
		case <-mq.done:
			return
		case <-ctx.Done():
			return
		case <-time.After(time.Millisecond * 100):
			// wait 100ms in case disconnect notifications are still propagating
			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
		}

		err = mq.openSender(ctx)
		if err != nil {
			log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
			// TODO(why): what do we do now?
			// I think the *right* answer is to probably put the message we're
			// trying to send back, and then return to waiting for new work or
			// a disconnect.
			return
		}

		// TODO: Is this the same instance for the remote peer?
		// If it's not, we should resend our entire wantlist to them
		/*
			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
				wlm = mq.getFullWantlistMessage()
			}
		*/
	}
}

func (mq *MessageQueue) openSender(ctx context.Context) error {
	// allow ten minutes for connections; this includes looking them up in
	// the DHT, dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()
	err := mq.network.ConnectTo(conctx, mq.p)
	if err != nil {
		return err
	}

	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
	if err != nil {
		return err
	}

	mq.sender = nsender
	return nil
}
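
The exported surface above replaces direct manipulation of the old msgQueue fields. A sketch of the lifecycle as the want manager drives it, assuming a connected bsnet.BitSwapNetwork and a known peer.ID are available (driveQueue is a hypothetical helper mirroring startPeerHandler/stopPeerHandler in the wantmanager diff further down):

package example

import (
	"context"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bsmq "github.com/ipfs/go-bitswap/messagequeue"
	bsnet "github.com/ipfs/go-bitswap/network"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	peer "github.com/libp2p/go-libp2p-peer"
)

// driveQueue mirrors how the WantManager uses the new package;
// it is an illustrative helper, not code from the commit.
func driveQueue(ctx context.Context, network bsnet.BitSwapNetwork, p peer.ID,
	initial []*wantlist.Entry, entries []*bsmsg.Entry, ses uint64) {
	mq := bsmq.New(p, network)  // refcnt starts at 1
	mq.Startup(ctx, initial)    // seed the full wantlist and start runQueue
	mq.AddMessage(entries, ses) // merge want/cancel entries, signal work
	if !mq.RefDecrement() {     // no references left
		mq.Shutdown() // close done; runQueue closes the sender
	}
}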
@@ -66,7 +66,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
 		notif:         notifications.New(),
 		uuid:          loggables.Uuid("GetBlockRequest"),
 		baseTickDelay: time.Millisecond * 500,
-		id:            bs.getNextSessionID(),
+		id:            bs.sm.GetNextSessionID(),
 	}

 	s.tag = fmt.Sprint("bs-ses-", s.id)
@@ -74,10 +74,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
 	cache, _ := lru.New(2048)
 	s.interest = cache

-	bs.sessLk.Lock()
-	bs.sessions = append(bs.sessions, s)
-	bs.sessLk.Unlock()
+	bs.sm.AddSession(s)

 	go s.run(ctx)

 	return s
@@ -92,15 +89,7 @@ func (bs *Bitswap) removeSession(s *Session) {
 	}
 	bs.CancelWants(live, s.id)

-	bs.sessLk.Lock()
-	defer bs.sessLk.Unlock()
-	for i := 0; i < len(bs.sessions); i++ {
-		if bs.sessions[i] == s {
-			bs.sessions[i] = bs.sessions[len(bs.sessions)-1]
-			bs.sessions = bs.sessions[:len(bs.sessions)-1]
-			return
-		}
-	}
+	bs.sm.RemoveSession(s)
 }

 type blkRecv struct {
...
package sessionmanager

import (
	"sync"

	exchange "github.com/ipfs/go-ipfs-exchange-interface"
)

type SessionManager struct {
	// Sessions
	sessLk   sync.Mutex
	sessions []exchange.Fetcher

	// Session Index
	sessIDLk sync.Mutex
	sessID   uint64
}

func New() *SessionManager {
	return &SessionManager{}
}

func (sm *SessionManager) AddSession(session exchange.Fetcher) {
	sm.sessLk.Lock()
	sm.sessions = append(sm.sessions, session)
	sm.sessLk.Unlock()
}

func (sm *SessionManager) RemoveSession(session exchange.Fetcher) {
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()
	for i := 0; i < len(sm.sessions); i++ {
		if sm.sessions[i] == session {
			sm.sessions[i] = sm.sessions[len(sm.sessions)-1]
			sm.sessions = sm.sessions[:len(sm.sessions)-1]
			return
		}
	}
}

func (sm *SessionManager) GetNextSessionID() uint64 {
	sm.sessIDLk.Lock()
	defer sm.sessIDLk.Unlock()
	sm.sessID++
	return sm.sessID
}

type IterateSessionFunc func(session exchange.Fetcher)

// IterateSessions loops through all managed sessions and applies the given
// IterateSessionFunc
func (sm *SessionManager) IterateSessions(iterate IterateSessionFunc) {
	sm.sessLk.Lock()
	defer sm.sessLk.Unlock()

	for _, s := range sm.sessions {
		iterate(s)
	}
}
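
Sessions are stored as plain exchange.Fetcher values, so callers downcast when they need session-specific behavior, as SessionsForBlock does in the bitswap.go diff above. A hypothetical usage sketch of the full surface (trackSession is illustrative, not part of the commit):

package example

import (
	bssm "github.com/ipfs/go-bitswap/sessionmanager"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
)

// trackSession exercises the SessionManager surface used by bitswap:
// ID allocation, registration, iteration, removal.
func trackSession(sm *bssm.SessionManager, s exchange.Fetcher) uint64 {
	id := sm.GetNextSessionID() // monotonic, mutex-protected counter
	sm.AddSession(s)
	sm.IterateSessions(func(session exchange.Fetcher) {
		// bitswap downcasts here, e.g. session.(*Session),
		// to reach methods like interestedIn.
	})
	sm.RemoveSession(s) // swap-and-truncate removal; order not preserved
	return id
}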
-package bitswap
+package wantmanager

 import (
 	"context"
-	"sync"
-	"time"
+	"math"

 	engine "github.com/ipfs/go-bitswap/decision"
 	bsmsg "github.com/ipfs/go-bitswap/message"
+	bsmq "github.com/ipfs/go-bitswap/messagequeue"
 	bsnet "github.com/ipfs/go-bitswap/network"
 	wantlist "github.com/ipfs/go-bitswap/wantlist"
+	logging "github.com/ipfs/go-log"

 	cid "github.com/ipfs/go-cid"
 	metrics "github.com/ipfs/go-metrics-interface"
 	peer "github.com/libp2p/go-libp2p-peer"
 )

+var log = logging.Logger("bitswap")
+
+const (
+	// kMaxPriority is the max priority as defined by the bitswap protocol
+	kMaxPriority = math.MaxInt32
+)
+
+var (
+	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
+)
+
 type WantManager struct {
 	// sync channels for Run loop
 	incoming chan *wantSet
@@ -22,7 +34,7 @@ type WantManager struct {
 	peerReqs chan chan []peer.ID // channel to request connected peers on

 	// synchronized by Run loop, only touch inside there
-	peers map[peer.ID]*msgQueue
+	peers map[peer.ID]*bsmq.MessageQueue
 	wl    *wantlist.ThreadSafe
 	bcwl  *wantlist.ThreadSafe
@@ -39,7 +51,7 @@ type peerStatus struct {
 	peer    peer.ID
 }

-func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
+func New(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
 	ctx, cancel := context.WithCancel(ctx)
 	wantlistGauge := metrics.NewCtx(ctx, "wantlist_total",
 		"Number of items in wantlist.").Gauge()
@@ -49,7 +61,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
 		incoming:     make(chan *wantSet, 10),
 		connectEvent: make(chan peerStatus, 10),
 		peerReqs:     make(chan chan []peer.ID),
-		peers:        make(map[peer.ID]*msgQueue),
+		peers:        make(map[peer.ID]*bsmq.MessageQueue),
 		wl:           wantlist.NewThreadSafe(),
 		bcwl:         wantlist.NewThreadSafe(),
 		network:      network,
@@ -60,31 +72,15 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantManager {
 	}
 }

-type msgQueue struct {
-	p peer.ID
-
-	outlk   sync.Mutex
-	out     bsmsg.BitSwapMessage
-	network bsnet.BitSwapNetwork
-	wl      *wantlist.ThreadSafe
-
-	sender bsnet.MessageSender
-
-	refcnt int
-
-	work chan struct{}
-	done chan struct{}
-}
-
 // WantBlocks adds the given cids to the wantlist, tracked by the given session
-func (pm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
+func (wm *WantManager) WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
 	log.Infof("want blocks: %s", ks)
-	pm.addEntries(ctx, ks, peers, false, ses)
+	wm.addEntries(ctx, ks, peers, false, ses)
 }

 // CancelWants removes the given cids from the wantlist, tracked by the given session
-func (pm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
-	pm.addEntries(context.Background(), ks, peers, true, ses)
+func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64) {
+	wm.addEntries(context.Background(), ks, peers, true, ses)
 }

 type wantSet struct {
@@ -93,7 +89,7 @@ type wantSet struct {
 	from    uint64
 }

-func (pm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
+func (wm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
 	entries := make([]*bsmsg.Entry, 0, len(ks))
 	for i, k := range ks {
 		entries = append(entries, &bsmsg.Entry{
@@ -102,19 +98,19 @@ func (pm *WantManager) addEntries(ctx context.Context, ks []cid.Cid, targets []p
 		})
 	}
 	select {
-	case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
-	case <-pm.ctx.Done():
+	case wm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
+	case <-wm.ctx.Done():
 	case <-ctx.Done():
 	}
 }

-func (pm *WantManager) ConnectedPeers() []peer.ID {
+func (wm *WantManager) ConnectedPeers() []peer.ID {
 	resp := make(chan []peer.ID)
-	pm.peerReqs <- resp
+	wm.peerReqs <- resp
 	return <-resp
 }

-func (pm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
+func (wm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
 	// Blocks need to be sent synchronously to maintain proper backpressure
 	// throughout the network stack
 	defer env.Sent()
@@ -127,176 +123,62 @@ func (pm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
 		log.Infof("Sending block %s to %s", block, env.Peer)
 	}

-	pm.sentHistogram.Observe(float64(msgSize))
-	err := pm.network.SendMessage(ctx, env.Peer, msg)
+	wm.sentHistogram.Observe(float64(msgSize))
+	err := wm.network.SendMessage(ctx, env.Peer, msg)
 	if err != nil {
 		log.Infof("sendblock error: %s", err)
 	}
 }

-func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
-	mq, ok := pm.peers[p]
+func (wm *WantManager) startPeerHandler(p peer.ID) *bsmq.MessageQueue {
+	mq, ok := wm.peers[p]
 	if ok {
-		mq.refcnt++
+		mq.RefIncrement()
 		return nil
 	}

-	mq = pm.newMsgQueue(p)
-
-	// new peer, we will want to give them our full wantlist
-	fullwantlist := bsmsg.New(true)
-	for _, e := range pm.bcwl.Entries() {
-		for k := range e.SesTrk {
-			mq.wl.AddEntry(e, k)
-		}
-		fullwantlist.AddEntry(e.Cid, e.Priority)
-	}
-	mq.out = fullwantlist
-	mq.work <- struct{}{}
-
-	pm.peers[p] = mq
-	go mq.runQueue(pm.ctx)
+	mq = bsmq.New(p, wm.network)
+	wm.peers[p] = mq
+	mq.Startup(wm.ctx, wm.bcwl.Entries())

 	return mq
 }

-func (pm *WantManager) stopPeerHandler(p peer.ID) {
-	pq, ok := pm.peers[p]
+func (wm *WantManager) stopPeerHandler(p peer.ID) {
+	pq, ok := wm.peers[p]
 	if !ok {
 		// TODO: log error?
 		return
 	}

-	pq.refcnt--
-	if pq.refcnt > 0 {
+	if pq.RefDecrement() {
 		return
 	}

-	close(pq.done)
-	delete(pm.peers, p)
+	pq.Shutdown()
+	delete(wm.peers, p)
 }

-func (mq *msgQueue) runQueue(ctx context.Context) {
-	for {
-		select {
-		case <-mq.work: // there is work to be done
-			mq.doWork(ctx)
-		case <-mq.done:
-			if mq.sender != nil {
-				mq.sender.Close()
-			}
-			return
-		case <-ctx.Done():
-			if mq.sender != nil {
-				mq.sender.Reset()
-			}
-			return
-		}
-	}
-}
-
-func (mq *msgQueue) doWork(ctx context.Context) {
-	// grab outgoing message
-	mq.outlk.Lock()
-	wlm := mq.out
-	if wlm == nil || wlm.Empty() {
-		mq.outlk.Unlock()
-		return
-	}
-	mq.out = nil
-	mq.outlk.Unlock()
-
-	// NB: only open a stream if we actually have data to send
-	if mq.sender == nil {
-		err := mq.openSender(ctx)
-		if err != nil {
-			log.Infof("cant open message sender to peer %s: %s", mq.p, err)
-			// TODO: cant connect, what now?
-			return
-		}
-	}
-
-	// send wantlist updates
-	for { // try to send this message until we fail.
-		err := mq.sender.SendMsg(ctx, wlm)
-		if err == nil {
-			return
-		}
-
-		log.Infof("bitswap send error: %s", err)
-		mq.sender.Reset()
-		mq.sender = nil
-
-		select {
-		case <-mq.done:
-			return
-		case <-ctx.Done():
-			return
-		case <-time.After(time.Millisecond * 100):
-			// wait 100ms in case disconnect notifications are still propogating
-			log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
-		}
-
-		err = mq.openSender(ctx)
-		if err != nil {
-			log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
-			// TODO(why): what do we do now?
-			// I think the *right* answer is to probably put the message we're
-			// trying to send back, and then return to waiting for new work or
-			// a disconnect.
-			return
-		}
-
-		// TODO: Is this the same instance for the remote peer?
-		// If its not, we should resend our entire wantlist to them
-		/*
-			if mq.sender.InstanceID() != mq.lastSeenInstanceID {
-				wlm = mq.getFullWantlistMessage()
-			}
-		*/
-	}
-}
-
-func (mq *msgQueue) openSender(ctx context.Context) error {
-	// allow ten minutes for connections this includes looking them up in the
-	// dht dialing them, and handshaking
-	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
-	defer cancel()
-	err := mq.network.ConnectTo(conctx, mq.p)
-	if err != nil {
-		return err
-	}
-
-	nsender, err := mq.network.NewMessageSender(ctx, mq.p)
-	if err != nil {
-		return err
-	}
-
-	mq.sender = nsender
-	return nil
-}
-
-func (pm *WantManager) Connected(p peer.ID) {
+func (wm *WantManager) Connected(p peer.ID) {
 	select {
-	case pm.connectEvent <- peerStatus{peer: p, connect: true}:
-	case <-pm.ctx.Done():
+	case wm.connectEvent <- peerStatus{peer: p, connect: true}:
+	case <-wm.ctx.Done():
 	}
 }

-func (pm *WantManager) Disconnected(p peer.ID) {
+func (wm *WantManager) Disconnected(p peer.ID) {
 	select {
-	case pm.connectEvent <- peerStatus{peer: p, connect: false}:
-	case <-pm.ctx.Done():
+	case wm.connectEvent <- peerStatus{peer: p, connect: false}:
+	case <-wm.ctx.Done():
 	}
 }

 // TODO: use goprocess here once i trust it
-func (pm *WantManager) Run() {
+func (wm *WantManager) Run() {
 	// NOTE: Do not open any streams or connections from anywhere in this
 	// event loop. Really, just don't do anything likely to block.
 	for {
 		select {
-		case ws := <-pm.incoming:
+		case ws := <-wm.incoming:

 			// is this a broadcast or not?
 			brdc := len(ws.targets) == 0
@@ -305,100 +187,65 @@ func (pm *WantManager) Run() {
 			for _, e := range ws.entries {
 				if e.Cancel {
 					if brdc {
-						pm.bcwl.Remove(e.Cid, ws.from)
+						wm.bcwl.Remove(e.Cid, ws.from)
 					}
-					if pm.wl.Remove(e.Cid, ws.from) {
-						pm.wantlistGauge.Dec()
+					if wm.wl.Remove(e.Cid, ws.from) {
+						wm.wantlistGauge.Dec()
 					}
 				} else {
 					if brdc {
-						pm.bcwl.AddEntry(e.Entry, ws.from)
+						wm.bcwl.AddEntry(e.Entry, ws.from)
 					}
-					if pm.wl.AddEntry(e.Entry, ws.from) {
-						pm.wantlistGauge.Inc()
+					if wm.wl.AddEntry(e.Entry, ws.from) {
+						wm.wantlistGauge.Inc()
 					}
 				}
 			}

 			// broadcast those wantlist changes
 			if len(ws.targets) == 0 {
-				for _, p := range pm.peers {
-					p.addMessage(ws.entries, ws.from)
+				for _, p := range wm.peers {
+					p.AddMessage(ws.entries, ws.from)
 				}
 			} else {
 				for _, t := range ws.targets {
-					p, ok := pm.peers[t]
+					p, ok := wm.peers[t]
 					if !ok {
 						log.Infof("tried sending wantlist change to non-partner peer: %s", t)
 						continue
 					}
-					p.addMessage(ws.entries, ws.from)
+					p.AddMessage(ws.entries, ws.from)
 				}
 			}

-		case p := <-pm.connectEvent:
+		case p := <-wm.connectEvent:
 			if p.connect {
-				pm.startPeerHandler(p.peer)
+				wm.startPeerHandler(p.peer)
 			} else {
-				pm.stopPeerHandler(p.peer)
+				wm.stopPeerHandler(p.peer)
 			}
-		case req := <-pm.peerReqs:
-			peers := make([]peer.ID, 0, len(pm.peers))
-			for p := range pm.peers {
+		case req := <-wm.peerReqs:
+			peers := make([]peer.ID, 0, len(wm.peers))
+			for p := range wm.peers {
 				peers = append(peers, p)
 			}
 			req <- peers
-		case <-pm.ctx.Done():
+		case <-wm.ctx.Done():
 			return
 		}
 	}
 }

-func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
-	return &msgQueue{
-		done:    make(chan struct{}),
-		work:    make(chan struct{}, 1),
-		wl:      wantlist.NewThreadSafe(),
-		network: wm.network,
-		p:       p,
-		refcnt:  1,
-	}
-}
-
-func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) {
-	var work bool
-	mq.outlk.Lock()
-	defer func() {
-		mq.outlk.Unlock()
-		if !work {
-			return
-		}
-		select {
-		case mq.work <- struct{}{}:
-		default:
-		}
-	}()
-
-	// if we have no message held allocate a new one
-	if mq.out == nil {
-		mq.out = bsmsg.New(false)
-	}
-
-	// TODO: add a msg.Combine(...) method
-	// otherwise, combine the one we are holding with the
-	// one passed in
-	for _, e := range entries {
-		if e.Cancel {
-			if mq.wl.Remove(e.Cid, ses) {
-				work = true
-				mq.out.Cancel(e.Cid)
-			}
-		} else {
-			if mq.wl.Add(e.Cid, e.Priority, ses) {
-				work = true
-				mq.out.AddEntry(e.Cid, e.Priority)
-			}
-		}
-	}
+func (wm *WantManager) IsWanted(c cid.Cid) bool {
+	_, isWanted := wm.wl.Contains(c)
+	return isWanted
+}
+
+func (wm *WantManager) CurrentWants() []*wantlist.Entry {
+	return wm.wl.Entries()
+}
+
+func (wm *WantManager) WantCount() int {
+	return wm.wl.Len()
 }
@@ -183,13 +183,13 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
 			log.Event(ctx, "Bitswap.Rebroadcast.idle")
 			select {
 			case <-tick.C:
-				n := bs.wm.wl.Len()
+				n := bs.wm.WantCount()
 				if n > 0 {
 					log.Debug(n, " keys in bitswap wantlist")
 				}
 			case <-broadcastSignal.C: // resend unfulfilled wantlist keys
 				log.Event(ctx, "Bitswap.Rebroadcast.active")
-				entries := bs.wm.wl.Entries()
+				entries := bs.wm.CurrentWants()
 				if len(entries) == 0 {
 					continue
 				}
...
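
With the wantlist now private to the wantmanager package, callers use the new accessors rather than reaching into wm.wl, as the bitswap.go and workers.go hunks show. A hypothetical read-side sketch (reportWants is illustrative, not part of the commit):

package example

import (
	bswm "github.com/ipfs/go-bitswap/wantmanager"
	cid "github.com/ipfs/go-cid"
)

// reportWants uses the accessors that replace direct wl access
// in bitswap.go and workers.go.
func reportWants(wm *bswm.WantManager, c cid.Cid) ([]cid.Cid, bool) {
	wanted := wm.IsWanted(c) // was: wm.wl.Contains(c)
	keys := make([]cid.Cid, 0, wm.WantCount())
	for _, e := range wm.CurrentWants() { // was: wm.wl.Entries()
		keys = append(keys, e.Cid)
	}
	return keys, wanted
}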