// File: libp2p_impl.go

package network

import (
	"context"
	"fmt"
	"io"
	"time"

	gsmsg "github.com/ipfs/go-graphsync/message"

	ggio "github.com/gogo/protobuf/io"
	logging "github.com/ipfs/go-log"
	host "github.com/libp2p/go-libp2p-host"
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
)

var (
	// log is the package logger, registered under the "graphsync_network" name.
	log = logging.Logger("graphsync_network")

	// sendMessageTimeout is the write deadline msgToStream applies to an
	// outgoing message when the caller's context carries no deadline.
	sendMessageTimeout = 10 * time.Minute
)

// NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host.
24
func NewFromLibp2pHost(host host.Host) GraphSyncNetwork {
25
	graphSyncNetwork := libp2pGraphSyncNetwork{
26
		host: host,
27 28 29 30 31 32 33 34 35
	}
	host.SetStreamHandler(ProtocolGraphsync, graphSyncNetwork.handleNewStream)

	return &graphSyncNetwork
}

// libp2pGraphSyncNetwork transforms the libp2p host interface, which sends and receives
// NetMessage objects, into the graphsync network interface.
type libp2pGraphSyncNetwork struct {
36
	host host.Host
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
}

// streamMessageSender is the MessageSender handed out by NewMessageSender; it
// writes every message to the single stream it wraps.
type streamMessageSender struct {
	s inet.Stream // stream all messages are written to
}

// Close closes the wrapped stream via inet.FullClose and returns its result.
func (sms *streamMessageSender) Close() error {
	stream := sms.s
	return inet.FullClose(stream)
}

// Reset resets the wrapped stream and returns the stream's error, if any.
func (sms *streamMessageSender) Reset() error {
	stream := sms.s
	return stream.Reset()
}

// SendMsg writes msg to the wrapped stream using msgToStream.
func (sms *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
	stream := sms.s
	return msgToStream(ctx, stream, msg)
}

func msgToStream(ctx context.Context, s inet.Stream, msg gsmsg.GraphSyncMessage) error {
58 59 60
	log.Debugf("Outgoing message with %d requests, %d responses, and %d blocks",
		len(msg.Requests()), len(msg.Responses()), len(msg.Blocks()))

61 62 63 64 65 66 67 68 69 70
	deadline := time.Now().Add(sendMessageTimeout)
	if dl, ok := ctx.Deadline(); ok {
		deadline = dl
	}
	if err := s.SetWriteDeadline(deadline); err != nil {
		log.Warningf("error setting deadline: %s", err)
	}

	switch s.Protocol() {
	case ProtocolGraphsync:
hannahhoward's avatar
hannahhoward committed
71
		if err := msg.ToNet(s); err != nil {
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
			log.Debugf("error: %s", err)
			return err
		}
	default:
		return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
	}

	if err := s.SetWriteDeadline(time.Time{}); err != nil {
		log.Warningf("error resetting deadline: %s", err)
	}
	return nil
}

// NewMessageSender opens a new graphsync stream to peer p and returns a
// MessageSender that writes to it. If the stream cannot be opened, the error
// from newStreamToPeer is returned and the sender is nil.
func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
	stream, err := gsnet.newStreamToPeer(ctx, p)
	if err != nil {
		return nil, err
	}

	sender := &streamMessageSender{s: stream}
	return sender, nil
}

// newStreamToPeer asks the host for a new stream to peer p speaking
// ProtocolGraphsync, returning whatever the host returns.
func (gsnet *libp2pGraphSyncNetwork) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
	h := gsnet.host
	return h.NewStream(ctx, p, ProtocolGraphsync)
}

// SendMessage delivers a single message to peer p over a freshly opened stream.
//
// If the stream cannot be opened, that error is returned. If writing the
// message fails, the stream is reset and the write error is returned.
// Otherwise the stream is closed and the result of Close is returned.
func (gsnet *libp2pGraphSyncNetwork) SendMessage(ctx context.Context, p peer.ID, outgoing gsmsg.GraphSyncMessage) error {
	stream, err := gsnet.newStreamToPeer(ctx, p)
	if err != nil {
		return err
	}

	sendErr := msgToStream(ctx, stream, outgoing)
	if sendErr != nil {
		stream.Reset()
		return sendErr
	}

	// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
	go inet.AwaitEOF(stream)
	return stream.Close()
}

// SetDelegate registers r as the Receiver that handleNewStream forwards inbound
// messages and read errors to. While no receiver is set, handleNewStream resets
// incoming streams without reading from them.
func (gsnet *libp2pGraphSyncNetwork) SetDelegate(r Receiver) {
	gsnet.receiver = r
}

123 124 125 126
func (gsnet *libp2pGraphSyncNetwork) ConnectTo(ctx context.Context, p peer.ID) error {
	return gsnet.host.Connect(ctx, pstore.PeerInfo{ID: p})
}

127 128 129 130 131 132 133 134 135 136 137
// handleNewStream receives a new stream from the network.
func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s inet.Stream) {
	defer s.Close()

	if gsnet.receiver == nil {
		s.Reset()
		return
	}

	reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
	for {
138
		received, err := gsmsg.FromPBReader(reader)
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
		if err != nil {
			if err != io.EOF {
				s.Reset()
				go gsnet.receiver.ReceiveError(err)
				log.Debugf("graphsync net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
			}
			return
		}

		p := s.Conn().RemotePeer()
		ctx := context.Background()
		log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer())
		gsnet.receiver.ReceiveMessage(ctx, p, received)
	}
}