stream.go 2.49 KB
Newer Older
1 2 3 4
package p2p

import (
	"io"
5
	"sync"
6

Łukasz Magiera's avatar
Łukasz Magiera committed
7
	net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
Łukasz Magiera's avatar
Łukasz Magiera committed
8
	peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
Łukasz Magiera's avatar
Łukasz Magiera committed
9
	manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
Łukasz Magiera's avatar
Łukasz Magiera committed
10
	ifconnmgr "gx/ipfs/QmVz2p8ZVZ5GcWPNWGs2HZHiZyHumZcJpQdMRpxkMDhc2C/go-libp2p-interface-connmgr"
Łukasz Magiera's avatar
Łukasz Magiera committed
11
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
Łukasz Magiera's avatar
Łukasz Magiera committed
12
	protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
13 14
)

15 16
// CMGR_TAG is the tag name passed to the connection manager's TagPeer and
// UntagPeer calls for peers that have streams in a StreamRegistry.
const CMGR_TAG = "stream-fwd"

17 18
// Stream holds information on active incoming and outgoing p2p streams.
type Stream struct {
19
	id uint64
20

Łukasz Magiera's avatar
Łukasz Magiera committed
21
	Protocol protocol.ID
22

23 24
	OriginAddr ma.Multiaddr
	TargetAddr ma.Multiaddr
Łukasz Magiera's avatar
Łukasz Magiera committed
25
	peer       peer.ID
26 27 28 29 30 31 32

	Local  manet.Conn
	Remote net.Stream

	Registry *StreamRegistry
}

Łukasz Magiera's avatar
Łukasz Magiera committed
33 34 35
// close closes stream endpoints and deregisters it
func (s *Stream) close() error {
	s.Registry.Close(s)
36 37 38
	return nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
39 40 41
// reset closes stream endpoints and deregisters it
func (s *Stream) reset() error {
	s.Registry.Reset(s)
42 43 44 45 46
	return nil
}

func (s *Stream) startStreaming() {
	go func() {
Łukasz Magiera's avatar
Łukasz Magiera committed
47 48
		_, err := io.Copy(s.Local, s.Remote)
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
49
			s.reset()
Łukasz Magiera's avatar
Łukasz Magiera committed
50
		} else {
Łukasz Magiera's avatar
Łukasz Magiera committed
51
			s.close()
Łukasz Magiera's avatar
Łukasz Magiera committed
52
		}
53 54 55
	}()

	go func() {
56 57
		_, err := io.Copy(s.Remote, s.Local)
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
58
			s.reset()
59
		} else {
Łukasz Magiera's avatar
Łukasz Magiera committed
60
			s.close()
61
		}
62 63 64 65 66
	}()
}

// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
67
	sync.Mutex
68

Łukasz Magiera's avatar
Łukasz Magiera committed
69
	Streams map[uint64]*Stream
Łukasz Magiera's avatar
Łukasz Magiera committed
70
	conns   map[peer.ID]int
Łukasz Magiera's avatar
Łukasz Magiera committed
71
	nextID  uint64
Łukasz Magiera's avatar
Łukasz Magiera committed
72 73

	ifconnmgr.ConnManager
74 75 76
}

// Register registers a stream to the registry
77
func (r *StreamRegistry) Register(streamInfo *Stream) {
Łukasz Magiera's avatar
Łukasz Magiera committed
78 79
	r.Lock()
	defer r.Unlock()
80

Łukasz Magiera's avatar
Łukasz Magiera committed
81 82 83
	r.ConnManager.TagPeer(streamInfo.peer, CMGR_TAG, 20)
	r.conns[streamInfo.peer]++

84 85 86
	streamInfo.id = r.nextID
	r.Streams[r.nextID] = streamInfo
	r.nextID++
87 88 89
}

// Deregister deregisters stream from the registry
90
func (r *StreamRegistry) Deregister(streamID uint64) {
Łukasz Magiera's avatar
Łukasz Magiera committed
91 92
	r.Lock()
	defer r.Unlock()
93

Łukasz Magiera's avatar
Łukasz Magiera committed
94 95 96 97 98 99 100 101 102 103 104
	s, ok := r.Streams[streamID]
	if !ok {
		return
	}
	p := s.peer
	r.conns[p]--
	if r.conns[p] < 1 {
		delete(r.conns, p)
		r.ConnManager.UntagPeer(p, CMGR_TAG)
	}

105
	delete(r.Streams, streamID)
106
}
Łukasz Magiera's avatar
Łukasz Magiera committed
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122

// Close closes both endpoints of s and deregisters it.
//
// Every teardown step runs even if an earlier one fails. The first error
// reported while closing the endpoints is returned; nil means both endpoints
// closed cleanly. Deregistration goes through s.Registry, not the receiver.
func (r *StreamRegistry) Close(s *Stream) error {
	err := s.Local.Close()
	if remoteErr := s.Remote.Close(); err == nil {
		err = remoteErr
	}
	s.Registry.Deregister(s.id)
	return err
}

// Reset closes the local endpoint of s, resets its remote stream, and
// deregisters it.
//
// Every teardown step runs even if an earlier one fails. The first error
// reported by the endpoints is returned; nil means both steps succeeded.
// Deregistration goes through s.Registry, not the receiver.
func (r *StreamRegistry) Reset(s *Stream) error {
	err := s.Local.Close()
	if remoteErr := s.Remote.Reset(); err == nil {
		err = remoteErr
	}
	s.Registry.Deregister(s.id)
	return err
}