Commit 7b1cda70 authored by Brian Tiger Chow's avatar Brian Tiger Chow

feat(bitswap:network) propagate errors up the stack

Rather than pushing errors back down to lower layers, propagate the
errors upward.

This commit adds a `ReceiveError` method to BitSwap's network receiver.

Still TODO: rm the error return value from:

    net.service.handler.HandleMessage

This is inspired by delegation patterns in found in the wild.
parent c50d177b
package bitswap package bitswap
import ( import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
...@@ -120,14 +118,16 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { ...@@ -120,14 +118,16 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
// TODO(brian): handle errors // TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) { *peer.Peer, bsmsg.BitSwapMessage) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty()) u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())
if p == nil { if p == nil {
return nil, nil, errors.New("Received nil Peer") // TODO propagate the error upward
return nil, nil
} }
if incoming == nil { if incoming == nil {
return nil, nil, errors.New("Received nil Message") // TODO propagate the error upward
return nil, nil
} }
bs.strategy.MessageReceived(p, incoming) // FIRST bs.strategy.MessageReceived(p, incoming) // FIRST
...@@ -157,7 +157,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs ...@@ -157,7 +157,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
} }
} }
defer bs.strategy.MessageSent(p, message) defer bs.strategy.MessageSent(p, message)
return p, message, nil return p, message
}
func (bs *bitswap) ReceiveError(err error) {
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
} }
// send strives to ensure that accounting is always performed when a message is // send strives to ensure that accounting is always performed when a message is
......
...@@ -33,7 +33,9 @@ type Adapter interface { ...@@ -33,7 +33,9 @@ type Adapter interface {
type Receiver interface { type Receiver interface {
ReceiveMessage( ReceiveMessage(
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) ( ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error) destination *peer.Peer, outgoing bsmsg.BitSwapMessage)
ReceiveError(error)
} }
// TODO(brian): move this to go-ipfs/net package // TODO(brian): move this to go-ipfs/net package
......
package network package network
import ( import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
...@@ -34,18 +32,16 @@ func (adapter *impl) HandleMessage( ...@@ -34,18 +32,16 @@ func (adapter *impl) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) { ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) {
if adapter.receiver == nil { if adapter.receiver == nil {
return nil, errors.New("No receiver. NetMessage dropped") return nil, nil
} }
received, err := bsmsg.FromNet(incoming) received, err := bsmsg.FromNet(incoming)
if err != nil { if err != nil {
return nil, err adapter.receiver.ReceiveError(err)
return nil, nil
} }
p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received) p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
if err != nil {
return nil, err
}
// TODO(brian): put this in a helper function // TODO(brian): put this in a helper function
if bsmsg == nil || p == nil { if bsmsg == nil || p == nil {
...@@ -54,7 +50,8 @@ func (adapter *impl) HandleMessage( ...@@ -54,7 +50,8 @@ func (adapter *impl) HandleMessage(
outgoing, err := bsmsg.ToNet(p) outgoing, err := bsmsg.ToNet(p)
if err != nil { if err != nil {
return nil, err adapter.receiver.ReceiveError(err)
return nil, nil
} }
return outgoing, nil return outgoing, nil
......
...@@ -76,18 +76,7 @@ func (n *network) deliver( ...@@ -76,18 +76,7 @@ func (n *network) deliver(
return errors.New("Invalid input") return errors.New("Invalid input")
} }
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message) nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {
// TODO should this error be returned across network boundary?
// TODO this raises an interesting question about network contract. How
// can the network be expected to behave under different failure
// conditions? What if peer is unreachable? Will we know if messages
// aren't delivered?
return err
}
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) { if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return errors.New("Malformed client request") return errors.New("Malformed client request")
...@@ -119,15 +108,12 @@ func (n *network) SendRequest( ...@@ -119,15 +108,12 @@ func (n *network) SendRequest(
if !ok { if !ok {
return nil, errors.New("Cannot locate peer on network") return nil, errors.New("Cannot locate peer on network")
} }
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message) nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {
return nil, err
// TODO return nil, NoResponse
}
// TODO dedupe code // TODO dedupe code
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) { if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return nil, errors.New("Malformed client request") r.ReceiveError(errors.New("Malformed client request"))
return nil, nil
} }
// TODO dedupe code // TODO dedupe code
...@@ -144,7 +130,7 @@ func (n *network) SendRequest( ...@@ -144,7 +130,7 @@ func (n *network) SendRequest(
} }
n.deliver(nextReceiver, nextPeer, nextMsg) n.deliver(nextReceiver, nextPeer, nextMsg)
}() }()
return nil, NoResponse return nil, nil
} }
return nextMsg, nil return nextMsg, nil
} }
......
...@@ -26,7 +26,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { ...@@ -26,7 +26,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
ctx context.Context, ctx context.Context,
from *peer.Peer, from *peer.Peer,
incoming bsmsg.BitSwapMessage) ( incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) { *peer.Peer, bsmsg.BitSwapMessage) {
t.Log("Recipient received a message from the network") t.Log("Recipient received a message from the network")
...@@ -35,7 +35,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { ...@@ -35,7 +35,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
m := bsmsg.New() m := bsmsg.New()
m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr)) m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))
return from, m, nil return from, m
})) }))
t.Log("Build a message and send a synchronous request to recipient") t.Log("Build a message and send a synchronous request to recipient")
...@@ -74,19 +74,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -74,19 +74,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
ctx context.Context, ctx context.Context,
fromWaiter *peer.Peer, fromWaiter *peer.Peer,
msgFromWaiter bsmsg.BitSwapMessage) ( msgFromWaiter bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) { *peer.Peer, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New() msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr)) msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))
return fromWaiter, msgToWaiter, nil return fromWaiter, msgToWaiter
})) }))
waiter.SetDelegate(lambda(func( waiter.SetDelegate(lambda(func(
ctx context.Context, ctx context.Context,
fromResponder *peer.Peer, fromResponder *peer.Peer,
msgFromResponder bsmsg.BitSwapMessage) ( msgFromResponder bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) { *peer.Peer, bsmsg.BitSwapMessage) {
// TODO assert that this came from the correct peer and that the message contents are as expected // TODO assert that this came from the correct peer and that the message contents are as expected
ok := false ok := false
...@@ -101,7 +101,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -101,7 +101,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder") t.Fatal("Message not received from the responder")
} }
return nil, nil, nil return nil, nil
})) }))
messageSentAsync := bsmsg.New() messageSentAsync := bsmsg.New()
...@@ -116,7 +116,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -116,7 +116,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
} }
type receiverFunc func(ctx context.Context, p *peer.Peer, type receiverFunc func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error) incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage)
// lambda returns a Receiver instance given a receiver function // lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver { func lambda(f receiverFunc) bsnet.Receiver {
...@@ -126,13 +126,16 @@ func lambda(f receiverFunc) bsnet.Receiver { ...@@ -126,13 +126,16 @@ func lambda(f receiverFunc) bsnet.Receiver {
} }
type lambdaImpl struct { type lambdaImpl struct {
f func(ctx context.Context, p *peer.Peer, f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage) ( *peer.Peer, bsmsg.BitSwapMessage)
*peer.Peer, bsmsg.BitSwapMessage, error)
} }
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p *peer.Peer, incoming bsmsg.BitSwapMessage) ( p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) { *peer.Peer, bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming) return lam.f(ctx, p, incoming)
} }
func (lam *lambdaImpl) ReceiveError(err error) {
// TODO log error
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment