stream.go 2.23 KB
Newer Older
1 2 3 4
package p2p

import (
	"io"
5
	"sync"
6

tavit ohanian's avatar
tavit ohanian committed
7 8 9 10 11 12
	ma "gitlab.dms3.io/mf/go-multiaddr"
	manet "gitlab.dms3.io/mf/go-multiaddr/net"
	ifconnmgr "gitlab.dms3.io/p2p/go-p2p-core/connmgr"
	net "gitlab.dms3.io/p2p/go-p2p-core/network"
	peer "gitlab.dms3.io/p2p/go-p2p-core/peer"
	protocol "gitlab.dms3.io/p2p/go-p2p-core/protocol"
13 14
)

Łukasz Magiera's avatar
Łukasz Magiera committed
15
const cmgrTag = "stream-fwd"
16

17 18
// Stream holds information on active incoming and outgoing p2p streams.
type Stream struct {
19
	id uint64
20

Łukasz Magiera's avatar
Łukasz Magiera committed
21
	Protocol protocol.ID
22

23 24
	OriginAddr ma.Multiaddr
	TargetAddr ma.Multiaddr
Łukasz Magiera's avatar
Łukasz Magiera committed
25
	peer       peer.ID
26 27 28 29 30 31 32

	Local  manet.Conn
	Remote net.Stream

	Registry *StreamRegistry
}

Łukasz Magiera's avatar
Łukasz Magiera committed
33
// close stream endpoints and deregister it
34
func (s *Stream) close() {
Łukasz Magiera's avatar
Łukasz Magiera committed
35
	s.Registry.Close(s)
36 37
}

Łukasz Magiera's avatar
Łukasz Magiera committed
38
// reset closes stream endpoints and deregisters it
39
func (s *Stream) reset() {
Łukasz Magiera's avatar
Łukasz Magiera committed
40
	s.Registry.Reset(s)
41 42 43 44
}

func (s *Stream) startStreaming() {
	go func() {
Łukasz Magiera's avatar
Łukasz Magiera committed
45 46
		_, err := io.Copy(s.Local, s.Remote)
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
47
			s.reset()
Łukasz Magiera's avatar
Łukasz Magiera committed
48
		} else {
Łukasz Magiera's avatar
Łukasz Magiera committed
49
			s.close()
Łukasz Magiera's avatar
Łukasz Magiera committed
50
		}
51 52 53
	}()

	go func() {
54 55
		_, err := io.Copy(s.Remote, s.Local)
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
56
			s.reset()
57
		} else {
Łukasz Magiera's avatar
Łukasz Magiera committed
58
			s.close()
59
		}
60 61 62 63 64
	}()
}

// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
65
	sync.Mutex
66

Łukasz Magiera's avatar
Łukasz Magiera committed
67
	Streams map[uint64]*Stream
Łukasz Magiera's avatar
Łukasz Magiera committed
68
	conns   map[peer.ID]int
Łukasz Magiera's avatar
Łukasz Magiera committed
69
	nextID  uint64
Łukasz Magiera's avatar
Łukasz Magiera committed
70 71

	ifconnmgr.ConnManager
72 73 74
}

// Register registers a stream to the registry
75
func (r *StreamRegistry) Register(streamInfo *Stream) {
Łukasz Magiera's avatar
Łukasz Magiera committed
76 77
	r.Lock()
	defer r.Unlock()
78

Łukasz Magiera's avatar
Łukasz Magiera committed
79
	r.ConnManager.TagPeer(streamInfo.peer, cmgrTag, 20)
Łukasz Magiera's avatar
Łukasz Magiera committed
80 81
	r.conns[streamInfo.peer]++

82 83 84
	streamInfo.id = r.nextID
	r.Streams[r.nextID] = streamInfo
	r.nextID++
85 86

	streamInfo.startStreaming()
87 88 89
}

// Deregister deregisters stream from the registry
90
func (r *StreamRegistry) Deregister(streamID uint64) {
Łukasz Magiera's avatar
Łukasz Magiera committed
91 92
	r.Lock()
	defer r.Unlock()
93

Łukasz Magiera's avatar
Łukasz Magiera committed
94 95 96 97 98 99 100 101
	s, ok := r.Streams[streamID]
	if !ok {
		return
	}
	p := s.peer
	r.conns[p]--
	if r.conns[p] < 1 {
		delete(r.conns, p)
Łukasz Magiera's avatar
Łukasz Magiera committed
102
		r.ConnManager.UntagPeer(p, cmgrTag)
Łukasz Magiera's avatar
Łukasz Magiera committed
103 104
	}

105
	delete(r.Streams, streamID)
106
}
Łukasz Magiera's avatar
Łukasz Magiera committed
107

Łukasz Magiera's avatar
Łukasz Magiera committed
108
// Close stream endpoints and deregister it
109 110 111
func (r *StreamRegistry) Close(s *Stream) {
	_ = s.Local.Close()
	_ = s.Remote.Close()
Łukasz Magiera's avatar
Łukasz Magiera committed
112 113 114
	s.Registry.Deregister(s.id)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
115
// Reset closes stream endpoints and deregisters it
116 117 118
func (r *StreamRegistry) Reset(s *Stream) {
	_ = s.Local.Close()
	_ = s.Remote.Reset()
Łukasz Magiera's avatar
Łukasz Magiera committed
119 120
	s.Registry.Deregister(s.id)
}