stream.go 2.5 KB
Newer Older
1 2 3 4
package p2p

import (
	"io"
5
	"sync"
6

Hector Sanjuan's avatar
Hector Sanjuan committed
7 8 9 10
	ma "gx/ipfs/QmTZBfrPJmjWsCvHEtX5FE6KimVJhsJg5sBbqEFYf4UZtL/go-multiaddr"
	ifconnmgr "gx/ipfs/QmXa6sgzUvP5bgF5CyyV36bZYv5VDRwttggQYUPvFybLVd/go-libp2p-interface-connmgr"
	net "gx/ipfs/QmY3ArotKMKaL7YGfbQfyDrib6RVraLqZYWXZvVgZktBxp/go-libp2p-net"
	peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
Łukasz Magiera's avatar
Łukasz Magiera committed
11
	protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
Hector Sanjuan's avatar
Hector Sanjuan committed
12
	manet "gx/ipfs/Qmc85NSvmSG4Frn9Vb2cBc1rMyULH6D3TNVEfCzSKoUpip/go-multiaddr-net"
13 14
)

Łukasz Magiera's avatar
Łukasz Magiera committed
15
// cmgrTag is the connection-manager tag that Register applies to a stream's
// peer and that Deregister removes once that peer has no registered streams.
const cmgrTag = "stream-fwd"
16

17 18
// Stream holds information on active incoming and outgoing p2p streams.
type Stream struct {
19
	id uint64
20

Łukasz Magiera's avatar
Łukasz Magiera committed
21
	Protocol protocol.ID
22

23 24
	OriginAddr ma.Multiaddr
	TargetAddr ma.Multiaddr
Łukasz Magiera's avatar
Łukasz Magiera committed
25
	peer       peer.ID
26 27 28 29 30 31 32

	Local  manet.Conn
	Remote net.Stream

	Registry *StreamRegistry
}

Łukasz Magiera's avatar
Łukasz Magiera committed
33
// close stream endpoints and deregister it
Łukasz Magiera's avatar
Łukasz Magiera committed
34 35
func (s *Stream) close() error {
	s.Registry.Close(s)
36 37 38
	return nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
39 40 41
// reset closes stream endpoints and deregisters it
func (s *Stream) reset() error {
	s.Registry.Reset(s)
42 43 44 45 46
	return nil
}

func (s *Stream) startStreaming() {
	go func() {
Łukasz Magiera's avatar
Łukasz Magiera committed
47 48
		_, err := io.Copy(s.Local, s.Remote)
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
49
			s.reset()
Łukasz Magiera's avatar
Łukasz Magiera committed
50
		} else {
Łukasz Magiera's avatar
Łukasz Magiera committed
51
			s.close()
Łukasz Magiera's avatar
Łukasz Magiera committed
52
		}
53 54 55
	}()

	go func() {
56 57
		_, err := io.Copy(s.Remote, s.Local)
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
58
			s.reset()
59
		} else {
Łukasz Magiera's avatar
Łukasz Magiera committed
60
			s.close()
61
		}
62 63 64 65 66
	}()
}

// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
67
	sync.Mutex
68

Łukasz Magiera's avatar
Łukasz Magiera committed
69
	Streams map[uint64]*Stream
Łukasz Magiera's avatar
Łukasz Magiera committed
70
	conns   map[peer.ID]int
Łukasz Magiera's avatar
Łukasz Magiera committed
71
	nextID  uint64
Łukasz Magiera's avatar
Łukasz Magiera committed
72 73

	ifconnmgr.ConnManager
74 75 76
}

// Register registers a stream to the registry
77
func (r *StreamRegistry) Register(streamInfo *Stream) {
Łukasz Magiera's avatar
Łukasz Magiera committed
78 79
	r.Lock()
	defer r.Unlock()
80

Łukasz Magiera's avatar
Łukasz Magiera committed
81
	r.ConnManager.TagPeer(streamInfo.peer, cmgrTag, 20)
Łukasz Magiera's avatar
Łukasz Magiera committed
82 83
	r.conns[streamInfo.peer]++

84 85 86
	streamInfo.id = r.nextID
	r.Streams[r.nextID] = streamInfo
	r.nextID++
87 88

	streamInfo.startStreaming()
89 90 91
}

// Deregister deregisters stream from the registry
92
func (r *StreamRegistry) Deregister(streamID uint64) {
Łukasz Magiera's avatar
Łukasz Magiera committed
93 94
	r.Lock()
	defer r.Unlock()
95

Łukasz Magiera's avatar
Łukasz Magiera committed
96 97 98 99 100 101 102 103
	s, ok := r.Streams[streamID]
	if !ok {
		return
	}
	p := s.peer
	r.conns[p]--
	if r.conns[p] < 1 {
		delete(r.conns, p)
Łukasz Magiera's avatar
Łukasz Magiera committed
104
		r.ConnManager.UntagPeer(p, cmgrTag)
Łukasz Magiera's avatar
Łukasz Magiera committed
105 106
	}

107
	delete(r.Streams, streamID)
108
}
Łukasz Magiera's avatar
Łukasz Magiera committed
109

Łukasz Magiera's avatar
Łukasz Magiera committed
110
// Close stream endpoints and deregister it
Łukasz Magiera's avatar
Łukasz Magiera committed
111 112 113 114 115 116 117
func (r *StreamRegistry) Close(s *Stream) error {
	s.Local.Close()
	s.Remote.Close()
	s.Registry.Deregister(s.id)
	return nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
118
// Reset closes stream endpoints and deregisters it
Łukasz Magiera's avatar
Łukasz Magiera committed
119 120 121 122 123 124
func (r *StreamRegistry) Reset(s *Stream) error {
	s.Local.Close()
	s.Remote.Reset()
	s.Registry.Deregister(s.id)
	return nil
}