mock_conn.go 3.91 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4
package mocknet

import (
	"container/list"
5
	"context"
6
	"strconv"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"sync"
8
	"sync/atomic"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9

Jeromy's avatar
Jeromy committed
10
	process "github.com/jbenet/goprocess"
11 12
	ma "gitlab.dms3.io/mf/go-multiaddr"
	manet "gitlab.dms3.io/mf/go-multiaddr/net"
tavit ohanian's avatar
tavit ohanian committed
13 14 15
	ic "gitlab.dms3.io/p2p/go-p2p-core/crypto"
	"gitlab.dms3.io/p2p/go-p2p-core/network"
	"gitlab.dms3.io/p2p/go-p2p-core/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18 19
var connCounter int64

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22 23
// conn represents one side's perspective of a
// live connection between two peers.
// it goes over a particular link.
type conn struct {
24 25
	notifLk sync.Mutex

26 27
	id int64

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28 29 30 31 32 33 34 35 36 37 38 39 40
	local  peer.ID
	remote peer.ID

	localAddr  ma.Multiaddr
	remoteAddr ma.Multiaddr

	localPrivKey ic.PrivKey
	remotePubKey ic.PubKey

	net     *peernet
	link    *link
	rconn   *conn // counterpart
	streams list.List
41
	stat    network.Stat
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42

43 44
	pairProc, connProc process.Process

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45 46 47
	sync.RWMutex
}

48
func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn {
49
	c := &conn{net: ln, link: l, pairProc: p}
Jeromy's avatar
Jeromy committed
50 51
	c.local = ln.peer
	c.remote = rn.peer
52
	c.stat = network.Stat{Direction: dir}
53
	c.id = atomic.AddInt64(&connCounter, 1)
Jeromy's avatar
Jeromy committed
54 55

	c.localAddr = ln.ps.Addrs(ln.peer)[0]
56 57 58 59 60 61 62 63 64
	for _, a := range rn.ps.Addrs(rn.peer) {
		if !manet.IsIPUnspecified(a) {
			c.remoteAddr = a
			break
		}
	}
	if c.remoteAddr == nil {
		c.remoteAddr = rn.ps.Addrs(rn.peer)[0]
	}
Jeromy's avatar
Jeromy committed
65 66 67

	c.localPrivKey = ln.ps.PrivKey(ln.peer)
	c.remotePubKey = rn.ps.PubKey(rn.peer)
68
	c.connProc = process.WithParent(c.pairProc)
Jeromy's avatar
Jeromy committed
69 70 71
	return c
}

72 73 74 75
func (c *conn) ID() string {
	return strconv.FormatInt(c.id, 10)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
func (c *conn) Close() error {
77 78 79 80 81
	return c.pairProc.Close()
}

func (c *conn) setup() {
	c.connProc.SetTeardown(c.teardown)
Jeromy's avatar
Jeromy committed
82 83 84
}

func (c *conn) teardown() error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85
	for _, s := range c.allStreams() {
Steven Allen's avatar
Steven Allen committed
86
		s.Reset()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87 88
	}
	c.net.removeConn(c)
89

90 91 92 93 94 95 96
	go func() {
		c.notifLk.Lock()
		defer c.notifLk.Unlock()
		c.net.notifyAll(func(n network.Notifiee) {
			n.Disconnected(c.net, c)
		})
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97 98 99 100 101 102 103
	return nil
}

func (c *conn) addStream(s *stream) {
	c.Lock()
	s.conn = c
	c.streams.PushBack(s)
104 105
	s.notifLk.Lock()
	defer s.notifLk.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106
	c.Unlock()
107 108 109
	c.net.notifyAll(func(n network.Notifiee) {
		n.OpenedStream(c.net, s)
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110 111 112 113 114 115 116
}

func (c *conn) removeStream(s *stream) {
	c.Lock()
	for e := c.streams.Front(); e != nil; e = e.Next() {
		if s == e.Value {
			c.streams.Remove(e)
117
			break
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
		}
	}
120 121
	c.Unlock()

122 123 124 125 126 127 128
	go func() {
		s.notifLk.Lock()
		defer s.notifLk.Unlock()
		s.conn.net.notifyAll(func(n network.Notifiee) {
			n.ClosedStream(s.conn.net, s)
		})
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130
}

131
func (c *conn) allStreams() []network.Stream {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133 134
	c.RLock()
	defer c.RUnlock()

135
	strs := make([]network.Stream, 0, c.streams.Len())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136 137 138 139 140 141 142 143 144 145 146 147 148
	for e := c.streams.Front(); e != nil; e = e.Next() {
		s := e.Value.(*stream)
		strs = append(strs, s)
	}
	return strs
}

func (c *conn) remoteOpenedStream(s *stream) {
	c.addStream(s)
	c.net.handleNewStream(s)
}

func (c *conn) openStream() *stream {
149 150
	sl, sr := newStreamPair()
	go c.rconn.remoteOpenedStream(sr)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151 152 153 154
	c.addStream(sl)
	return sl
}

155
func (c *conn) NewStream(context.Context) (network.Stream, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157 158 159 160 161
	log.Debugf("Conn.NewStreamWithProtocol: %s --> %s", c.local, c.remote)

	s := c.openStream()
	return s, nil
}

162
func (c *conn) GetStreams() []network.Stream {
iulianpascalau's avatar
iulianpascalau committed
163
	return c.allStreams()
164 165
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
// LocalMultiaddr is the Multiaddr on this side
func (c *conn) LocalMultiaddr() ma.Multiaddr {
	return c.localAddr
}

// LocalPeer is the Peer on our side of the connection
func (c *conn) LocalPeer() peer.ID {
	return c.local
}

// LocalPrivateKey is the private key of the peer on our side.
func (c *conn) LocalPrivateKey() ic.PrivKey {
	return c.localPrivKey
}

// RemoteMultiaddr is the Multiaddr on the remote side
func (c *conn) RemoteMultiaddr() ma.Multiaddr {
	return c.remoteAddr
}

// RemotePeer is the Peer on the remote side
func (c *conn) RemotePeer() peer.ID {
	return c.remote
}

// RemotePublicKey is the private key of the peer on our side.
func (c *conn) RemotePublicKey() ic.PubKey {
	return c.remotePubKey
}
195 196

// Stat returns metadata about the connection
197
func (c *conn) Stat() network.Stat {
198 199
	return c.stat
}