dht_net.go 6.75 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"fmt"
6
	"io"
7
	"sync"
8 9
	"time"

10 11
	ggio "github.com/gogo/protobuf/io"
	ctxio "github.com/jbenet/go-context/io"
George Antoniadis's avatar
George Antoniadis committed
12
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
13 14
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
15 16
)

17 18 19
var dhtReadMessageTimeout = time.Minute
var ErrReadTimeout = fmt.Errorf("timed out reading response")

20 21 22 23 24 25 26
// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
	go dht.handleNewMessage(s)
}

func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
	ctx := dht.Context()
27 28
	cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
29 30
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(cw)
31 32
	mPeer := s.Conn().RemotePeer()

33 34 35
	for {
		// receive msg
		pmes := new(pb.Message)
36 37 38 39 40 41
		switch err := r.ReadMsg(pmes); err {
		case io.EOF:
			s.Close()
			return
		case nil:
		default:
42
			s.Reset()
43 44 45 46 47 48 49 50 51 52
			log.Debugf("Error unmarshaling data: %s", err)
			return
		}

		// update the peer (on valid msgs only)
		dht.updateFromMessage(ctx, mPeer, pmes)

		// get handler for this msg type.
		handler := dht.handlerForMsgType(pmes.GetType())
		if handler == nil {
53
			s.Reset()
54 55 56 57 58 59 60
			log.Debug("got back nil handler from handlerForMsgType")
			return
		}

		// dispatch handler.
		rpmes, err := handler(ctx, mPeer, pmes)
		if err != nil {
61
			s.Reset()
62 63 64 65 66 67
			log.Debugf("handle message error: %s", err)
			return
		}

		// if nil response, return it before serializing
		if rpmes == nil {
68
			log.Debug("got back nil response from request")
69 70 71 72 73
			continue
		}

		// send out response msg
		if err := w.WriteMsg(rpmes); err != nil {
74
			s.Reset()
75 76 77
			log.Debugf("send response error: %s", err)
			return
		}
78 79 80 81 82
	}
}

// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
83
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
84

85 86 87 88
	ms, err := dht.messageSenderForPeer(p)
	if err != nil {
		return nil, err
	}
89 90 91

	start := time.Now()

92 93
	rpmes, err := ms.SendRequest(ctx, pmes)
	if err != nil {
94 95 96
		return nil, err
	}

97 98 99
	// update the peer (on valid msgs only)
	dht.updateFromMessage(ctx, p, rpmes)

100
	dht.peerstore.RecordLatency(p, time.Since(start))
101 102 103
	log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
	return rpmes, nil
}
104 105

// sendMessage sends out a message
106
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
107 108 109 110
	ms, err := dht.messageSenderForPeer(p)
	if err != nil {
		return err
	}
111

112
	if err := ms.SendMessage(ctx, pmes); err != nil {
113 114 115 116 117
		return err
	}
	log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
	return nil
}
118 119 120 121 122

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
	dht.Update(ctx, p)
	return nil
}
123

124
func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
125 126
	dht.smlk.Lock()
	ms, ok := dht.strmap[p]
127 128 129
	if ok {
		dht.smlk.Unlock()
		return ms, nil
130
	}
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
	ms = &messageSender{p: p, dht: dht}
	dht.strmap[p] = ms
	dht.smlk.Unlock()

	if err := ms.prepOrInvalidate(); err != nil {
		dht.smlk.Lock()
		defer dht.smlk.Unlock()

		if msCur, ok := dht.strmap[p]; ok {
			// Changed. Use the new one, old one is invalid and
			// not in the map so we can just throw it away.
			if ms != msCur {
				return msCur, nil
			}
			// Not changed, remove the now invalid stream from the
			// map.
			delete(dht.strmap, p)
		}
		// Invalid but not in map. Must have been removed by a disconnect.
		return nil, err
	}
	// All ready to go.
	return ms, nil
154 155 156 157 158 159 160 161 162
}

type messageSender struct {
	s   inet.Stream
	r   ggio.ReadCloser
	w   ggio.WriteCloser
	lk  sync.Mutex
	p   peer.ID
	dht *IpfsDHT
Jeromy's avatar
Jeromy committed
163

164
	invalid   bool
Jeromy's avatar
Jeromy committed
165
	singleMes int
166 167
}

