Commit b9fa4eed authored by Jeromy Johnson's avatar Jeromy Johnson Committed by Jeromy

mild refactor of bitswap

parent 6315475e
...@@ -4,6 +4,7 @@ package bitswap ...@@ -4,6 +4,7 @@ package bitswap
import ( import (
"errors" "errors"
"fmt"
"math" "math"
"sync" "sync"
"time" "time"
...@@ -324,47 +325,31 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli ...@@ -324,47 +325,31 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
} }
// TODO(brian): handle errors // TODO(brian): handle errors
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error {
peer.ID, bsmsg.BitSwapMessage) {
defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done() defer log.EventBegin(ctx, "receiveMessage", p, incoming).Done()
if p == "" {
log.Debug("Received message from nil peer!")
// TODO propagate the error upward
return "", nil
}
if incoming == nil {
log.Debug("Got nil bitswap message!")
// TODO propagate the error upward
return "", nil
}
// This call records changes to wantlists, blocks received, // This call records changes to wantlists, blocks received,
// and number of bytes transfered. // and number of bytes transfered.
bs.engine.MessageReceived(p, incoming) bs.engine.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused. // TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger // Should only track *useful* messages in ledger
var keys []u.Key
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
bs.blocksRecvd++ bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has { if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
bs.dupBlocksRecvd++ bs.dupBlocksRecvd++
} }
log.Debugf("got block %s from %s", block, p)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil { if err := bs.HasBlock(hasBlockCtx, block); err != nil {
log.Debug(err) return fmt.Errorf("ReceiveMessage HasBlock error: %s", err)
} }
cancel() cancel()
}
var keys []u.Key
for _, block := range incoming.Blocks() {
keys = append(keys, block.Key()) keys = append(keys, block.Key())
} }
bs.cancelBlocks(ctx, keys)
// TODO: consider changing this function to not return anything return bs.cancelBlocks(ctx, keys)
return "", nil
} }
// Connected/Disconnected warns bitswap about peer connections // Connected/Disconnected warns bitswap about peer connections
...@@ -384,21 +369,24 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) { ...@@ -384,21 +369,24 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.engine.PeerDisconnected(p) bs.engine.PeerDisconnected(p)
} }
func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { func (bs *Bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) error {
if len(bkeys) < 1 { if len(bkeys) < 1 {
return return nil
} }
message := bsmsg.New() message := bsmsg.New()
message.SetFull(false) message.SetFull(false)
for _, k := range bkeys { for _, k := range bkeys {
log.Debug("cancel block: %s", k)
message.Cancel(k) message.Cancel(k)
} }
for _, p := range bs.engine.Peers() { for _, p := range bs.engine.Peers() {
err := bs.send(ctx, p, message) err := bs.send(ctx, p, message)
if err != nil { if err != nil {
log.Debugf("Error sending message: %s", err) log.Debugf("Error sending message: %s", err)
return err
} }
} }
return nil
} }
func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) { func (bs *Bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
......
...@@ -19,12 +19,6 @@ type BitSwapNetwork interface { ...@@ -19,12 +19,6 @@ type BitSwapNetwork interface {
peer.ID, peer.ID,
bsmsg.BitSwapMessage) error bsmsg.BitSwapMessage) error
// SendRequest sends a BitSwap message to a peer and waits for a response.
SendRequest(
context.Context,
peer.ID,
bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error)
// SetDelegate registers the Reciver to handle messages received from the // SetDelegate registers the Reciver to handle messages received from the
// network. // network.
SetDelegate(Receiver) SetDelegate(Receiver)
...@@ -35,8 +29,9 @@ type BitSwapNetwork interface { ...@@ -35,8 +29,9 @@ type BitSwapNetwork interface {
// Implement Receiver to receive messages from the BitSwapNetwork // Implement Receiver to receive messages from the BitSwapNetwork
type Receiver interface { type Receiver interface {
ReceiveMessage( ReceiveMessage(
ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) ( ctx context.Context,
destination peer.ID, outgoing bsmsg.BitSwapMessage) sender peer.ID,
incoming bsmsg.BitSwapMessage) error
ReceiveError(error) ReceiveError(error)
......
...@@ -14,57 +14,6 @@ import ( ...@@ -14,57 +14,6 @@ import (
testutil "github.com/ipfs/go-ipfs/util/testutil" testutil "github.com/ipfs/go-ipfs/util/testutil"
) )
func TestSendRequestToCooperativePeer(t *testing.T) {
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
recipientPeer := testutil.RandIdentityOrFatal(t)
t.Log("Get two network adapters")
initiator := net.Adapter(testutil.RandIdentityOrFatal(t))
recipient := net.Adapter(recipientPeer)
expectedStr := "response from recipient"
recipient.SetDelegate(lambda(func(
ctx context.Context,
from peer.ID,
incoming bsmsg.BitSwapMessage) (
peer.ID, bsmsg.BitSwapMessage) {
t.Log("Recipient received a message from the network")
// TODO test contents of incoming message
m := bsmsg.New()
m.AddBlock(blocks.NewBlock([]byte(expectedStr)))
return from, m
}))
t.Log("Build a message and send a synchronous request to recipient")
message := bsmsg.New()
message.AddBlock(blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest(
context.Background(), recipientPeer.ID(), message)
if err != nil {
t.Fatal(err)
}
t.Log("Check the contents of the response from recipient")
if response == nil {
t.Fatal("Should have received a response")
}
for _, blockFromRecipient := range response.Blocks() {
if string(blockFromRecipient.Data) == expectedStr {
return
}
}
t.Fatal("Should have returned after finding expected block data")
}
func TestSendMessageAsyncButWaitForResponse(t *testing.T) { func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
responderPeer := testutil.RandIdentityOrFatal(t) responderPeer := testutil.RandIdentityOrFatal(t)
...@@ -80,20 +29,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -80,20 +29,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
responder.SetDelegate(lambda(func( responder.SetDelegate(lambda(func(
ctx context.Context, ctx context.Context,
fromWaiter peer.ID, fromWaiter peer.ID,
msgFromWaiter bsmsg.BitSwapMessage) ( msgFromWaiter bsmsg.BitSwapMessage) error {
peer.ID, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New() msgToWaiter := bsmsg.New()
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
return fromWaiter, msgToWaiter return nil
})) }))
waiter.SetDelegate(lambda(func( waiter.SetDelegate(lambda(func(
ctx context.Context, ctx context.Context,
fromResponder peer.ID, fromResponder peer.ID,
msgFromResponder bsmsg.BitSwapMessage) ( msgFromResponder bsmsg.BitSwapMessage) error {
peer.ID, bsmsg.BitSwapMessage) {
// TODO assert that this came from the correct peer and that the message contents are as expected // TODO assert that this came from the correct peer and that the message contents are as expected
ok := false ok := false
...@@ -108,7 +56,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -108,7 +56,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
t.Fatal("Message not received from the responder") t.Fatal("Message not received from the responder")
} }
return "", nil return nil
})) }))
messageSentAsync := bsmsg.New() messageSentAsync := bsmsg.New()
...@@ -123,7 +71,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -123,7 +71,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
} }
type receiverFunc func(ctx context.Context, p peer.ID, type receiverFunc func(ctx context.Context, p peer.ID,
incoming bsmsg.BitSwapMessage) (peer.ID, bsmsg.BitSwapMessage) incoming bsmsg.BitSwapMessage) error
// lambda returns a Receiver instance given a receiver function // lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver { func lambda(f receiverFunc) bsnet.Receiver {
...@@ -133,13 +81,11 @@ func lambda(f receiverFunc) bsnet.Receiver { ...@@ -133,13 +81,11 @@ func lambda(f receiverFunc) bsnet.Receiver {
} }
type lambdaImpl struct { type lambdaImpl struct {
f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) ( f func(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) error
peer.ID, bsmsg.BitSwapMessage)
} }
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p peer.ID, incoming bsmsg.BitSwapMessage) ( p peer.ID, incoming bsmsg.BitSwapMessage) error {
peer.ID, bsmsg.BitSwapMessage) {
return lam.f(ctx, p, incoming) return lam.f(ctx, p, incoming)
} }
......
...@@ -72,61 +72,7 @@ func (n *network) deliver( ...@@ -72,61 +72,7 @@ func (n *network) deliver(
n.delay.Wait() n.delay.Wait()
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message) return r.ReceiveMessage(context.TODO(), from, message)
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
return errors.New("Malformed client request")
}
if nextPeer == "" && nextMsg == nil { // no response to send
return nil
}
nextReceiver, ok := n.clients[nextPeer]
if !ok {
return errors.New("Cannot locate peer on network")
}
go n.deliver(nextReceiver, nextPeer, nextMsg)
return nil
}
// TODO
func (n *network) SendRequest(
ctx context.Context,
from peer.ID,
to peer.ID,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error) {
r, ok := n.clients[to]
if !ok {
return nil, errors.New("Cannot locate peer on network")
}
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
// TODO dedupe code
if (nextPeer == "" && nextMsg != nil) || (nextMsg == nil && nextPeer != "") {
r.ReceiveError(errors.New("Malformed client request"))
return nil, nil
}
// TODO dedupe code
if nextPeer == "" && nextMsg == nil {
return nil, nil
}
// TODO test when receiver doesn't immediately respond to the initiator of the request
if nextPeer != from {
go func() {
nextReceiver, ok := n.clients[nextPeer]
if !ok {
// TODO log the error?
}
n.deliver(nextReceiver, nextPeer, nextMsg)
}()
return nil, nil
}
return nextMsg, nil
} }
type networkClient struct { type networkClient struct {
...@@ -143,13 +89,6 @@ func (nc *networkClient) SendMessage( ...@@ -143,13 +89,6 @@ func (nc *networkClient) SendMessage(
return nc.network.SendMessage(ctx, nc.local, to, message) return nc.network.SendMessage(ctx, nc.local, to, message)
} }
func (nc *networkClient) SendRequest(
ctx context.Context,
to peer.ID,
message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
return nc.network.SendRequest(ctx, nc.local, to, message)
}
// FindProvidersAsync returns a channel of providers for the given key // FindProvidersAsync returns a channel of providers for the given key
func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID { func (nc *networkClient) FindProvidersAsync(ctx context.Context, k util.Key, max int) <-chan peer.ID {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment