Commit 1555ce7c authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

bitswap dials peers

Important bugfix. Otherwise bitswap cannot message peers
the node has not connected to yet :(
parent d2671afd
......@@ -24,13 +24,12 @@ func NetMessageSession(parent context.Context, p *peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.Datastore, nice bool) exchange.Interface {
networkAdapter := bsnet.NetMessageAdapter(srv, nil)
networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(),
strategy: strategy.New(nice),
routing: directory,
network: net,
sender: networkAdapter,
wantlist: u.NewKeySet(),
}
......@@ -42,9 +41,6 @@ func NetMessageSession(parent context.Context, p *peer.Peer,
// bitswap instances implement the bitswap protocol.
type bitswap struct {
// network maintains connections to the outside world.
network inet.Network
// sender delivers messages on behalf of the session
sender bsnet.Adapter
......@@ -88,8 +84,16 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
for iiiii := range peersToQuery {
log.Debug("bitswap got peersToQuery: %s", iiiii)
go func(p *peer.Peer) {
err := bs.sender.DialPeer(p)
if err != nil {
log.Error("Error sender.DialPeer(%s)", p)
return
}
response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
log.Error("Error sender.SendRequest(%s)", p)
return
}
// FIXME ensure accounting is handled correctly when
......
......@@ -11,6 +11,9 @@ import (
// Adapter provides network connectivity for BitSwap sessions
type Adapter interface {
// DialPeer ensures there is a connection to peer.
DialPeer(*peer.Peer) error
// SendMessage sends a BitSwap message to a peer.
SendMessage(
context.Context,
......
......@@ -10,9 +10,10 @@ import (
)
// NetMessageAdapter wraps a NetMessage network service
func NetMessageAdapter(s inet.Service, r Receiver) Adapter {
func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter {
adapter := impl{
nms: s,
net: n,
receiver: r,
}
s.SetHandler(&adapter)
......@@ -22,6 +23,7 @@ func NetMessageAdapter(s inet.Service, r Receiver) Adapter {
// implements an Adapter that integrates with a NetMessage network service
type impl struct {
nms inet.Service
net inet.Network
// inbound messages from the network are forwarded to the receiver
receiver Receiver
......@@ -58,6 +60,10 @@ func (adapter *impl) HandleMessage(
return outgoing
}
func (adapter *impl) DialPeer(p *peer.Peer) error {
return adapter.DialPeer(p)
}
func (adapter *impl) SendMessage(
ctx context.Context,
p *peer.Peer,
......
......@@ -3,6 +3,7 @@ package bitswap
import (
"bytes"
"errors"
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
......@@ -14,6 +15,8 @@ import (
type Network interface {
Adapter(*peer.Peer) bsnet.Adapter
HasPeer(*peer.Peer) bool
SendMessage(
ctx context.Context,
from *peer.Peer,
......@@ -49,6 +52,11 @@ func (n *network) Adapter(p *peer.Peer) bsnet.Adapter {
return client
}
func (n *network) HasPeer(p *peer.Peer) bool {
_, found := n.clients[p.Key()]
return found
}
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
......@@ -155,6 +163,14 @@ func (nc *networkClient) SendRequest(
return nc.network.SendRequest(ctx, nc.local, to, message)
}
func (nc *networkClient) DialPeer(p *peer.Peer) error {
// no need to do anything because dialing isn't a thing in this test net.
if !nc.network.HasPeer(p) {
return fmt.Errorf("Peer not in network: %s", p)
}
return nil
}
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
nc.Receiver = r
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment