ipfs_impl.go 11.4 KB
Newer Older
1 2 3
package network

import (
4
	"context"
5
	"fmt"
Jeromy's avatar
Jeromy committed
6
	"io"
7
	"sync"
8
	"sync/atomic"
9
	"time"
Jeromy's avatar
Jeromy committed
10

Jeromy's avatar
Jeromy committed
11 12 13 14
	bsmsg "github.com/ipfs/go-bitswap/message"

	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
15
	"github.com/libp2p/go-libp2p-core/connmgr"
16
	"github.com/libp2p/go-libp2p-core/helpers"
Raúl Kripalani's avatar
Raúl Kripalani committed
17 18 19 20
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	peerstore "github.com/libp2p/go-libp2p-core/peerstore"
21
	"github.com/libp2p/go-libp2p-core/protocol"
Raúl Kripalani's avatar
Raúl Kripalani committed
22
	"github.com/libp2p/go-libp2p-core/routing"
23
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
Steven Allen's avatar
Steven Allen committed
24
	msgio "github.com/libp2p/go-msgio"
Jeromy's avatar
Jeromy committed
25
	ma "github.com/multiformats/go-multiaddr"
26 27
)

Jeromy's avatar
Jeromy committed
28
var log = logging.Logger("bitswap_network")
Jeromy's avatar
Jeromy committed
29

30 31
var sendMessageTimeout = time.Minute * 10

32
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
33
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork {
dirkmc's avatar
dirkmc committed
34
	s := processSettings(opts...)
35

36
	bitswapNetwork := impl{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
		host:    host,
38
		routing: r,
39

dirkmc's avatar
dirkmc committed
40 41 42 43 44 45
		protocolBitswapNoVers:  s.ProtocolPrefix + ProtocolBitswapNoVers,
		protocolBitswapOneZero: s.ProtocolPrefix + ProtocolBitswapOneZero,
		protocolBitswapOneOne:  s.ProtocolPrefix + ProtocolBitswapOneOne,
		protocolBitswap:        s.ProtocolPrefix + ProtocolBitswap,

		supportedProtocols: s.SupportedProtocols,
46
	}
47 48
	bitswapNetwork.connectEvtMgr = newConnectEventManager(&bitswapNetwork)

49
	return &bitswapNetwork
50 51
}

dirkmc's avatar
dirkmc committed
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
func processSettings(opts ...NetOpt) Settings {
	s := Settings{
		SupportedProtocols: []protocol.ID{
			ProtocolBitswap,
			ProtocolBitswapOneOne,
			ProtocolBitswapOneZero,
			ProtocolBitswapNoVers,
		},
	}
	for _, opt := range opts {
		opt(&s)
	}
	for i, proto := range s.SupportedProtocols {
		s.SupportedProtocols[i] = s.ProtocolPrefix + proto
	}
	return s
}

70 71
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
72
type impl struct {
Steven Allen's avatar
Steven Allen committed
73 74 75 76
	// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
	// alignment.
	stats Stats

77 78 79
	host          host.Host
	routing       routing.ContentRouting
	connectEvtMgr *connectEventManager
80

dirkmc's avatar
dirkmc committed
81 82 83 84 85 86
	protocolBitswapNoVers  protocol.ID
	protocolBitswapOneZero protocol.ID
	protocolBitswapOneOne  protocol.ID
	protocolBitswap        protocol.ID

	supportedProtocols []protocol.ID
87

88 89
	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
90 91
}

Jeromy's avatar
Jeromy committed
92
type streamMessageSender struct {
93 94 95 96
	to     peer.ID
	stream network.Stream
	bsnet  *impl
	opts   *MessageSenderOpts
97
	done   chan struct{}
Jeromy's avatar
Jeromy committed
98 99
}

100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Stream, err error) {
	defer func() {
		if err != nil {
			s.bsnet.connectEvtMgr.MarkUnresponsive(s.to)
		}
	}()

	if s.stream != nil {
		return s.stream, nil
	}

	if err = s.bsnet.ConnectTo(ctx, s.to); err != nil {
		return nil, err
	}

	stream, err = s.bsnet.newStreamToPeer(ctx, s.to)
Dirk McCormick's avatar
Dirk McCormick committed
116
	if err == nil {
117 118 119 120
		s.stream = stream
		return s.stream, nil
	}
	return nil, err
Jeromy's avatar
Jeromy committed
121 122
}

123
func (s *streamMessageSender) Reset() error {
124 125 126
	err := s.stream.Reset()
	s.stream = nil
	return err
127 128
}

129
func (s *streamMessageSender) Close() error {
130
	close(s.done)
131
	return helpers.FullClose(s.stream)
132 133
}

dirkmc's avatar
dirkmc committed
134
func (s *streamMessageSender) SupportsHave() bool {
135 136 137 138 139 140 141 142 143 144 145 146
	return s.bsnet.SupportsHave(s.stream.Protocol())
}

func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
	// Try to send the message repeatedly
	var err error
	for i := 0; i < s.opts.MaxRetries; i++ {
		if err = s.attemptSend(ctx, msg); err == nil {
			// Sent successfully
			return nil
		}

147 148 149 150 151 152 153 154 155
		// If the sender has been closed or the context cancelled, just bail out
		select {
		case <-ctx.Done():
			return nil
		case <-s.done:
			return nil
		default:
		}

156 157 158 159 160 161 162 163 164 165 166
		// Failed to send so reset stream and try again
		_ = s.Reset()

		if i == s.opts.MaxRetries {
			s.bsnet.connectEvtMgr.MarkUnresponsive(s.to)
			return err
		}

		select {
		case <-ctx.Done():
			return nil
167 168
		case <-s.done:
			return nil
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
		case <-time.After(s.opts.SendErrorBackoff):
			// wait a short time in case disconnect notifications are still propagating
			log.Infof("send message to %s failed but context was not Done: %s", s.to, err)
		}
	}
	return err
}

func (s *streamMessageSender) attemptSend(ctx context.Context, msg bsmsg.BitSwapMessage) error {
	sndctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout)
	defer cancel()

	stream, err := s.Connect(sndctx)
	if err != nil {
		log.Infof("failed to open stream to %s: %s", s.to, err)
		return err
	}

	if err = s.bsnet.msgToStream(sndctx, stream, msg); err != nil {
		log.Infof("failed to send message to %s: %s", s.to, err)
		return err
	}

	return nil
dirkmc's avatar
dirkmc committed
193 194 195 196 197 198
}

func (bsnet *impl) Self() peer.ID {
	return bsnet.host.ID()
}

199 200 201 202 203 204 205 206 207 208 209
func (bsnet *impl) Ping(ctx context.Context, p peer.ID) ping.Result {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	res := <-ping.Ping(ctx, bsnet.host, p)
	return res
}

func (bsnet *impl) Latency(p peer.ID) time.Duration {
	return bsnet.host.Peerstore().LatencyEWMA(p)
}

dirkmc's avatar
dirkmc committed
210 211 212 213 214 215 216 217 218
// Indicates whether the given protocol supports HAVE / DONT_HAVE messages
func (bsnet *impl) SupportsHave(proto protocol.ID) bool {
	switch proto {
	case bsnet.protocolBitswapOneOne, bsnet.protocolBitswapOneZero, bsnet.protocolBitswapNoVers:
		return false
	}
	return true
}

219
func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
220 221 222 223
	deadline := time.Now().Add(sendMessageTimeout)
	if dl, ok := ctx.Deadline(); ok {
		deadline = dl
	}
Bob Potter's avatar
Bob Potter committed
224

225
	if err := s.SetWriteDeadline(deadline); err != nil {
226
		log.Warnf("error setting deadline: %s", err)
227 228
	}

dirkmc's avatar
dirkmc committed
229 230 231
	// Older Bitswap versions use a slightly different wire format so we need
	// to convert the message to the appropriate format depending on the remote
	// peer's Bitswap version.
232
	switch s.Protocol() {
dirkmc's avatar
dirkmc committed
233
	case bsnet.protocolBitswapOneOne, bsnet.protocolBitswap:
Bob Potter's avatar
Bob Potter committed
234
		if err := msg.ToNetV1(s); err != nil {
235 236 237
			log.Debugf("error: %s", err)
			return err
		}
dirkmc's avatar
dirkmc committed
238
	case bsnet.protocolBitswapOneZero, bsnet.protocolBitswapNoVers:
Bob Potter's avatar
Bob Potter committed
239
		if err := msg.ToNetV0(s); err != nil {
240 241 242 243 244 245
			log.Debugf("error: %s", err)
			return err
		}
	default:
		return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
	}
246 247

	if err := s.SetWriteDeadline(time.Time{}); err != nil {
248
		log.Warnf("error resetting deadline: %s", err)
249
	}
250
	return nil
Jeromy's avatar
Jeromy committed
251 252
}

253 254 255 256 257
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) {
	sender := &streamMessageSender{
		to:    p,
		bsnet: bsnet,
		opts:  opts,
258
		done:  make(chan struct{}),
Jeromy's avatar
Jeromy committed
259 260
	}

261 262
	conctx, cancel := context.WithTimeout(ctx, sender.opts.SendTimeout)
	defer cancel()
Jeromy's avatar
Jeromy committed
263

264 265 266 267 268
	_, err := sender.Connect(conctx)
	if err != nil {
		return nil, err
	}
	return sender, nil
269 270 271 272 273 274 275 276
}

func (bsnet *impl) SendMessage(
	ctx context.Context,
	p peer.ID,
	outgoing bsmsg.BitSwapMessage) error {

	s, err := bsnet.newStreamToPeer(ctx, p)
277 278 279
	if err != nil {
		return err
	}
280

281
	if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
Steven Allen's avatar
Steven Allen committed
282
		_ = s.Reset()
Steven Allen's avatar
Steven Allen committed
283
		return err
284
	}
285 286
	atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

287
	// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
Steven Allen's avatar
Steven Allen committed
288
	//nolint
Raúl Kripalani's avatar
Raúl Kripalani committed
289
	go helpers.AwaitEOF(s)
290
	return s.Close()
291
}
292

293 294
func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
	return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...)
295 296
}

297 298
func (bsnet *impl) SetDelegate(r Receiver) {
	bsnet.receiver = r
dirkmc's avatar
dirkmc committed
299 300 301
	for _, proto := range bsnet.supportedProtocols {
		bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
	}
hannahhoward's avatar
hannahhoward committed
302 303 304
	bsnet.host.Network().Notify((*netNotifiee)(bsnet))
	// TODO: StopNotify.

305
}
306

307
func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
Raúl Kripalani's avatar
Raúl Kripalani committed
308
	return bsnet.host.Connect(ctx, peer.AddrInfo{ID: p})
309 310
}

dirkmc's avatar
dirkmc committed
311 312 313 314
func (bsnet *impl) DisconnectFrom(ctx context.Context, p peer.ID) error {
	panic("Not implemented: DisconnectFrom() is only used by tests")
}

315
// FindProvidersAsync returns a channel of providers for the given key.
316
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID {
317
	out := make(chan peer.ID, max)
318 319
	go func() {
		defer close(out)
320
		providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
321
		for info := range providers {
322 323
			if info.ID == bsnet.host.ID() {
				continue // ignore self as provider
324
			}
Raúl Kripalani's avatar
Raúl Kripalani committed
325
			bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
326 327
			select {
			case <-ctx.Done():
328
				return
329 330 331 332 333
			case out <- info.ID:
			}
		}
	}()
	return out
334 335 336
}

// Provide provides the key to the network
337
func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error {
338
	return bsnet.routing.Provide(ctx, k, true)
339 340
}

341
// handleNewStream receives a new stream from the network.
Raúl Kripalani's avatar
Raúl Kripalani committed
342
func (bsnet *impl) handleNewStream(s network.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
343
	defer s.Close()
344 345

	if bsnet.receiver == nil {
Steven Allen's avatar
Steven Allen committed
346
		_ = s.Reset()
347 348 349
		return
	}

Steven Allen's avatar
Steven Allen committed
350
	reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
351
	for {
Steven Allen's avatar
Steven Allen committed
352
		received, err := bsmsg.FromMsgReader(reader)
353
		if err != nil {
Jeromy's avatar
Jeromy committed
354
			if err != io.EOF {
Steven Allen's avatar
Steven Allen committed
355
				_ = s.Reset()
Jeromy's avatar
Jeromy committed
356 357 358
				go bsnet.receiver.ReceiveError(err)
				log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
			}
359 360
			return
		}
361

362 363 364
		p := s.Conn().RemotePeer()
		ctx := context.Background()
		log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
365
		bsnet.connectEvtMgr.OnMessage(s.Conn().RemotePeer())
366
		bsnet.receiver.ReceiveMessage(ctx, p, received)
367
		atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
368
	}
369
}
370

Raúl Kripalani's avatar
Raúl Kripalani committed
371
func (bsnet *impl) ConnectionManager() connmgr.ConnManager {
372 373 374
	return bsnet.host.ConnManager()
}

375 376
func (bsnet *impl) Stats() Stats {
	return Stats{
377 378 379 380 381
		MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
		MessagesSent:  atomic.LoadUint64(&bsnet.stats.MessagesSent),
	}
}

382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457
type connectEventManager struct {
	bsnet *impl
	lk    sync.Mutex
	conns map[peer.ID]*connState
}

type connState struct {
	refs       int
	responsive bool
}

func newConnectEventManager(bsnet *impl) *connectEventManager {
	return &connectEventManager{
		bsnet: bsnet,
		conns: make(map[peer.ID]*connState),
	}
}

func (c *connectEventManager) Connected(p peer.ID) {
	c.lk.Lock()
	defer c.lk.Unlock()

	state, ok := c.conns[p]
	if !ok {
		state = &connState{responsive: true}
	}
	state.refs++

	if state.refs == 1 && state.responsive {
		c.bsnet.receiver.PeerConnected(p)
	}
}

func (c *connectEventManager) Disconnected(p peer.ID) {
	c.lk.Lock()
	defer c.lk.Unlock()

	state, ok := c.conns[p]
	if !ok {
		// Should never happen
		return
	}
	state.refs--
	c.conns[p] = state

	if state.refs == 0 && state.responsive {
		c.bsnet.receiver.PeerDisconnected(p)
	}
}

func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
	c.lk.Lock()
	defer c.lk.Unlock()

	state, ok := c.conns[p]
	if !ok {
		return
	}
	state.responsive = false
	c.conns[p] = state

	c.bsnet.receiver.PeerDisconnected(p)
}

func (c *connectEventManager) OnMessage(p peer.ID) {
	c.lk.Lock()
	defer c.lk.Unlock()

	state, ok := c.conns[p]
	if ok && !state.responsive {
		state.responsive = true
		c.conns[p] = state
		c.bsnet.receiver.PeerConnected(p)
	}
}

458 459 460 461 462 463
type netNotifiee impl

func (nn *netNotifiee) impl() *impl {
	return (*impl)(nn)
}

Raúl Kripalani's avatar
Raúl Kripalani committed
464
func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
465
	nn.impl().connectEvtMgr.Connected(v.RemotePeer())
466
}
Raúl Kripalani's avatar
Raúl Kripalani committed
467
func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
468
	nn.impl().connectEvtMgr.Disconnected(v.RemotePeer())
469
}
dirkmc's avatar
dirkmc committed
470
func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {}
Raúl Kripalani's avatar
Raúl Kripalani committed
471 472 473
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr)         {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr)    {}