stream.go 1.72 KB
Newer Older
1 2 3 4
package p2p

import (
	"io"
5
	"sync"
6

Łukasz Magiera's avatar
Łukasz Magiera committed
7 8 9
	manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
	net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
Łukasz Magiera's avatar
Łukasz Magiera committed
10
	"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
11 12 13 14
)

// Stream holds information on active incoming and outgoing p2p streams.
type Stream struct {
15
	id uint64
16

Łukasz Magiera's avatar
Łukasz Magiera committed
17
	Protocol protocol.ID
18

19 20
	OriginAddr ma.Multiaddr
	TargetAddr ma.Multiaddr
21 22 23 24 25 26 27

	Local  manet.Conn
	Remote net.Stream

	Registry *StreamRegistry
}

28 29 30 31
// Close closes stream endpoints and deregisters it
func (s *Stream) Close() error {
	s.Local.Close()
	s.Remote.Close()
32
	s.Registry.Deregister(s.id)
33 34 35
	return nil
}

36
// Reset closes stream endpoints and deregisters it
37 38 39
func (s *Stream) Reset() error {
	s.Local.Close()
	s.Remote.Reset()
40
	s.Registry.Deregister(s.id)
41 42 43 44 45
	return nil
}

func (s *Stream) startStreaming() {
	go func() {
Łukasz Magiera's avatar
Łukasz Magiera committed
46 47 48 49 50 51
		_, err := io.Copy(s.Local, s.Remote)
		if err != nil {
			s.Reset()
		} else {
			s.Close()
		}
52 53 54
	}()

	go func() {
55 56 57 58 59 60
		_, err := io.Copy(s.Remote, s.Local)
		if err != nil {
			s.Reset()
		} else {
			s.Close()
		}
61 62 63 64 65 66
	}()
}

// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
	Streams map[uint64]*Stream
Łukasz Magiera's avatar
Łukasz Magiera committed
67
	lk      sync.Mutex
68

69
	nextID uint64
70 71 72
}

// Register registers a stream to the registry
73 74 75 76
func (r *StreamRegistry) Register(streamInfo *Stream) {
	r.lk.Lock()
	defer r.lk.Unlock()

77 78 79
	streamInfo.id = r.nextID
	r.Streams[r.nextID] = streamInfo
	r.nextID++
80 81 82
}

// Deregister deregisters stream from the registry
83
func (r *StreamRegistry) Deregister(streamID uint64) {
84 85 86
	r.lk.Lock()
	defer r.lk.Unlock()

87
	delete(r.Streams, streamID)
88
}