Commit 6370a0b9 authored by Jeromy

cleanup bitswap and handle message send failure slightly better

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
parent 0f7b0a06
@@ -82,7 +82,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
})
bs := &Bitswap{
self: p,
blockstore: bstore,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
@@ -112,34 +111,36 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// won't block bitswap operation
wm *WantManager
// the ID of the peer to act on behalf of
self peer.ID
// the engine is the bit of logic that decides who to send which blocks to
engine *decision.Engine
// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork
// the peermanager manages sending messages to peers in a way that
// won't block bitswap operation
wm *WantManager
// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore
// notifications engine for receiving new blocks and routing them to the
// appropriate user requests
notifications notifications.PubSub
// send keys to a worker to find and connect to providers for them
// findKeys sends keys to a worker to find and connect to providers for them
findKeys chan *blockRequest
engine *decision.Engine
process process.Process
// newBlocks is a channel for newly added blocks to be provided to the
// network. Blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan *cid.Cid
// provideKeys directly feeds provide workers
provideKeys chan *cid.Cid
process process.Process
// Counters for various statistics
counterLk sync.Mutex
blocksRecvd int
dupBlocksRecvd int
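Note: the newBlocks/provideKeys comments above describe a buffering stage between block arrival and provide announcements. A minimal, self-contained sketch of that pattern follows; the channel and key names are illustrative, not the repo's actual provide worker.

// Sketch of a buffering stage between a producer channel (like newBlocks)
// and a worker channel (like provideKeys), so bursts of new blocks don't
// turn directly into bursts of provide traffic. Illustrative only.
package main

import "fmt"

func bufferKeys(in <-chan string, out chan<- string, done <-chan struct{}) {
	var pending []string
	for {
		// enable the send case only when something is buffered;
		// a send on a nil channel blocks forever, disabling that case
		var sendCh chan<- string
		var next string
		if len(pending) > 0 {
			sendCh = out
			next = pending[0]
		}
		select {
		case k, ok := <-in:
			if !ok {
				return
			}
			pending = append(pending, k) // buffer rather than block the producer
		case sendCh <- next:
			pending = pending[1:] // feed workers at their own pace
		case <-done:
			return
		}
	}
}

func main() {
	in := make(chan string)
	out := make(chan string)
	done := make(chan struct{})
	go bufferKeys(in, out, done)
	in <- "QmExampleKey"
	fmt.Println(<-out) // QmExampleKey
	close(done)
}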
@@ -167,13 +168,12 @@ func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, e
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)
// TODO: this request ID should come in from a higher layer so we can track
// across multiple 'GetBlock' invocations
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
defer func() {
cancelFunc()
}()
defer cancelFunc()
promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
......
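Aside: GetBlock is a thin wrapper over the promise-style GetBlocks API, and the hunk cuts off before the receive. A hedged, standalone sketch of the underlying pattern, with toy types standing in for blocks.Block and cid.Cid:

// Sketch of the promise pattern GetBlock relies on: a request returns a
// result channel, and the caller races it against context cancellation.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func getFirst(ctx context.Context, promise <-chan string) (string, error) {
	select {
	case v, ok := <-promise:
		if !ok {
			// channel closed without a value; prefer the context error if set
			if ctx.Err() != nil {
				return "", ctx.Err()
			}
			return "", errors.New("promise channel was closed")
		}
		return v, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel() // mirrors the `defer cancelFunc()` simplification in the diff

	promise := make(chan string, 1)
	promise <- "block-data"
	fmt.Println(getFirst(ctx, promise))
}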
@@ -175,28 +175,13 @@ func (mq *msgQueue) runQueue(ctx context.Context) {
}
func (mq *msgQueue) doWork(ctx context.Context) {
// allow ten minutes for connections
// this includes looking them up in the dht
// dialing them, and handshaking
if mq.sender == nil {
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()
err := mq.network.ConnectTo(conctx, mq.p)
err := mq.openSender(ctx)
if err != nil {
log.Infof("cant connect to peer %s: %s", mq.p, err)
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}
nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
log.Infof("cant open new stream to peer %s: %s", mq.p, err)
// TODO: cant open stream, what now?
return
}
mq.sender = nsender
}
// grab outgoing message
@@ -210,14 +195,64 @@ func (mq *msgQueue) doWork(ctx context.Context) {
mq.outlk.Unlock()
// send wantlist updates
err := mq.sender.SendMsg(wlm)
if err != nil {
for { // try to send this message until we fail.
err := mq.sender.SendMsg(wlm)
if err == nil {
return
}
log.Infof("bitswap send error: %s", err)
mq.sender.Close()
mq.sender = nil
// TODO: what do we do if this fails?
return
select {
case <-mq.done:
return
case <-ctx.Done():
return
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propagating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}
err = mq.openSender(ctx)
if err != nil {
log.Error("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return
}
// TODO: Is this the same instance for the remote peer?
// If it's not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
}
}
func (mq *msgQueue) openSender(ctx context.Context) error {
// allow ten minutes for connections; this includes looking them up in
// the DHT, dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()
err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
return err
}
nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}
func (pm *WantManager) Connected(p peer.ID) {
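Taken together, the new doWork loop retries a failed send by closing the dead sender, pausing briefly so disconnect notifications can propagate, and reopening the connection via openSender. A standalone sketch of that shape; the Sender interface and flakySender are stand-ins, not the bsnet API:

// Sketch of the send-retry pattern introduced above: on failure, tear
// down the sender, pause briefly, reconnect, and try again until the
// message goes out or we are shut down.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type Sender interface {
	SendMsg(msg string) error
	Close() error
}

type flakySender struct{ fails int }

func (f *flakySender) SendMsg(msg string) error {
	if f.fails > 0 {
		f.fails--
		return errors.New("stream reset")
	}
	fmt.Println("sent:", msg)
	return nil
}

func (f *flakySender) Close() error { return nil }

func sendWithRetry(ctx context.Context, done <-chan struct{}, open func(context.Context) (Sender, error), msg string) {
	sender, err := open(ctx)
	if err != nil {
		return
	}
	for { // try to send this message until we fail for good
		if err := sender.SendMsg(msg); err == nil {
			return
		}
		sender.Close()
		select {
		case <-done:
			return
		case <-ctx.Done():
			return
		case <-time.After(100 * time.Millisecond):
			// give disconnect notifications time to propagate
		}
		if sender, err = open(ctx); err != nil {
			return // reconnect failed; give up on this message
		}
	}
}

func main() {
	s := &flakySender{fails: 2}
	open := func(context.Context) (Sender, error) { return s, nil }
	sendWithRetry(context.Background(), make(chan struct{}), open, "wantlist update")
}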
@@ -292,14 +327,13 @@ func (pm *WantManager) Run() {
}
func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
mq := new(msgQueue)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1)
mq.network = wm.network
mq.p = p
mq.refcnt = 1
return mq
return &msgQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
network: wm.network,
p: p,
refcnt: 1,
}
}
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
@@ -312,8 +346,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
}
}()
// if we have no message held, or the one we are given is full
// overwrite the one we are holding
// if we have no message held, allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
}
......
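Aside: the work channel allocated with capacity 1 in newMsgQueue is the usual coalescing wakeup, where many addMessage calls collapse into at most one pending signal and the worker drains everything per wakeup. A minimal sketch under those assumptions (illustrative names, not this repo's locking or fields):

// Sketch of a one-slot wakeup channel: adds never block on signaling,
// repeated signals coalesce, and the worker processes work in batches.
package main

import (
	"fmt"
	"sync"
	"time"
)

type queue struct {
	lk      sync.Mutex
	pending []string
	work    chan struct{} // buffered with capacity 1
	done    chan struct{}
}

func (q *queue) add(item string) {
	q.lk.Lock()
	q.pending = append(q.pending, item)
	q.lk.Unlock()
	select {
	case q.work <- struct{}{}: // wake the worker
	default: // a wakeup is already pending; coalesce
	}
}

func (q *queue) run() {
	for {
		select {
		case <-q.work:
			q.lk.Lock()
			batch := q.pending
			q.pending = nil
			q.lk.Unlock()
			for _, it := range batch {
				fmt.Println("processing:", it)
			}
		case <-q.done:
			return
		}
	}
}

func main() {
	q := &queue{work: make(chan struct{}, 1), done: make(chan struct{})}
	go q.run()
	q.add("a")
	q.add("b") // likely coalesced into the first wakeup
	time.Sleep(50 * time.Millisecond)
	close(q.done)
}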
@@ -197,6 +197,12 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) {
for {
select {
case e := <-bs.findKeys:
select { // make sure it's not already cancelled
case <-e.Ctx.Done():
continue
default:
}
activeLk.Lock()
if kset.Has(e.Cid) {
activeLk.Unlock()
......
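The added guard is a standard non-blocking check that drops requests whose context was cancelled while they sat in the channel. A tiny standalone sketch of the same shape; the request type here is a stand-in for bitswap's blockRequest:

// Sketch of the added guard: before spending effort on a queued request,
// check whether its context died while it waited.
package main

import (
	"context"
	"fmt"
)

type request struct {
	ctx context.Context
	key string
}

func worker(reqs <-chan request) {
	for r := range reqs {
		select { // make sure it's not already cancelled
		case <-r.ctx.Done():
			continue // drop stale work instead of finding providers for it
		default:
		}
		fmt.Println("handling:", r.key)
	}
}

func main() {
	reqs := make(chan request, 2)
	dead, cancel := context.WithCancel(context.Background())
	cancel() // this request is already cancelled when dequeued
	reqs <- request{ctx: dead, key: "QmStale"}
	reqs <- request{ctx: context.Background(), key: "QmLive"}
	close(reqs)
	worker(reqs) // prints only "handling: QmLive"
}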