Commit ac42cbe9 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

mock2

parent 31a62aff
// Package mocknet provides a mock net.Network to test with.
//
// - a Mocknet has many inet.Networks
// - a Mocknet has many Links
// - a Link joins two inet.Networks
// - inet.Conns and inet.Streams are created by inet.Networks
package mocknet
import (
"time"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
)
type Mocknet interface {
GenPeer() (inet.Network, error)
AddPeer(peer.ID) (inet.Network, error)
// retrieve things
Peer(peer.ID) peer.Peer
Peers() []peer.Peer
Net(peer.ID) inet.Network
Nets() []inet.Network
LinksBetweenPeers(a, b peer.Peer) []Link
LinksBetweenNets(a, b inet.Network) []Link
// Links are the **ability to connect**.
// think of Links as the physical medium.
// For p1 and p2 to connect, a link must exist between them.
// (this makes it possible to test dial failures, and
// things like relaying traffic)
LinkPeers(peer.Peer, peer.Peer) (Link, error)
LinkNets(inet.Network, inet.Network) (Link, error)
Unlink(Link) error
UnlinkPeers(peer.Peer, peer.Peer) error
UnlinkNets(inet.Network, inet.Network) error
// LinkDefaults are the default options that govern links
// if they do not have thier own option set.
SetLinkDefaults(LinkOptions)
LinkDefaults() LinkOptions
// Connections are the usual. Connecting means Dialing.
// For convenience, if no link exists, Connect will add one.
// (this is because Connect is called manually by tests).
ConnectPeers(peer.Peer, peer.Peer) error
ConnectNets(inet.Network, inet.Network) error
DisconnectPeers(peer.Peer, peer.Peer) error
DisconnectNets(inet.Network, inet.Network) error
}
type LinkOptions struct {
Latency time.Duration
Bandwidth int // in bytes-per-second
// we can make these values distributions down the road.
}
type Link interface {
Networks() []inet.Network
Peers() []peer.Peer
SetOptions(LinkOptions)
Options() LinkOptions
// Metrics() Metrics
}
package mocknet
import (
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
var log = eventlog.Logger("mocknet")
// WithNPeers constructs a Mocknet with N peers.
func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
m := New(ctx)
for i := 0; i < n; i++ {
if _, err := m.GenPeer(); err != nil {
return nil, err
}
}
return m, nil
}
// FullMeshLinked constructs a Mocknet with full mesh of Links.
// This means that all the peers **can** connect to each other
// (not that they already are connected. you can use m.ConnectAll())
func FullMeshLinked(ctx context.Context, n int) (Mocknet, error) {
m, err := WithNPeers(ctx, n)
if err != nil {
return nil, err
}
nets := m.Nets()
for _, n1 := range nets {
for _, n2 := range nets {
// yes, even self.
if _, err := m.LinkNets(n1, n2); err != nil {
return nil, err
}
}
}
return m, nil
}
// FullMeshConnected constructs a Mocknet with full mesh of Connections.
// This means that all the peers have dialed and are ready to talk to
// each other.
func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) {
m, err := FullMeshLinked(ctx, n)
if err != nil {
return nil, err
}
nets := m.Nets()
for _, n1 := range nets {
for _, n2 := range nets {
if n1 == n2 {
continue
}
if err := m.ConnectNets(n1, n2); err != nil {
return nil, err
}
}
}
return m, nil
}
package mocknet
import (
"container/list"
"sync"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// conn represents one side's perspective of a
// live connection between two peers.
// it goes over a particular link.
type conn struct {
local peer.Peer
remote peer.Peer
net *peernet
link *link
rconn *conn // counterpart
streams list.List
sync.RWMutex
}
func (c *conn) Close() error {
for _, s := range c.allStreams() {
s.Close()
}
c.net.removeConn(c)
return nil
}
func (c *conn) addStream(s *stream) {
c.Lock()
s.conn = c
c.streams.PushBack(s)
c.Unlock()
}
func (c *conn) removeStream(s *stream) {
c.Lock()
defer c.Unlock()
for e := c.streams.Front(); e != nil; e = e.Next() {
if s == e.Value {
c.streams.Remove(e)
return
}
}
}
func (c *conn) allStreams() []inet.Stream {
c.RLock()
defer c.RUnlock()
strs := make([]inet.Stream, 0, c.streams.Len())
for e := c.streams.Front(); e != nil; e = e.Next() {
s := e.Value.(*stream)
strs = append(strs, s)
}
return strs
}
func (c *conn) remoteOpenedStream(s *stream) {
c.addStream(s)
c.net.handleNewStream(s)
}
func (c *conn) openStream() *stream {
sl, sr := c.link.newStreamPair()
c.addStream(sl)
c.rconn.remoteOpenedStream(sr)
return sl
}
func (c *conn) NewStreamWithProtocol(pr inet.ProtocolID, p peer.Peer) (inet.Stream, error) {
log.Debugf("Conn.NewStreamWithProtocol: %s --> %s", c.local, p)
s := c.openStream()
if err := inet.WriteProtocolHeader(pr, s); err != nil {
s.Close()
return nil, err
}
return s, nil
}
// LocalMultiaddr is the Multiaddr on this side
func (c *conn) LocalMultiaddr() ma.Multiaddr {
return nil
}
// LocalPeer is the Peer on our side of the connection
func (c *conn) LocalPeer() peer.Peer {
return c.local
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *conn) RemoteMultiaddr() ma.Multiaddr {
return nil
}
// RemotePeer is the Peer on the remote side
func (c *conn) RemotePeer() peer.Peer {
return c.remote
}
package mocknet
import (
"fmt"
"io"
"sync"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
)
// link implements mocknet.Link
// and, for simplicity, inet.Conn
type link struct {
mock *mocknet
nets []*peernet
opts LinkOptions
sync.RWMutex
}
func newLink(mn *mocknet) *link {
return &link{mock: mn, opts: mn.linkDefaults}
}
func (l *link) newConnPair() (*conn, *conn) {
l.RLock()
defer l.RUnlock()
mkconn := func(n *peernet, rid peer.ID) *conn {
c := &conn{net: n, link: l}
c.local = n.peer
r, err := n.ps.FindOrCreate(rid)
if err != nil {
panic(fmt.Errorf("error creating peer: %s", err))
}
c.remote = r
return c
}
c1 := mkconn(l.nets[0], l.nets[1].peer.ID())
c2 := mkconn(l.nets[1], l.nets[0].peer.ID())
c1.rconn = c2
c2.rconn = c1
return c1, c2
}
func (l *link) newStreamPair() (*stream, *stream) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()
s1 := &stream{Reader: r1, Writer: w2}
s2 := &stream{Reader: r2, Writer: w1}
return s1, s2
}
func (l *link) Networks() []inet.Network {
l.RLock()
defer l.RUnlock()
cp := make([]inet.Network, len(l.nets))
for i, n := range l.nets {
cp[i] = n
}
return cp
}
func (l *link) Peers() []peer.Peer {
l.RLock()
defer l.RUnlock()
cp := make([]peer.Peer, len(l.nets))
for i, n := range l.nets {
cp[i] = n.peer
}
return cp
}
func (l *link) SetOptions(o LinkOptions) {
l.opts = o
}
func (l *link) Options() LinkOptions {
return l.opts
}
package mocknet
import (
"fmt"
"sync"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
)
type peerID string
// mocknet implements mocknet.Mocknet
type mocknet struct {
// must map on peer.ID (instead of peer.Peer) because
// each inet.Network has different peerstore
nets map[peerID]*peernet
// links make it possible to connect two peers.
// think of links as the physical medium.
// usually only one, but there could be multiple
// **links are shared between peers**
links map[peerID]map[peerID]map[*link]struct{}
linkDefaults LinkOptions
cg ctxgroup.ContextGroup // for Context closing
sync.RWMutex
}
func New(ctx context.Context) Mocknet {
return &mocknet{
nets: map[peerID]*peernet{},
links: map[peerID]map[peerID]map[*link]struct{}{},
}
}
func (mn *mocknet) GenPeer() (inet.Network, error) {
p, err := testutil.PeerWithNewKeys()
if err != nil {
return nil, err
}
n, err := mn.AddPeer(p.ID())
if err != nil {
return nil, err
}
// copy over keys
if err := n.LocalPeer().Update(p); err != nil {
return nil, err
}
return n, nil
}
func (mn *mocknet) AddPeer(p peer.ID) (inet.Network, error) {
n, err := newPeernet(mn.cg.Context(), mn, p)
if err != nil {
return nil, err
}
mn.cg.AddChildGroup(n.cg)
mn.Lock()
mn.nets[pid(n.peer)] = n
mn.Unlock()
return n, nil
}
func (mn *mocknet) Peer(pid peer.ID) peer.Peer {
mn.RLock()
defer mn.RUnlock()
for _, n := range mn.nets {
if n.peer.ID().Equal(pid) {
return n.peer
}
}
return nil
}
func (mn *mocknet) Peers() []peer.Peer {
mn.RLock()
defer mn.RUnlock()
cp := make([]peer.Peer, 0, len(mn.nets))
for _, n := range mn.nets {
cp = append(cp, n.peer)
}
return cp
}
func (mn *mocknet) Net(pid peer.ID) inet.Network {
mn.RLock()
defer mn.RUnlock()
for _, n := range mn.nets {
if n.peer.ID().Equal(pid) {
return n
}
}
return nil
}
func (mn *mocknet) Nets() []inet.Network {
mn.RLock()
defer mn.RUnlock()
cp := make([]inet.Network, 0, len(mn.nets))
for _, n := range mn.nets {
cp = append(cp, n)
}
return cp
}
func (mn *mocknet) LinkAll() error {
nets := mn.Nets()
for _, n1 := range nets {
for _, n2 := range nets {
if _, err := mn.LinkNets(n1, n2); err != nil {
return err
}
}
}
return nil
}
func (mn *mocknet) LinkPeers(p1, p2 peer.Peer) (Link, error) {
mn.RLock()
n1 := mn.nets[pid(p1)]
n2 := mn.nets[pid(p2)]
mn.RUnlock()
if n1 == nil {
return nil, fmt.Errorf("network for p1 not in mocknet")
}
if n2 == nil {
return nil, fmt.Errorf("network for p2 not in mocknet")
}
return mn.LinkNets(n1, n2)
}
func (mn *mocknet) validate(n inet.Network) (*peernet, error) {
// WARNING: assumes locks acquired
nr, ok := n.(*peernet)
if !ok {
return nil, fmt.Errorf("Network not supported (use mock package nets only)")
}
if _, found := mn.nets[pid(nr.peer)]; !found {
return nil, fmt.Errorf("Network not on mocknet. is it from another mocknet?")
}
return nr, nil
}
func (mn *mocknet) LinkNets(n1, n2 inet.Network) (Link, error) {
mn.Lock()
defer mn.Unlock()
if _, err := mn.validate(n1); err != nil {
return nil, err
}
if _, err := mn.validate(n2); err != nil {
return nil, err
}
l := newLink(mn)
mn.addLink(l)
return l, nil
}
func (mn *mocknet) Unlink(l2 Link) error {
l, ok := l2.(*link)
if !ok {
return fmt.Errorf("only links from mocknet are supported")
}
mn.removeLink(l)
return nil
}
func (mn *mocknet) UnlinkPeers(p1, p2 peer.Peer) error {
ls := mn.LinksBetweenPeers(p1, p2)
if ls == nil {
return fmt.Errorf("no link between p1 and p2")
}
for _, l := range ls {
if err := mn.Unlink(l); err != nil {
return err
}
}
return nil
}
func (mn *mocknet) UnlinkNets(n1, n2 inet.Network) error {
return mn.DisconnectPeers(n1.LocalPeer(), n2.LocalPeer())
}
func (mn *mocknet) addLink(l *link) {
mn.Lock()
defer mn.Unlock()
n1, n2 := l.nets[0], l.nets[1]
mn.links[pid(n1.peer)][pid(n2.peer)][l] = struct{}{}
mn.links[pid(n2.peer)][pid(n1.peer)][l] = struct{}{}
}
func (mn *mocknet) removeLink(l *link) {
mn.Lock()
defer mn.Unlock()
n1, n2 := l.nets[0], l.nets[1]
delete(mn.links[pid(n1.peer)][pid(n2.peer)], l)
delete(mn.links[pid(n2.peer)][pid(n1.peer)], l)
}
func (mn *mocknet) ConnectAll(p1, p2 peer.Peer) (Link, error) {
panic("nyi")
}
func (mn *mocknet) ConnectPeers(a, b peer.Peer) error {
panic("nyi")
}
func (mn *mocknet) ConnectNets(inet.Network, inet.Network) error {
panic("nyi")
}
func (mn *mocknet) DisconnectPeers(p1, p2 peer.Peer) error {
panic("nyi")
}
func (mn *mocknet) DisconnectNets(n1, n2 inet.Network) error {
return mn.DisconnectPeers(n1.LocalPeer(), n2.LocalPeer())
}
func (mn *mocknet) LinksBetweenPeers(p1, p2 peer.Peer) []Link {
mn.RLock()
defer mn.RUnlock()
ls1, found := mn.links[pid(p1)]
if !found {
return nil
}
ls2, found := ls1[pid(p2)]
if !found {
return nil
}
cp := make([]Link, 0, len(ls2))
for l := range ls2 {
cp = append(cp, l)
}
return cp
}
func (mn *mocknet) LinksBetweenNets(n1, n2 inet.Network) []Link {
return mn.LinksBetweenPeers(n1.LocalPeer(), n2.LocalPeer())
}
func (mn *mocknet) SetLinkDefaults(o LinkOptions) {
mn.Lock()
mn.linkDefaults = o
mn.Unlock()
}
func (mn *mocknet) LinkDefaults() LinkOptions {
mn.RLock()
defer mn.RUnlock()
return mn.linkDefaults
}
package mocknet
import (
"container/list"
"fmt"
"math/rand"
"sync"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// peernet implements inet.Network
type peernet struct {
mocknet *mocknet // parent
peer peer.Peer
ps peer.Peerstore
// conns are actual live connections between peers.
// many conns could run over each link.
// **conns are NOT shared between peers**
connsByPeer map[peerID]list.List
connsByLink map[*link]list.List
// needed to implement inet.Network
mux inet.Mux
cg ctxgroup.ContextGroup
sync.RWMutex
}
// newPeernet constructs a new peernet
func newPeernet(ctx context.Context, m *mocknet, id peer.ID) (*peernet, error) {
// create our own entirely, so that peers dont get shuffled across
// network divides. dont share peers.
ps := peer.NewPeerstore()
p, err := ps.FindOrCreate(id)
if err != nil {
return nil, err
}
n := &peernet{
mocknet: m,
peer: p,
ps: ps,
mux: inet.Mux{Handlers: inet.StreamHandlerMap{}},
cg: ctxgroup.WithContext(ctx),
connsByPeer: map[peerID]list.List{},
connsByLink: map[*link]list.List{},
}
n.cg.SetTeardown(n.teardown)
return n, nil
}
func (pn *peernet) teardown() error {
// close the connections
for _, c := range pn.allConns() {
c.Close()
}
return nil
}
// allConns returns all the connections between this peer and others
func (pn *peernet) allConns() []*conn {
pn.RLock()
var cs []*conn
for _, csl := range pn.connsByPeer {
for e := csl.Front(); e != nil; e = e.Next() {
c := e.Value.(*conn)
cs = append(cs, c)
}
}
pn.RUnlock()
return cs
}
// Close calls the ContextCloser func
func (pn *peernet) Close() error {
return pn.cg.Close()
}
func (pn *peernet) String() string {
return fmt.Sprintf("<mock.peernet %s - %d conns>", pn.peer, len(pn.allConns()))
}
// handleNewStream is an internal function to trigger the muxer handler
func (pn *peernet) handleNewStream(s inet.Stream) {
go pn.mux.Handle(s)
}
// DialPeer attempts to establish a connection to a given peer.
// Respects the context.
func (pn *peernet) DialPeer(ctx context.Context, p peer.Peer) error {
return pn.connect(p)
}
func (pn *peernet) connect(p peer.Peer) error {
// cannot trust the peer we get. typical for tests to give us
// a peer from some other peerstore...
p, err := pn.ps.Add(p)
if err != nil {
return err
}
// first, check if we already have live connections
pn.RLock()
cs, found := pn.connsByPeer[pid(p)]
ncs := cs.Len()
pn.RUnlock()
if found && ncs > 0 {
return nil
}
// ok, must create a new connection. we need a link
links := pn.mocknet.LinksBetweenPeers(pn.peer, p)
if len(links) < 1 {
return fmt.Errorf("cannot connect to peer %s", p)
}
// if many links found, how do we select? for now, randomly...
// this would be an interesting place to test logic that can measure
// links (network interfaces) and select properly
l := links[rand.Intn(len(links))]
// create a new connection with link
pn.openConn(p, l.(*link))
return nil
}
func (pn *peernet) openConn(r peer.Peer, l *link) *conn {
lc, rc := l.newConnPair()
pn.addConn(lc)
rc.net.remoteOpenedConn(rc)
return lc
}
func (pn *peernet) remoteOpenedConn(c *conn) {
pn.addConn(c)
}
// addConn constructs and adds a connection
// to given remote peer over given link
func (pn *peernet) addConn(c *conn) {
pn.Lock()
cs, found := pn.connsByPeer[pid(c.RemotePeer())]
if !found {
cs = list.List{}
pn.connsByPeer[pid(c.RemotePeer())] = cs
}
cs.PushBack(c)
cs, found = pn.connsByLink[c.link]
if !found {
cs = list.List{}
pn.connsByLink[c.link] = cs
}
cs.PushBack(c)
pn.Unlock()
}
// removeConn removes a given conn
func (pn *peernet) removeConn(c *conn) {
pn.Lock()
defer pn.Unlock()
cs, found := pn.connsByLink[c.link]
if !found {
panic("attempting to remove a conn that doesnt exist")
}
for e := cs.Front(); e != nil; e = e.Next() {
if c == e.Value {
cs.Remove(e)
break
}
}
cs, found = pn.connsByPeer[pid(c.remote)]
if !found {
panic("attempting to remove a conn that doesnt exist")
}
for e := cs.Front(); e != nil; e = e.Next() {
if c == e.Value {
cs.Remove(e)
break
}
}
}
// CtxGroup returns the network's ContextGroup
func (pn *peernet) CtxGroup() ctxgroup.ContextGroup {
return pn.cg
}
// LocalPeer the network's LocalPeer
func (pn *peernet) LocalPeer() peer.Peer {
return pn.peer
}
// Peers returns the connected peers
func (pn *peernet) Peers() []peer.Peer {
pn.RLock()
defer pn.RUnlock()
peers := make([]peer.Peer, 0, len(pn.connsByPeer))
for _, cs := range pn.connsByPeer {
if cs.Len() == 0 {
panic("found empty connection list. not removed properly...")
}
c := cs.Front().Value.(*conn)
peers = append(peers, c.remote)
}
return peers
}
// Conns returns all the connections of this peer
func (pn *peernet) Conns() []inet.Conn {
pn.RLock()
defer pn.RUnlock()
out := make([]inet.Conn, 0, len(pn.connsByPeer))
for _, cs := range pn.connsByPeer {
for e := cs.Front(); e != nil; e = e.Next() {
c := e.Value.(*conn)
out = append(out, c)
}
}
return out
}
func (pn *peernet) ConnsToPeer(p peer.Peer) []inet.Conn {
pn.RLock()
defer pn.RUnlock()
cs, found := pn.connsByPeer[pid(p)]
if !found {
return nil
}
if cs.Len() == 0 {
panic("found empty connection list. not removed properly...")
}
var cs2 []inet.Conn
for e := cs.Front(); e != nil; e = e.Next() {
c := e.Value.(*conn)
cs2 = append(cs2, c)
}
return cs2
}
// ClosePeer connections to peer
func (pn *peernet) ClosePeer(p peer.Peer) error {
pn.RLock()
cs, found := pn.connsByPeer[pid(p)]
pn.RUnlock()
if !found {
return nil
}
for e := cs.Front(); e != nil; e = e.Next() {
c := e.Value.(*conn)
pn.closeConn(c)
}
return nil
}
func (pn *peernet) closeConn(c *conn) {
pn.Lock()
defer pn.Unlock()
// remove it from connsByPeer
cs, found := pn.connsByPeer[pid(c.remote)]
if !found {
panic("attempted to close connection that doesnt exist! (peer)")
}
for e := cs.Front(); e != nil; e = e.Next() {
if c == e.Value.(*conn) {
cs.Remove(e)
}
}
if cs.Len() == 0 {
delete(pn.connsByPeer, pid(c.remote))
}
// remove it from connsByLink
cs, found = pn.connsByLink[c.link]
if !found {
panic("attempted to close connection that doesnt exist! (link)")
}
for e := cs.Front(); e != nil; e = e.Next() {
if c == e.Value.(*conn) {
cs.Remove(e)
}
}
if cs.Len() == 0 {
delete(pn.connsByLink, c.link)
}
}
// BandwidthTotals returns the total amount of bandwidth transferred
func (pn *peernet) BandwidthTotals() (in uint64, out uint64) {
// need to implement this. probably best to do it in swarm this time.
// need a "metrics" object
return 0, 0
}
// ListenAddresses returns a list of addresses at which this network listens.
func (pn *peernet) ListenAddresses() []ma.Multiaddr {
return []ma.Multiaddr{}
}
// InterfaceListenAddresses returns a list of addresses at which this network
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (pn *peernet) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return []ma.Multiaddr{}, nil
}
// Connectedness returns a state signaling connection capabilities
// For now only returns Connecter || NotConnected. Expand into more later.
func (pn *peernet) Connectedness(p peer.Peer) inet.Connectedness {
pn.Lock()
defer pn.Unlock()
cs, found := pn.connsByPeer[pid(p)]
if found {
if cs.Len() == 0 {
panic("found empty connection list. not removed properly...")
}
return inet.Connected
}
return inet.NotConnected
}
// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
// If ProtocolID is "", writes no header.
func (pn *peernet) NewStream(pr inet.ProtocolID, p peer.Peer) (inet.Stream, error) {
pn.Lock()
defer pn.Unlock()
cs, found := pn.connsByPeer[pid(p)]
if !found {
return nil, fmt.Errorf("no connection to peer")
}
// if many conns are found, how do we select? for now, randomly...
// this would be an interesting place to test logic that can measure
// links (network interfaces) and select properly
c := randomListElem(&cs).Value.(*conn)
return c.NewStreamWithProtocol(pr, p)
}
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
func (pn *peernet) SetHandler(p inet.ProtocolID, h inet.StreamHandler) {
pn.mux.SetHandler(p, h)
}
func pid(p peer.Peer) peerID {
return peerID(p.ID())
}
func randomListElem(l *list.List) *list.Element {
n := rand.Intn(l.Len())
for e := l.Front(); e != nil; e = e.Next() {
if n == 0 {
return e
}
n--
}
panic("unreachable")
}
package mocknet
import (
"io"
inet "github.com/jbenet/go-ipfs/net"
)
// stream implements inet.Stream
type stream struct {
io.Reader
io.Writer
conn *conn
}
func (s *stream) Close() error {
s.conn.removeStream(s)
if r, ok := (s.Reader).(io.Closer); ok {
r.Close()
}
if w, ok := (s.Writer).(io.Closer); ok {
return w.Close()
}
return nil
}
func (s *stream) Conn() inet.Conn {
return s.conn
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment