dht_net.go 5.44 KB
Newer Older
1 2 3
package dht

import (
4
	"sync"
5 6
	"time"

7
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
Jeromy's avatar
Jeromy committed
8
	peer "gx/ipfs/QmQGwpJy9P4yXZySmqkZEXCmbBpJUb8xntCv8Ca4taZwDC/go-libp2p-peer"
Jakub Sztandera's avatar
Jakub Sztandera committed
9
	ctxio "gx/ipfs/QmX6DhWrpBB5NtadXmPSXYNdVvuLfJXoFNMvUMoVvP5UJa/go-context/io"
Jeromy's avatar
Jeromy committed
10
	inet "gx/ipfs/QmXJBB9U6e6ennAJPzk8E2rSaVGuHVR2jCxE9H9gPDtRrq/go-libp2p/p2p/net"
Jeromy's avatar
Jeromy committed
11
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
Jeromy's avatar
Jeromy committed
12
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
13 14 15 16 17 18 19 20 21 22 23
)

// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
	go dht.handleNewMessage(s)
}

func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
	defer s.Close()

	ctx := dht.Context()
24 25
	cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
26 27
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(cw)
28 29
	mPeer := s.Conn().RemotePeer()

30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	for {
		// receive msg
		pmes := new(pb.Message)
		if err := r.ReadMsg(pmes); err != nil {
			log.Debugf("Error unmarshaling data: %s", err)
			return
		}

		// update the peer (on valid msgs only)
		dht.updateFromMessage(ctx, mPeer, pmes)

		// get handler for this msg type.
		handler := dht.handlerForMsgType(pmes.GetType())
		if handler == nil {
			log.Debug("got back nil handler from handlerForMsgType")
			return
		}

		// dispatch handler.
		rpmes, err := handler(ctx, mPeer, pmes)
		if err != nil {
			log.Debugf("handle message error: %s", err)
			return
		}

		// if nil response, return it before serializing
		if rpmes == nil {
			log.Debug("Got back nil response from request.")
			continue
		}

		// send out response msg
		if err := w.WriteMsg(rpmes); err != nil {
			log.Debugf("send response error: %s", err)
			return
		}
66 67 68 69 70 71 72
	}

	return
}

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
73
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
74

75
	ms := dht.messageSenderForPeer(p)
76 77 78

	start := time.Now()

79 80
	rpmes, err := ms.SendRequest(ctx, pmes)
	if err != nil {
81 82 83
		return nil, err
	}

84 85 86
	// update the peer (on valid msgs only)
	dht.updateFromMessage(ctx, p, rpmes)

87
	dht.peerstore.RecordLatency(p, time.Since(start))
88 89 90
	log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
	return rpmes, nil
}
91 92

// sendMessage sends out a message
93
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
94

95
	ms := dht.messageSenderForPeer(p)
96

97
	if err := ms.SendMessage(ctx, pmes); err != nil {
98 99 100 101 102
		return err
	}
	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
	return nil
}
103 104 105 106 107

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
	dht.Update(ctx, p)
	return nil
}
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128

func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) *messageSender {
	dht.smlk.Lock()
	defer dht.smlk.Unlock()

	ms, ok := dht.strmap[p]
	if !ok {
		ms = dht.newMessageSender(p)
		dht.strmap[p] = ms
	}

	return ms
}

type messageSender struct {
	s   inet.Stream
	r   ggio.ReadCloser
	w   ggio.WriteCloser
	lk  sync.Mutex
	p   peer.ID
	dht *IpfsDHT
Jeromy's avatar
Jeromy committed
129 130

	singleMes int
131 132 133 134 135 136 137 138 139 140 141 142
}

func (dht *IpfsDHT) newMessageSender(p peer.ID) *messageSender {
	return &messageSender{p: p, dht: dht}
}

func (ms *messageSender) prep() error {
	if ms.s != nil {
		return nil
	}

	nstr, err := ms.dht.host.NewStream(ms.dht.ctx, ProtocolDHT, ms.p)
143 144 145 146
	if err != nil {
		return err
	}

147 148 149 150 151 152 153
	ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
	ms.w = ggio.NewDelimitedWriter(nstr)
	ms.s = nstr

	return nil
}

154 155 156 157
// streamReuseTries is the number of times we will try to reuse a stream to a
// given peer before giving up and reverting to the old one-message-per-stream
// behaviour.
const streamReuseTries = 3
158

159 160 161 162
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
	ms.lk.Lock()
	defer ms.lk.Unlock()
	if err := ms.prep(); err != nil {
163 164
		return err
	}
165

Jeromy's avatar
Jeromy committed
166 167 168 169
	if err := ms.writeMessage(pmes); err != nil {
		return err
	}

170
	if ms.singleMes > streamReuseTries {
Jeromy's avatar
Jeromy committed
171 172 173 174
		ms.s.Close()
		ms.s = nil
	}

175 176
	return nil
}
177

Jeromy's avatar
Jeromy committed
178
func (ms *messageSender) writeMessage(pmes *pb.Message) error {
179 180
	err := ms.w.WriteMsg(pmes)
	if err != nil {
Jeromy's avatar
Jeromy committed
181 182 183 184 185
		// If the other side isnt expecting us to be reusing streams, we're gonna
		// end up erroring here. To make sure things work seamlessly, lets retry once
		// before continuing

		log.Infof("error writing message: ", err)
186 187
		ms.s.Close()
		ms.s = nil
Jeromy's avatar
Jeromy committed
188 189 190 191 192 193 194 195 196 197 198
		if err := ms.prep(); err != nil {
			return err
		}

		if err := ms.w.WriteMsg(pmes); err != nil {
			return err
		}

		// keep track of this happening. If it happens a few times, its
		// likely we can assume the otherside will never support stream reuse
		ms.singleMes++
199
	}
200 201
	return nil
}
202 203 204 205 206 207 208 209

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
	ms.lk.Lock()
	defer ms.lk.Unlock()
	if err := ms.prep(); err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
210
	if err := ms.writeMessage(pmes); err != nil {
211 212 213 214 215 216
		return nil, err
	}

	log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

	mes := new(pb.Message)
217
	if err := ms.ctxReadMsg(ctx, mes); err != nil {
218 219 220 221 222
		ms.s.Close()
		ms.s = nil
		return nil, err
	}

223
	if ms.singleMes > streamReuseTries {
Jeromy's avatar
Jeromy committed
224 225 226 227
		ms.s.Close()
		ms.s = nil
	}

228 229
	return mes, nil
}
230 231 232 233 234 235 236 237 238 239 240 241 242 243

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
	errc := make(chan error, 1)
	go func() {
		errc <- ms.r.ReadMsg(mes)
	}()

	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}