dht_net.go 3.66 KB
Newer Older
1 2 3 4 5 6 7
package dht

import (
	"errors"
	"time"

	inet "github.com/jbenet/go-ipfs/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	peer "github.com/jbenet/go-ipfs/p2p/peer"
9
	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
10
	ctxutil "github.com/jbenet/go-ipfs/util/ctx"
11 12

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
13
	ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
14 15 16 17 18 19 20 21 22 23 24
)

// handleNewStream implements the inet.StreamHandler.
// It starts a new goroutine running handleNewMessage on s and returns
// immediately, so the caller is not blocked while the stream is serviced.
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
	go dht.handleNewMessage(s)
}

func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
	defer s.Close()

	ctx := dht.Context()
25 26 27 28
	cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
	cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(cw)
29 30 31 32 33
	mPeer := s.Conn().RemotePeer()

	// receive msg
	pmes := new(pb.Message)
	if err := r.ReadMsg(pmes); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
		log.Errorf("Error unmarshaling data: %s", err)
35 36
		return
	}
37

38
	// update the peer (on valid msgs only)
39
	dht.updateFromMessage(ctx, mPeer, pmes)
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73

	log.Event(ctx, "foo", dht.self, mPeer, pmes)

	// get handler for this msg type.
	handler := dht.handlerForMsgType(pmes.GetType())
	if handler == nil {
		log.Error("got back nil handler from handlerForMsgType")
		return
	}

	// dispatch handler.
	rpmes, err := handler(ctx, mPeer, pmes)
	if err != nil {
		log.Errorf("handle message error: %s", err)
		return
	}

	// if nil response, return it before serializing
	if rpmes == nil {
		log.Warning("Got back nil response from request.")
		return
	}

	// send out response msg
	if err := w.WriteMsg(rpmes); err != nil {
		log.Errorf("send response error: %s", err)
		return
	}

	return
}

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
74
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
75 76 77 78 79 80 81 82

	log.Debugf("%s dht starting stream", dht.self)
	s, err := dht.network.NewStream(inet.ProtocolDHT, p)
	if err != nil {
		return nil, err
	}
	defer s.Close()

83 84 85 86
	cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
	cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(cw)
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106

	start := time.Now()

	log.Debugf("%s writing", dht.self)
	if err := w.WriteMsg(pmes); err != nil {
		return nil, err
	}
	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)

	log.Debugf("%s reading", dht.self)
	defer log.Debugf("%s done", dht.self)

	rpmes := new(pb.Message)
	if err := r.ReadMsg(rpmes); err != nil {
		return nil, err
	}
	if rpmes == nil {
		return nil, errors.New("no response to request")
	}

107 108 109
	// update the peer (on valid msgs only)
	dht.updateFromMessage(ctx, p, rpmes)

110
	dht.peerstore.RecordLatency(p, time.Since(start))
111 112 113
	log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
	return rpmes, nil
}
114 115

// sendMessage sends out a message
116
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
117 118 119 120 121 122 123 124

	log.Debugf("%s dht starting stream", dht.self)
	s, err := dht.network.NewStream(inet.ProtocolDHT, p)
	if err != nil {
		return err
	}
	defer s.Close()

125 126
	cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
	w := ggio.NewDelimitedWriter(cw)
127 128 129 130 131 132 133 134 135

	log.Debugf("%s writing", dht.self)
	if err := w.WriteMsg(pmes); err != nil {
		return err
	}
	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
	log.Debugf("%s done", dht.self)
	return nil
}
136 137 138 139 140

// updateFromMessage passes p to dht.Update. It is called by
// handleNewMessage and sendRequest after a message from p has been read
// successfully. mes is not used at present, and the returned error is
// always nil.
func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
	dht.Update(ctx, p)
	return nil
}