Commit 6cebf01b authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Integrated new network into ipfs

parent 68ae22c6
package message package message
import ( import (
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" "io"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb" pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb"
netmsg "github.com/jbenet/go-ipfs/net/message" inet "github.com/jbenet/go-ipfs/net"
nm "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
ggio "code.google.com/p/gogoprotobuf/io"
) )
// TODO move message.go into the bitswap package // TODO move message.go into the bitswap package
...@@ -38,7 +39,7 @@ type BitSwapMessage interface { ...@@ -38,7 +39,7 @@ type BitSwapMessage interface {
type Exportable interface { type Exportable interface {
ToProto() *pb.Message ToProto() *pb.Message
ToNet(p peer.Peer) (nm.NetMessage, error) ToNet(w io.Writer) error
} }
type impl struct { type impl struct {
...@@ -92,11 +93,14 @@ func (m *impl) AddBlock(b *blocks.Block) { ...@@ -92,11 +93,14 @@ func (m *impl) AddBlock(b *blocks.Block) {
m.blocks[b.Key()] = b m.blocks[b.Key()] = b
} }
func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) { func FromNet(r io.Reader) (BitSwapMessage, error) {
pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
pb := new(pb.Message) pb := new(pb.Message)
if err := proto.Unmarshal(nmsg.Data(), pb); err != nil { if err := pbr.ReadMsg(pb); err != nil {
return nil, err return nil, err
} }
m := newMessageFromProto(*pb) m := newMessageFromProto(*pb)
return m, nil return m, nil
} }
...@@ -112,6 +116,11 @@ func (m *impl) ToProto() *pb.Message { ...@@ -112,6 +116,11 @@ func (m *impl) ToProto() *pb.Message {
return pb return pb
} }
func (m *impl) ToNet(p peer.Peer) (nm.NetMessage, error) { func (m *impl) ToNet(w io.Writer) error {
return nm.FromObject(p, m.ToProto()) pbw := ggio.NewDelimitedWriter(w)
if err := pbw.WriteMsg(m.ToProto()); err != nil {
return err
}
return nil
} }
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb" pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
) )
func TestAppendWanted(t *testing.T) { func TestAppendWanted(t *testing.T) {
...@@ -87,18 +86,6 @@ func TestCopyProtoByValue(t *testing.T) { ...@@ -87,18 +86,6 @@ func TestCopyProtoByValue(t *testing.T) {
} }
} }
func TestToNetMethodSetsPeer(t *testing.T) {
m := New()
p := testutil.NewPeerWithIDString("X")
netmsg, err := m.ToNet(p)
if err != nil {
t.Fatal(err)
}
if !(netmsg.Peer().Key() == p.Key()) {
t.Fatal("Peer key is different")
}
}
func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New() original := New()
original.AddWanted(u.Key("M")) original.AddWanted(u.Key("M"))
...@@ -107,13 +94,12 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { ...@@ -107,13 +94,12 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
original.AddWanted(u.Key("T")) original.AddWanted(u.Key("T"))
original.AddWanted(u.Key("F")) original.AddWanted(u.Key("F"))
p := testutil.NewPeerWithIDString("X") var buf bytes.Buffer
netmsg, err := original.ToNet(p) if err := original.ToNet(&buf); err != nil {
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
copied, err := FromNet(netmsg) copied, err := FromNet(&buf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -138,13 +124,12 @@ func TestToAndFromNetMessage(t *testing.T) { ...@@ -138,13 +124,12 @@ func TestToAndFromNetMessage(t *testing.T) {
original.AddBlock(blocks.NewBlock([]byte("F"))) original.AddBlock(blocks.NewBlock([]byte("F")))
original.AddBlock(blocks.NewBlock([]byte("M"))) original.AddBlock(blocks.NewBlock([]byte("M")))
p := testutil.NewPeerWithIDString("X") var buf bytes.Buffer
netmsg, err := original.ToNet(p) if err := original.ToNet(&buf); err != nil {
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
m2, err := FromNet(netmsg) m2, err := FromNet(&buf)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
inet "github.com/jbenet/go-ipfs/net" inet "github.com/jbenet/go-ipfs/net"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util" util "github.com/jbenet/go-ipfs/util"
) )
...@@ -14,46 +13,48 @@ var log = util.Logger("bitswap_network") ...@@ -14,46 +13,48 @@ var log = util.Logger("bitswap_network")
// NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS // NewFromIpfsNetwork returns a BitSwapNetwork supported by underlying IPFS
// Dialer & Service // Dialer & Service
func NewFromIpfsNetwork(s inet.Service, dialer inet.Dialer) BitSwapNetwork { func NewFromIpfsNetwork(n inet.Network) BitSwapNetwork {
bitswapNetwork := impl{ bitswapNetwork := impl{
service: s, network: n,
dialer: dialer,
} }
s.SetHandler(&bitswapNetwork) n.SetHandler(inet.ProtocolBitswap, bitswapNetwork.handleNewStream)
return &bitswapNetwork return &bitswapNetwork
} }
// impl transforms the ipfs network interface, which sends and receives // impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface. // NetMessage objects, into the bitswap network interface.
type impl struct { type impl struct {
service inet.Service network inet.Network
dialer inet.Dialer
// inbound messages from the network are forwarded to the receiver // inbound messages from the network are forwarded to the receiver
receiver Receiver receiver Receiver
} }
// HandleMessage marshals and unmarshals net messages, forwarding them to the // handleNewStream receives a new stream from the network.
// BitSwapMessage receiver func (bsnet *impl) handleNewStream(s inet.Stream) {
func (bsnet *impl) HandleMessage(
ctx context.Context, incoming netmsg.NetMessage) netmsg.NetMessage {
if bsnet.receiver == nil { if bsnet.receiver == nil {
return nil return
} }
received, err := bsmsg.FromNet(incoming) go func() {
defer s.Close()
received, err := bsmsg.FromNet(s)
if err != nil { if err != nil {
go bsnet.receiver.ReceiveError(err) go bsnet.receiver.ReceiveError(err)
return nil return
} }
bsnet.receiver.ReceiveMessage(ctx, incoming.Peer(), received) p := s.Conn().RemotePeer()
return nil ctx := context.Background()
bsnet.receiver.ReceiveMessage(ctx, p, received)
}()
} }
func (bsnet *impl) DialPeer(ctx context.Context, p peer.Peer) error { func (bsnet *impl) DialPeer(ctx context.Context, p peer.Peer) error {
return bsnet.dialer.DialPeer(ctx, p) return bsnet.network.DialPeer(ctx, p)
} }
func (bsnet *impl) SendMessage( func (bsnet *impl) SendMessage(
...@@ -61,11 +62,13 @@ func (bsnet *impl) SendMessage( ...@@ -61,11 +62,13 @@ func (bsnet *impl) SendMessage(
p peer.Peer, p peer.Peer,
outgoing bsmsg.BitSwapMessage) error { outgoing bsmsg.BitSwapMessage) error {
nmsg, err := outgoing.ToNet(p) s, err := bsnet.network.NewStream(inet.ProtocolBitswap, p)
if err != nil { if err != nil {
return err return err
} }
return bsnet.service.SendMessage(ctx, nmsg) defer s.Close()
return outgoing.ToNet(s)
} }
func (bsnet *impl) SendRequest( func (bsnet *impl) SendRequest(
...@@ -73,15 +76,17 @@ func (bsnet *impl) SendRequest( ...@@ -73,15 +76,17 @@ func (bsnet *impl) SendRequest(
p peer.Peer, p peer.Peer,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
outgoingMsg, err := outgoing.ToNet(p) s, err := bsnet.network.NewStream(inet.ProtocolBitswap, p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
incomingMsg, err := bsnet.service.SendRequest(ctx, outgoingMsg) defer s.Close()
if err != nil {
if err := outgoing.ToNet(s); err != nil {
return nil, err return nil, err
} }
return bsmsg.FromNet(incomingMsg)
return bsmsg.FromNet(s)
} }
func (bsnet *impl) SetDelegate(r Receiver) { func (bsnet *impl) SetDelegate(r Receiver) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment