mock_stream.go 5.79 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package mocknet

import (
Karthik Bala's avatar
Karthik Bala committed
4
	"bytes"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"io"
7
	"net"
8
	"strconv"
9
	"sync"
iulianpascalau's avatar
iulianpascalau committed
10
	"sync/atomic"
Karthik Bala's avatar
Karthik Bala committed
11 12
	"time"

tavit ohanian's avatar
tavit ohanian committed
13 14 15
	"gitlab.dms3.io/p2p/go-p2p-core/mux"
	"gitlab.dms3.io/p2p/go-p2p-core/network"
	protocol "gitlab.dms3.io/p2p/go-p2p-core/protocol"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18 19
// streamCounter is a package-wide counter. newStream increments it atomically
// and uses the new value as the id of the stream it creates.
var streamCounter int64

20
// stream implements network.Stream
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
type stream struct {
22 23 24 25
	notifLk sync.Mutex

	rstream *stream
	conn    *conn
26
	id      int64
27

Steven Allen's avatar
Steven Allen committed
28 29
	write     *io.PipeWriter
	read      *io.PipeReader
Karthik Bala's avatar
Karthik Bala committed
30
	toDeliver chan *transportObject
31 32 33 34 35

	reset  chan struct{}
	close  chan struct{}
	closed chan struct{}

36
	writeErr error
37

iulianpascalau's avatar
iulianpascalau committed
38
	protocol atomic.Value
39
	stat     network.Stat
Karthik Bala's avatar
Karthik Bala committed
40 41
}

Steven Allen's avatar
Steven Allen committed
42 43
// ErrClosed is the sentinel error for a closed stream. The transport goroutine
// records it as the stream's write error after a clean close, and CloseRead
// uses it to fail reads on the read end of the pipe.
var ErrClosed = errors.New("stream closed")

Karthik Bala's avatar
Karthik Bala committed
44 45 46 47 48
// transportObject is one queued write: the bytes to deliver together with the
// time at which they are scheduled to arrive. Write creates these and the
// transport goroutine consumes them.
type transportObject struct {
	msg         []byte    // private copy of the bytes passed to Write
	arrivalTime time.Time // time.Now() at Write plus the link's delay
}

49 50 51 52 53 54 55 56 57 58 59 60
// newStreamPair creates two streams wired back to back through a pair of
// io.Pipes, so that bytes written on one are read from the other. The first
// result is the outbound stream and the second the inbound one; each has its
// rstream field pointing at the other.
func newStreamPair() (*stream, *stream) {
	// First pipe: written by the inbound stream, read by the outbound one.
	outRead, inWrite := io.Pipe()
	// Second pipe: written by the outbound stream, read by the inbound one.
	inRead, outWrite := io.Pipe()

	outbound := newStream(outWrite, outRead, network.DirOutbound)
	inbound := newStream(inWrite, inRead, network.DirInbound)
	outbound.rstream, inbound.rstream = inbound, outbound
	return outbound, inbound
}

func newStream(w *io.PipeWriter, r *io.PipeReader, dir network.Direction) *stream {
Karthik Bala's avatar
Karthik Bala committed
61
	s := &stream{
Steven Allen's avatar
Steven Allen committed
62 63
		read:      r,
		write:     w,
64
		id:        atomic.AddInt64(&streamCounter, 1),
65 66
		reset:     make(chan struct{}, 1),
		close:     make(chan struct{}, 1),
Steven Allen's avatar
Steven Allen committed
67
		closed:    make(chan struct{}),
Karthik Bala's avatar
Karthik Bala committed
68
		toDeliver: make(chan *transportObject),
69
		stat:      network.Stat{Direction: dir},
Karthik Bala's avatar
Karthik Bala committed
70 71
	}

Steven Allen's avatar
Steven Allen committed
72
	go s.transport()
Karthik Bala's avatar
Karthik Bala committed
73 74 75 76 77 78 79 80
	return s
}

//  How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
	l := s.conn.link
	delay := l.GetLatency() + l.RateLimit(len(p))
	t := time.Now().Add(delay)
Steven Allen's avatar
Steven Allen committed
81 82 83 84 85

	// Copy it.
	cpy := make([]byte, len(p))
	copy(cpy, p)

Karthik Bala's avatar
Karthik Bala committed
86
	select {
Steven Allen's avatar
Steven Allen committed
87
	case <-s.closed: // bail out if we're closing.
88
		return 0, s.writeErr
Steven Allen's avatar
Steven Allen committed
89
	case s.toDeliver <- &transportObject{msg: cpy, arrivalTime: t}:
Karthik Bala's avatar
Karthik Bala committed
90 91
	}
	return len(p), nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92 93
}

94 95 96 97
// ID returns the stream's numeric id rendered as a base-10 string.
func (s *stream) ID() string {
	const decimal = 10
	return strconv.FormatInt(s.id, decimal)
}

98
func (s *stream) Protocol() protocol.ID {
iulianpascalau's avatar
iulianpascalau committed
99 100 101
	// Ignore type error. It means that the protocol is unset.
	p, _ := s.protocol.Load().(protocol.ID)
	return p
102 103
}

104
func (s *stream) Stat() network.Stat {
105 106 107
	return s.stat
}

108
func (s *stream) SetProtocol(proto protocol.ID) {
iulianpascalau's avatar
iulianpascalau committed
109
	s.protocol.Store(proto)
110 111
}

112
func (s *stream) CloseWrite() error {
Steven Allen's avatar
Steven Allen committed
113
	select {
114 115
	case s.close <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
116 117
	}
	<-s.closed
118 119
	if s.writeErr != ErrClosed {
		return s.writeErr
Steven Allen's avatar
Steven Allen committed
120
	}
121
	return nil
Karthik Bala's avatar
Karthik Bala committed
122 123
}

124 125 126 127 128 129 130 131 132
// CloseRead closes the read end of the stream's pipe with ErrClosed and
// returns whatever the pipe reports for that close.
func (s *stream) CloseRead() error {
	err := s.read.CloseWithError(ErrClosed)
	return err
}

// Close shuts down both directions: the read side first, then the write side.
// Only the result of CloseWrite is returned; the CloseRead result is
// deliberately discarded.
func (s *stream) Close() error {
	_ = s.CloseRead()
	err := s.CloseWrite()
	return err
}

Steven Allen's avatar
Steven Allen committed
133
func (s *stream) Reset() error {
134
	// Cancel any pending reads/writes with an error.
Hlib's avatar
Hlib committed
135 136
	s.write.CloseWithError(mux.ErrReset)
	s.read.CloseWithError(mux.ErrReset)
Karthik Bala's avatar
Karthik Bala committed
137

Steven Allen's avatar
Steven Allen committed
138
	select {
139 140
	case s.reset <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
141 142
	}
	<-s.closed
143 144

	// No meaningful error case here.
145
	return nil
Steven Allen's avatar
Steven Allen committed
146 147 148 149
}

func (s *stream) teardown() {
	// at this point, no streams are writing.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
	s.conn.removeStream(s)
Steven Allen's avatar
Steven Allen committed
151 152 153

	// Mark as closed.
	close(s.closed)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155
}

156
func (s *stream) Conn() network.Conn {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158
	return s.conn
}
Karthik Bala's avatar
Karthik Bala committed
159

160
func (s *stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
161
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
162 163
}

164
func (s *stream) SetReadDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
165
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
166 167
}

168
func (s *stream) SetWriteDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
169
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
170 171 172
}

func (s *stream) Read(b []byte) (int, error) {
Steven Allen's avatar
Steven Allen committed
173
	return s.read.Read(b)
174 175
}

Karthik Bala's avatar
Karthik Bala committed
176 177
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
Steven Allen's avatar
Steven Allen committed
178 179 180
func (s *stream) transport() {
	defer s.teardown()

Karthik Bala's avatar
Karthik Bala committed
181 182
	bufsize := 256
	buf := new(bytes.Buffer)
Steven Allen's avatar
Steven Allen committed
183 184 185 186 187 188 189 190 191 192
	timer := time.NewTimer(0)
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}

	// cleanup
	defer timer.Stop()