Steven Allen's avatar
Steven Allen committed
168 169 170
// invalidate is called before this messageSender is removed from the strmap.
// It prevents the messageSender from being reused/reinitialized and then
// forgotten (leaving the stream open).
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
func (ms *messageSender) invalidate() {
	ms.invalid = true
	if ms.s != nil {
		ms.s.Reset()
		ms.s = nil
	}
}

func (ms *messageSender) prepOrInvalidate() error {
	ms.lk.Lock()
	defer ms.lk.Unlock()
	if err := ms.prep(); err != nil {
		ms.invalidate()
		return err
	}
	return nil
187 188 189
}

func (ms *messageSender) prep() error {
190 191 192
	if ms.invalid {
		return fmt.Errorf("message sender has been invalidated")
	}
193 194 195 196
	if ms.s != nil {
		return nil
	}

197
	nstr, err := ms.dht.host.NewStream(ms.dht.ctx, ms.p, ms.dht.protocols...)
198 199 200 201
	if err != nil {
		return err
	}

202 203 204 205 206 207 208
	ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
	ms.w = ggio.NewDelimitedWriter(nstr)
	ms.s = nstr

	return nil
}

209 210 211 212
// streamReuseTries is the number of times we will try to reuse a stream to a
// given peer before giving up and reverting to the old one-message-per-stream
// behaviour.
const streamReuseTries = 3
213

214 215 216
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
	ms.lk.Lock()
	defer ms.lk.Unlock()
217 218
	retry := false
	for {
Jeromy's avatar
Jeromy committed
219 220 221 222 223
		if err := ms.prep(); err != nil {
			return err
		}

		if err := ms.w.WriteMsg(pmes); err != nil {
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
			ms.s.Reset()
			ms.s = nil

			if retry {
				log.Info("error writing message, bailing: ", err)
				return err
			} else {
				log.Info("error writing message, trying again: ", err)
				retry = true
				continue
			}
		}

		log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

		if ms.singleMes > streamReuseTries {
Steven Allen's avatar
Steven Allen committed
240
			go inet.FullClose(ms.s)
241 242 243
			ms.s = nil
		} else if retry {
			ms.singleMes++
Jeromy's avatar
Jeromy committed
244 245
		}

246
		return nil
247
	}
248
}
249 250 251 252

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
	ms.lk.Lock()
	defer ms.lk.Unlock()
253 254 255 256 257
	retry := false
	for {
		if err := ms.prep(); err != nil {
			return nil, err
		}
258

259 260 261 262 263 264 265 266 267 268 269 270 271
		if err := ms.w.WriteMsg(pmes); err != nil {
			ms.s.Reset()
			ms.s = nil

			if retry {
				log.Info("error writing message, bailing: ", err)
				return nil, err
			} else {
				log.Info("error writing message, trying again: ", err)
				retry = true
				continue
			}
		}
272

273 274 275 276 277 278 279 280 281 282 283 284 285 286
		mes := new(pb.Message)
		if err := ms.ctxReadMsg(ctx, mes); err != nil {
			ms.s.Reset()
			ms.s = nil

			if retry {
				log.Info("error reading message, bailing: ", err)
				return nil, err
			} else {
				log.Info("error reading message, trying again: ", err)
				retry = true
				continue
			}
		}
287

288
		log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
289

290
		if ms.singleMes > streamReuseTries {
Steven Allen's avatar
Steven Allen committed
291
			go inet.FullClose(ms.s)
292 293 294 295
			ms.s = nil
		} else if retry {
			ms.singleMes++
		}
Jeromy's avatar
Jeromy committed
296

297 298
		return mes, nil
	}
299
}
300 301 302

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
	errc := make(chan error, 1)
303 304 305
	go func(r ggio.ReadCloser) {
		errc <- r.ReadMsg(mes)
	}(ms.r)
306

307 308 309
	t := time.NewTimer(dhtReadMessageTimeout)
	defer t.Stop()

310 311 312 313 314
	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
315 316
	case <-t.C:
		return ErrReadTimeout
317 318
	}
}