Commit 167327fc authored by Łukasz Magiera's avatar Łukasz Magiera

network: Allow specifying protocol prefix

parent 5628965e
......@@ -8,15 +8,16 @@ import (
"time"
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/libp2p/go-libp2p-core/helpers"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
msgio "github.com/libp2p/go-msgio"
ma "github.com/multiformats/go-multiaddr"
......@@ -27,10 +28,19 @@ var log = logging.Logger("bitswap_network")
var sendMessageTimeout = time.Minute * 10
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork {
s := Settings{}
for _, opt := range opts {
opt(&s)
}
bitswapNetwork := impl{
host: host,
routing: r,
protocolBitswap: s.ProtocolPrefix + ProtocolBitswap,
protocolBitswapOne: s.ProtocolPrefix + ProtocolBitswapOne,
protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers,
}
return &bitswapNetwork
}
......@@ -41,6 +51,10 @@ type impl struct {
host host.Host
routing routing.ContentRouting
protocolBitswap protocol.ID
protocolBitswapOne protocol.ID
protocolBitswapNoVers protocol.ID
// inbound messages from the network are forwarded to the receiver
receiver Receiver
......@@ -48,7 +62,8 @@ type impl struct {
}
type streamMessageSender struct {
s network.Stream
s network.Stream
bsnet *impl
}
func (s *streamMessageSender) Close() error {
......@@ -60,10 +75,10 @@ func (s *streamMessageSender) Reset() error {
}
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return msgToStream(ctx, s.s, msg)
return s.bsnet.msgToStream(ctx, s.s, msg)
}
func msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
......@@ -74,12 +89,12 @@ func msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage
}
switch s.Protocol() {
case ProtocolBitswap:
case bsnet.protocolBitswap:
if err := msg.ToNetV1(s); err != nil {
log.Debugf("error: %s", err)
return err
}
case ProtocolBitswapOne, ProtocolBitswapNoVers:
case bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers:
if err := msg.ToNetV0(s); err != nil {
log.Debugf("error: %s", err)
return err
......@@ -100,11 +115,11 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSend
return nil, err
}
return &streamMessageSender{s: s}, nil
return &streamMessageSender{s: s, bsnet: bsnet}, nil
}
func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
return bsnet.host.NewStream(ctx, p, ProtocolBitswap, ProtocolBitswapOne, ProtocolBitswapNoVers)
return bsnet.host.NewStream(ctx, p, bsnet.protocolBitswap, bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers)
}
func (bsnet *impl) SendMessage(
......@@ -117,7 +132,7 @@ func (bsnet *impl) SendMessage(
return err
}
if err = msgToStream(ctx, s, outgoing); err != nil {
if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
s.Reset()
return err
}
......@@ -131,9 +146,9 @@ func (bsnet *impl) SendMessage(
func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiver = r
bsnet.host.SetStreamHandler(ProtocolBitswap, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(ProtocolBitswapOne, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(ProtocolBitswapNoVers, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswap, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapOne, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapNoVers, bsnet.handleNewStream)
bsnet.host.Network().Notify((*netNotifiee)(bsnet))
// TODO: StopNotify.
......
package network
import "github.com/libp2p/go-libp2p-core/protocol"
type NetOpt func(*Settings)
type Settings struct {
ProtocolPrefix protocol.ID
}
func Prefix(prefix protocol.ID) NetOpt {
return func(settings *Settings) {
settings.ProtocolPrefix = prefix
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment