swarm_stream.go 3.92 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
Steven Allen's avatar
Steven Allen committed
4 5 6 7
	"fmt"
	"io"
	"sync"
	"sync/atomic"
Jeromy's avatar
Jeromy committed
8 9
	"time"

Jeromy's avatar
Jeromy committed
10 11
	inet "github.com/libp2p/go-libp2p-net"
	protocol "github.com/libp2p/go-libp2p-protocol"
Steven Allen's avatar
Steven Allen committed
12
	smux "github.com/libp2p/go-stream-muxer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Steven Allen's avatar
Steven Allen committed
15
// streamState records which halves of a Stream have been shut down. The zero
// value is streamOpen.
type streamState int

const (
	// streamOpen means neither direction has been closed yet.
	streamOpen streamState = iota
	// streamCloseRead means Read has observed io.EOF while the stream was
	// still open for writing.
	streamCloseRead
	// streamCloseWrite means Close has been called while the stream was still
	// open for reading.
	streamCloseWrite
	// streamCloseBoth means both an EOF on read and a local Close have
	// happened.
	streamCloseBoth
	// streamReset means the stream was torn down via Reset.
	streamReset
)

// Stream is the stream type used by swarm. In general, you won't use this type
// directly.
type Stream struct {
	stream smux.Stream
	conn   *Conn

	state struct {
		sync.Mutex
		v streamState
	}

	notifyLk sync.Mutex

	protocol atomic.Value
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40
}

Steven Allen's avatar
Steven Allen committed
41 42 43 44 45 46 47 48 49
func (s *Stream) String() string {
	return fmt.Sprintf(
		"<swarm.Stream[%s] %s (%s) <-> %s (%s)>",
		s.conn.conn.Transport(),
		s.conn.LocalMultiaddr(),
		s.conn.LocalPeer(),
		s.conn.RemoteMultiaddr(),
		s.conn.RemotePeer(),
	)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50 51
}

Steven Allen's avatar
Steven Allen committed
52 53 54
// Conn returns the Conn associated with this stream, as an inet.Conn
func (s *Stream) Conn() inet.Conn {
	return s.conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56 57
}

// Read reads bytes from a stream.
Steven Allen's avatar
Steven Allen committed
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
func (s *Stream) Read(p []byte) (int, error) {
	n, err := s.stream.Read(p)
	// TODO: push this down to a lower level for better accuracy.
	if s.conn.swarm.bwc != nil {
		s.conn.swarm.bwc.LogRecvMessage(int64(n))
		s.conn.swarm.bwc.LogRecvMessageStream(int64(n), s.Protocol(), s.Conn().RemotePeer())
	}
	// If we observe an EOF, this stream is now closed for reading.
	// If we're already closed for writing, this stream is now fully closed.
	if err == io.EOF {
		s.state.Lock()
		switch s.state.v {
		case streamCloseWrite:
			s.state.v = streamCloseBoth
			s.remove()
		case streamOpen:
			s.state.v = streamCloseRead
		}
		s.state.Unlock()
	}
	return n, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80 81
}

// Write writes bytes to a stream, flushing for each call.
Steven Allen's avatar
Steven Allen committed
82 83 84 85 86 87 88 89
func (s *Stream) Write(p []byte) (int, error) {
	n, err := s.stream.Write(p)
	// TODO: push this down to a lower level for better accuracy.
	if s.conn.swarm.bwc != nil {
		s.conn.swarm.bwc.LogSentMessage(int64(n))
		s.conn.swarm.bwc.LogSentMessageStream(int64(n), s.Protocol(), s.Conn().RemotePeer())
	}
	return n, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90 91 92 93 94
}

// Close closes the stream, indicating this side is finished
// with the stream.
func (s *Stream) Close() error {
Steven Allen's avatar
Steven Allen committed
95 96 97 98 99 100 101 102 103 104 105 106
	err := s.stream.Close()

	s.state.Lock()
	switch s.state.v {
	case streamCloseRead:
		s.state.v = streamCloseBoth
		s.remove()
	case streamOpen:
		s.state.v = streamCloseWrite
	}
	s.state.Unlock()
	return err
107 108
}

Steven Allen's avatar
Steven Allen committed
109 110
// Reset resets the stream, closing both ends.
func (s *Stream) Reset() error {
Steven Allen's avatar
Steven Allen committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
	err := s.stream.Reset()
	s.state.Lock()
	switch s.state.v {
	case streamOpen, streamCloseRead, streamCloseWrite:
		s.state.v = streamReset
		s.remove()
	}
	s.state.Unlock()
	return err
}

func (s *Stream) remove() {
	s.conn.removeStream(s)

	// We *must* do this in a goroutine. This can be called during a
	// an open notification and will block until that notification is done.
	go func() {
		s.notifyLk.Lock()
		defer s.notifyLk.Unlock()

		s.conn.swarm.notifyAll(func(f inet.Notifiee) {
			f.ClosedStream(s.conn.swarm, s)
		})
		s.conn.swarm.refs.Done()
	}()
Steven Allen's avatar
Steven Allen committed
136 137
}

Steven Allen's avatar
Steven Allen committed
138
// Protocol returns the protocol negotiated on this stream (if set).
139
func (s *Stream) Protocol() protocol.ID {
Steven Allen's avatar
Steven Allen committed
140 141 142
	// Ignore type error. It means that the protocol is unset.
	p, _ := s.protocol.Load().(protocol.ID)
	return p
143 144
}

Steven Allen's avatar
Steven Allen committed
145 146 147 148 149
// SetProtocol sets the protocol for this stream.
//
// This doesn't actually *do* anything other than record the fact that we're
// speaking the given protocol over this stream. It's still up to the user to
// negotiate the protocol. This is usually done by the Host.
150
func (s *Stream) SetProtocol(p protocol.ID) {
Steven Allen's avatar
Steven Allen committed
151
	s.protocol.Store(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
}
Jeromy's avatar
Jeromy committed
153

Steven Allen's avatar
Steven Allen committed
154
// SetDeadline sets the read and write deadlines for this stream.
Jeromy's avatar
Jeromy committed
155
func (s *Stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
156
	return s.stream.SetDeadline(t)
Jeromy's avatar
Jeromy committed
157 158
}

Steven Allen's avatar
Steven Allen committed
159
// SetReadDeadline sets the read deadline for this stream.
Jeromy's avatar
Jeromy committed
160
func (s *Stream) SetReadDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
161
	return s.stream.SetReadDeadline(t)
Jeromy's avatar
Jeromy committed
162 163
}

Steven Allen's avatar
Steven Allen committed
164
// SetWriteDeadline sets the write deadline for this stream.
Jeromy's avatar
Jeromy committed
165
func (s *Stream) SetWriteDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
166
	return s.stream.SetWriteDeadline(t)
Jeromy's avatar
Jeromy committed
167
}