Karthik Bala's avatar
Karthik Bala committed
193 194 195

	// writeBuf writes the contents of buf through to the s.Writer.
	// done only when arrival time makes sense.
196
	drainBuf := func() error {
Karthik Bala's avatar
Karthik Bala committed
197
		if buf.Len() > 0 {
Steven Allen's avatar
Steven Allen committed
198
			_, err := s.write.Write(buf.Bytes())
Karthik Bala's avatar
Karthik Bala committed
199
			if err != nil {
200
				return err
Karthik Bala's avatar
Karthik Bala committed
201 202 203
			}
			buf.Reset()
		}
204
		return nil
Karthik Bala's avatar
Karthik Bala committed
205 206 207 208 209
	}

	// deliverOrWait is a helper func that processes
	// an incoming packet. it waits until the arrival time,
	// and then writes things out.
210
	deliverOrWait := func(o *transportObject) error {
Karthik Bala's avatar
Karthik Bala committed
211 212
		buffered := len(o.msg) + buf.Len()

Steven Allen's avatar
Steven Allen committed
213 214 215 216 217 218 219 220 221
		// Yes, we can end up extending a timer multiple times if we
		// keep on making small writes but that shouldn't be too much of an
		// issue. Fixing that would be painful.
		if !timer.Stop() {
			// FIXME: So, we *shouldn't* need to do this but we hang
			// here if we don't... Go bug?
			select {
			case <-timer.C:
			default:
Karthik Bala's avatar
Karthik Bala committed
222
			}
Steven Allen's avatar
Steven Allen committed
223
		}
224
		delay := time.Until(o.arrivalTime)
Steven Allen's avatar
Steven Allen committed
225 226 227 228
		if delay >= 0 {
			timer.Reset(delay)
		} else {
			timer.Reset(0)
Karthik Bala's avatar
Karthik Bala committed
229 230
		}

Steven Allen's avatar
Steven Allen committed
231 232 233
		if buffered >= bufsize {
			select {
			case <-timer.C:
234
			case <-s.reset:
235 236 237 238
				select {
				case s.reset <- struct{}{}:
				default:
				}
Hlib's avatar
Hlib committed
239
				return mux.ErrReset
240 241 242
			}
			if err := drainBuf(); err != nil {
				return err
Steven Allen's avatar
Steven Allen committed
243 244 245 246
			}
			// write this message.
			_, err := s.write.Write(o.msg)
			if err != nil {
247
				return err
Steven Allen's avatar
Steven Allen committed
248 249 250
			}
		} else {
			buf.Write(o.msg)
Karthik Bala's avatar
Karthik Bala committed
251
		}
252
		return nil
Karthik Bala's avatar
Karthik Bala committed
253 254 255
	}

	for {
256 257 258
		// Reset takes precedent.
		select {
		case <-s.reset:
Hlib's avatar
Hlib committed
259
			s.writeErr = mux.ErrReset
Steven Allen's avatar
Steven Allen committed
260 261 262
			return
		default:
		}
Karthik Bala's avatar
Karthik Bala committed
263

Steven Allen's avatar
Steven Allen committed
264
		select {
265
		case <-s.reset:
Hlib's avatar
Hlib committed
266
			s.writeErr = mux.ErrReset
267 268
			return
		case <-s.close:
269
			if err := drainBuf(); err != nil {
270
				s.cancelWrite(err)
271 272 273 274 275 276
				return
			}
			s.writeErr = s.write.Close()
			if s.writeErr == nil {
				s.writeErr = ErrClosed
			}
277 278
			return
		case o := <-s.toDeliver:
279
			if err := deliverOrWait(o); err != nil {
280
				s.cancelWrite(err)
281 282
				return
			}
Steven Allen's avatar
Steven Allen committed
283
		case <-timer.C: // ok, due to write it out.
284
			if err := drainBuf(); err != nil {
285
				s.cancelWrite(err)
286 287
				return
			}
Karthik Bala's avatar
Karthik Bala committed
288 289 290
		}
	}
}
291

292
func (s *stream) cancelWrite(err error) {
293 294 295
	s.write.CloseWithError(err)
	s.writeErr = err
}