swarm_stream.go 4.13 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package swarm

import (
Steven Allen's avatar
Steven Allen committed
4 5 6 7
	"fmt"
	"io"
	"sync"
	"sync/atomic"
Jeromy's avatar
Jeromy committed
8 9
	"time"

Jeromy's avatar
Jeromy committed
10 11
	inet "github.com/libp2p/go-libp2p-net"
	protocol "github.com/libp2p/go-libp2p-protocol"
Steven Allen's avatar
Steven Allen committed
12
	smux "github.com/libp2p/go-stream-muxer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Steven Allen's avatar
Steven Allen committed
15
// streamState records how far a Stream has progressed through its
// open -> half-closed -> closed/reset lifecycle.
type streamState int
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16

Steven Allen's avatar
Steven Allen committed
17 18 19 20 21 22 23 24
// Lifecycle states stored in Stream.state.v.
const (
	// streamOpen is the zero value: no EOF has been observed by Read and
	// neither Close nor Reset has been called.
	streamOpen streamState = iota
	// streamCloseRead is set when Read observes io.EOF on an open stream.
	streamCloseRead
	// streamCloseWrite is set when Close is called on an open stream.
	streamCloseWrite
	// streamCloseBoth is set once both the read side has hit EOF and Close
	// has been called; the stream is removed at that point.
	streamCloseBoth
	// streamReset is set when Reset is called on a stream that is not
	// already fully closed or reset; the stream is removed at that point.
	streamReset
)

25 26 27
// Compile-time check that *Stream satisfies the go-libp2p-net Stream
// interface.
var _ inet.Stream = (*Stream)(nil)

Steven Allen's avatar
Steven Allen committed
28 29 30 31 32 33 34 35 36 37 38 39 40 41
// Stream is the stream type used by swarm. In general, you won't use this type
// directly.
type Stream struct {
	stream smux.Stream
	conn   *Conn

	state struct {
		sync.Mutex
		v streamState
	}

	notifyLk sync.Mutex

	protocol atomic.Value
42 43

	stat inet.Stat
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44 45
}

Steven Allen's avatar
Steven Allen committed
46 47 48 49 50 51 52 53 54
func (s *Stream) String() string {
	return fmt.Sprintf(
		"<swarm.Stream[%s] %s (%s) <-> %s (%s)>",
		s.conn.conn.Transport(),
		s.conn.LocalMultiaddr(),
		s.conn.LocalPeer(),
		s.conn.RemoteMultiaddr(),
		s.conn.RemotePeer(),
	)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55 56
}

Steven Allen's avatar
Steven Allen committed
57 58 59
// Conn returns the Conn associated with this stream, as an inet.Conn
func (s *Stream) Conn() inet.Conn {
	return s.conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62
}

// Read reads bytes from a stream.
Steven Allen's avatar
Steven Allen committed
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
func (s *Stream) Read(p []byte) (int, error) {
	n, err := s.stream.Read(p)
	// TODO: push this down to a lower level for better accuracy.
	if s.conn.swarm.bwc != nil {
		s.conn.swarm.bwc.LogRecvMessage(int64(n))
		s.conn.swarm.bwc.LogRecvMessageStream(int64(n), s.Protocol(), s.Conn().RemotePeer())
	}
	// If we observe an EOF, this stream is now closed for reading.
	// If we're already closed for writing, this stream is now fully closed.
	if err == io.EOF {
		s.state.Lock()
		switch s.state.v {
		case streamCloseWrite:
			s.state.v = streamCloseBoth
			s.remove()
		case streamOpen:
			s.state.v = streamCloseRead
		}
		s.state.Unlock()
	}
	return n, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84 85 86
}

// Write writes bytes to a stream, flushing for each call.
Steven Allen's avatar
Steven Allen committed
87 88 89 90 91 92 93 94
func (s *Stream) Write(p []byte) (int, error) {
	n, err := s.stream.Write(p)
	// TODO: push this down to a lower level for better accuracy.
	if s.conn.swarm.bwc != nil {
		s.conn.swarm.bwc.LogSentMessage(int64(n))
		s.conn.swarm.bwc.LogSentMessageStream(int64(n), s.Protocol(), s.Conn().RemotePeer())
	}
	return n, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95 96 97 98 99
}

// Close closes the stream, indicating this side is finished
// with the stream.
func (s *Stream) Close() error {
Steven Allen's avatar
Steven Allen committed
100 101 102 103 104 105 106 107 108 109 110 111
	err := s.stream.Close()

	s.state.Lock()
	switch s.state.v {
	case streamCloseRead:
		s.state.v = streamCloseBoth
		s.remove()
	case streamOpen:
		s.state.v = streamCloseWrite
	}
	s.state.Unlock()
	return err
112 113
}

Steven Allen's avatar
Steven Allen committed
114 115
// Reset resets the stream, closing both ends.
func (s *Stream) Reset() error {
Steven Allen's avatar
Steven Allen committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
	err := s.stream.Reset()
	s.state.Lock()
	switch s.state.v {
	case streamOpen, streamCloseRead, streamCloseWrite:
		s.state.v = streamReset
		s.remove()
	}
	s.state.Unlock()
	return err
}

func (s *Stream) remove() {
	s.conn.removeStream(s)

	// We *must* do this in a goroutine. This can be called during a
	// an open notification and will block until that notification is done.
	go func() {
		s.notifyLk.Lock()
		defer s.notifyLk.Unlock()

		s.conn.swarm.notifyAll(func(f inet.Notifiee) {
			f.ClosedStream(s.conn.swarm, s)
		})
		s.conn.swarm.refs.Done()
	}()
Steven Allen's avatar
Steven Allen committed
141 142
}

Steven Allen's avatar
Steven Allen committed
143
// Protocol returns the protocol negotiated on this stream (if set).
144
func (s *Stream) Protocol() protocol.ID {
Steven Allen's avatar
Steven Allen committed
145 146 147
	// Ignore type error. It means that the protocol is unset.
	p, _ := s.protocol.Load().(protocol.ID)
	return p
148 149
}

Steven Allen's avatar
Steven Allen committed
150 151 152 153 154
// SetProtocol sets the protocol for this stream.
//
// This doesn't actually *do* anything other than record the fact that we're
// speaking the given protocol over this stream. It's still up to the user to
// negotiate the protocol. This is usually done by the Host.
155
func (s *Stream) SetProtocol(p protocol.ID) {
Steven Allen's avatar
Steven Allen committed
156
	s.protocol.Store(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
}
Jeromy's avatar
Jeromy committed
158

Steven Allen's avatar
Steven Allen committed
159
// SetDeadline sets the read and write deadlines for this stream.
Jeromy's avatar
Jeromy committed
160
func (s *Stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
161
	return s.stream.SetDeadline(t)
Jeromy's avatar
Jeromy committed
162 163
}

Steven Allen's avatar
Steven Allen committed
164
// SetReadDeadline sets the read deadline for this stream.
Jeromy's avatar
Jeromy committed
165
func (s *Stream) SetReadDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
166
	return s.stream.SetReadDeadline(t)
Jeromy's avatar
Jeromy committed
167 168
}

Steven Allen's avatar
Steven Allen committed
169
// SetWriteDeadline sets the write deadline for this stream.
Jeromy's avatar
Jeromy committed
170
func (s *Stream) SetWriteDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
171
	return s.stream.SetWriteDeadline(t)
Jeromy's avatar
Jeromy committed
172
}
173 174 175 176 177

// Stat returns metadata information for this stream.
// The value of the stream's stat field is returned as-is; no locking is
// performed here.
func (s *Stream) Stat() inet.Stat {
	return s.stat
}