stream.go 1.76 KB
Newer Older
1 2 3 4
package p2p

import (
	"io"
5
	"sync"
6

7
	net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
Łukasz Magiera's avatar
Łukasz Magiera committed
8 9
	manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
Łukasz Magiera's avatar
Łukasz Magiera committed
10
	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
11 12
)

13 14
// CMGR_TAG is the tag string "stream-fwd".
// NOTE(review): presumably the label under which forwarded streams are tagged
// in the connection manager; its use is not visible here, confirm against callers.
const CMGR_TAG = "stream-fwd"

15 16
// Stream holds information on active incoming and outgoing p2p streams.
type Stream struct {
17
	id uint64
18

Łukasz Magiera's avatar
Łukasz Magiera committed
19
	Protocol protocol.ID
20

21 22
	OriginAddr ma.Multiaddr
	TargetAddr ma.Multiaddr
23 24 25 26 27

	Local  manet.Conn
	Remote net.Stream

	Registry *StreamRegistry
28 29

	cleanup func()
30 31
}

32 33 34 35
// Close closes stream endpoints and deregisters it
func (s *Stream) Close() error {
	s.Local.Close()
	s.Remote.Close()
36
	s.cleanup()
37
	s.Registry.Deregister(s.id)
38 39 40
	return nil
}

41
// Reset closes stream endpoints and deregisters it
42 43 44
func (s *Stream) Reset() error {
	s.Local.Close()
	s.Remote.Reset()
45
	s.Registry.Deregister(s.id)
46 47 48 49 50
	return nil
}

func (s *Stream) startStreaming() {
	go func() {
Łukasz Magiera's avatar
Łukasz Magiera committed
51 52 53 54 55 56
		_, err := io.Copy(s.Local, s.Remote)
		if err != nil {
			s.Reset()
		} else {
			s.Close()
		}
57 58 59
	}()

	go func() {
60 61 62 63 64 65
		_, err := io.Copy(s.Remote, s.Local)
		if err != nil {
			s.Reset()
		} else {
			s.Close()
		}
66 67 68 69 70
	}()
}

// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
Łukasz Magiera's avatar
Łukasz Magiera committed
71
	sync.Mutex
72

Łukasz Magiera's avatar
Łukasz Magiera committed
73 74
	Streams map[uint64]*Stream
	nextID  uint64
75 76 77
}

// Register registers a stream to the registry
78
func (r *StreamRegistry) Register(streamInfo *Stream) {
Łukasz Magiera's avatar
Łukasz Magiera committed
79 80
	r.Lock()
	defer r.Unlock()
81

82 83 84
	streamInfo.id = r.nextID
	r.Streams[r.nextID] = streamInfo
	r.nextID++
85 86 87
}

// Deregister deregisters stream from the registry
88
func (r *StreamRegistry) Deregister(streamID uint64) {
Łukasz Magiera's avatar
Łukasz Magiera committed
89 90
	r.Lock()
	defer r.Unlock()
91

92
	delete(r.Streams, streamID)
93
}