stream.go 1.39 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
package p2p

import (
	"io"

	ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
	net "gx/ipfs/QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen/go-libp2p-net"
	manet "gx/ipfs/QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX/go-multiaddr-net"
	peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
)

// Stream holds information on active incoming and outgoing p2p streams.
type Stream struct {
	Id uint64

	Protocol string

	LocalPeer peer.ID
	LocalAddr ma.Multiaddr

	RemotePeer peer.ID
	RemoteAddr ma.Multiaddr

	Local  manet.Conn
	Remote net.Stream

	Registry *StreamRegistry
}

// Reset closes stream endpoints and deregisters it
func (s *Stream) Reset() error {
	s.Local.Close()
	s.Remote.Reset()
	s.Registry.Deregister(s.Id)
	return nil
}

func (s *Stream) startStreaming() {
	go func() {
40 41
		io.Copy(s.Local, s.Remote)
		s.Reset()
42 43 44
	}()

	go func() {
45 46
		io.Copy(s.Remote, s.Local)
		s.Reset()
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
	}()
}

// StreamRegistry is a collection of active incoming and outgoing proto app streams.
type StreamRegistry struct {
	Streams map[uint64]*Stream

	nextId uint64
}

// Register registers a stream to the registry
func (c *StreamRegistry) Register(streamInfo *Stream) {
	streamInfo.Id = c.nextId
	c.Streams[c.nextId] = streamInfo
	c.nextId++
}

// Deregister deregisters stream from the registry
func (c *StreamRegistry) Deregister(streamId uint64) {
	delete(c.Streams, streamId)
}