dht_net.go 3.5 KB
Newer Older
1 2 3 4 5 6
package dht

import (
	"errors"
	"time"

7
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
8
	peer "gx/ipfs/QmQGwpJy9P4yXZySmqkZEXCmbBpJUb8xntCv8Ca4taZwDC/go-libp2p-peer"
Jakub Sztandera's avatar
Jakub Sztandera committed
9
	ctxio "gx/ipfs/QmX6DhWrpBB5NtadXmPSXYNdVvuLfJXoFNMvUMoVvP5UJa/go-context/io"
Jeromy's avatar
Jeromy committed
10
	inet "gx/ipfs/QmXJBB9U6e6ennAJPzk8E2rSaVGuHVR2jCxE9H9gPDtRrq/go-libp2p/p2p/net"
Jeromy's avatar
Jeromy committed
11
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
Jeromy's avatar
Jeromy committed
12
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
13 14 15 16 17 18 19 20 21 22 23
)

// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
	go dht.handleNewMessage(s)
}

func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
	defer s.Close()

	ctx := dht.Context()
24 25
	cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
26 27
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(cw)
28 29 30 31 32
	mPeer := s.Conn().RemotePeer()

	// receive msg
	pmes := new(pb.Message)
	if err := r.ReadMsg(pmes); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33
		log.Debugf("Error unmarshaling data: %s", err)
34 35
		return
	}
36

37
	// update the peer (on valid msgs only)
38
	dht.updateFromMessage(ctx, mPeer, pmes)
39 40 41 42

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
		log.Debug("got back nil handler from handlerForMsgType")
44 45 46 47 48 49
		return
	}

	// dispatch handler.
	rpmes, err := handler(ctx, mPeer, pmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50
		log.Debugf("handle message error: %s", err)
51 52 53 54 55
		return
	}

	// if nil response, return it before serializing
	if rpmes == nil {
56
		log.Debug("Got back nil response from request.")
57 58 59 60 61
		return
	}

	// send out response msg
	if err := w.WriteMsg(rpmes); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
		log.Debugf("send response error: %s", err)
63 64 65 66 67 68 69 70
		return
	}

	return
}

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
71
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
72

Richard Littauer's avatar
Richard Littauer committed
73
	log.Debugf("%s DHT starting stream", dht.self)
74
	s, err := dht.host.NewStream(ctx, ProtocolDHT, p)
75 76 77 78 79
	if err != nil {
		return nil, err
	}
	defer s.Close()

80 81
	cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
82 83
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(cw)
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

	start := time.Now()

	if err := w.WriteMsg(pmes); err != nil {
		return nil, err
	}
	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)

	rpmes := new(pb.Message)
	if err := r.ReadMsg(rpmes); err != nil {
		return nil, err
	}
	if rpmes == nil {
		return nil, errors.New("no response to request")
	}

100 101 102
	// update the peer (on valid msgs only)
	dht.updateFromMessage(ctx, p, rpmes)

103
	dht.peerstore.RecordLatency(p, time.Since(start))
104 105 106
	log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
	return rpmes, nil
}
107 108

// sendMessage sends out a message
109
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
110

Richard Littauer's avatar
Richard Littauer committed
111
	log.Debugf("%s DHT starting stream", dht.self)
112
	s, err := dht.host.NewStream(ctx, ProtocolDHT, p)
113 114 115 116 117
	if err != nil {
		return err
	}
	defer s.Close()

118
	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
119
	w := ggio.NewDelimitedWriter(cw)
120 121 122 123 124 125 126

	if err := w.WriteMsg(pmes); err != nil {
		return err
	}
	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
	return nil
}
127 128 129 130 131

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
	dht.Update(ctx, p)
	return nil
}