From 2b03664ae4cc95c81222014580fd94dce7f42b33 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet <juan@benet.ai> Date: Sun, 14 Sep 2014 01:24:43 -0700 Subject: [PATCH] net interface --- net/interface.go | 33 +++++++++++++ net/net.go | 106 +++++++++++++++++++++++++++++++++++++++++ net/net_test.go | 1 + net/service/service.go | 5 ++ net/swarm/swarm.go | 41 ++++------------ 5 files changed, 153 insertions(+), 33 deletions(-) create mode 100644 net/interface.go create mode 100644 net/net.go create mode 100644 net/net_test.go diff --git a/net/interface.go b/net/interface.go new file mode 100644 index 000000000..f5934a7e1 --- /dev/null +++ b/net/interface.go @@ -0,0 +1,33 @@ +package net + +import ( + msg "github.com/jbenet/go-ipfs/net/message" + mux "github.com/jbenet/go-ipfs/net/mux" + peer "github.com/jbenet/go-ipfs/peer" +) + +// Network is the interface IPFS uses for connecting to the world. +type Network interface { + + // Listen handles incoming connections on given Multiaddr. + // Listen(*ma.Muliaddr) error + // TODO: for now, only listen on addrs in local peer when initializing. + + // DialPeer attempts to establish a connection to a given peer + DialPeer(*peer.Peer) error + + // ClosePeer connection to peer + ClosePeer(*peer.Peer) error + + // IsConnected returns whether a connection to given peer exists. + IsConnected(*peer.Peer) (bool, error) + + // GetProtocols returns the protocols registered in the network. + GetProtocols() *mux.ProtocolMap + + // SendMessage sends given Message out + SendMessage(*msg.Message) error + + // Close terminates all network operation + Close() error +} diff --git a/net/net.go b/net/net.go new file mode 100644 index 000000000..2895c38f9 --- /dev/null +++ b/net/net.go @@ -0,0 +1,106 @@ +package net + +import ( + "errors" + + msg "github.com/jbenet/go-ipfs/net/message" + mux "github.com/jbenet/go-ipfs/net/mux" + swarm "github.com/jbenet/go-ipfs/net/swarm" + peer "github.com/jbenet/go-ipfs/peer" + + context "code.google.com/p/go.net/context" +) + +// IpfsNetwork implements the Network interface, +type IpfsNetwork struct { + + // local peer + local *peer.Peer + + // protocol multiplexing + muxer *mux.Muxer + + // peer connection multiplexing + swarm *swarm.Swarm + + // network context + ctx context.Context + cancel context.CancelFunc +} + +// NewIpfsNetwork is the structure that implements the network interface +func NewIpfsNetwork(ctx context.Context, local *peer.Peer, + pmap *mux.ProtocolMap) (*IpfsNetwork, error) { + + ctx, cancel := context.WithCancel(ctx) + + in := &IpfsNetwork{ + local: local, + muxer: &mux.Muxer{Protocols: *pmap}, + ctx: ctx, + cancel: cancel, + } + + err := in.muxer.Start(ctx) + if err != nil { + cancel() + return nil, err + } + + in.swarm, err = swarm.NewSwarm(ctx, local) + if err != nil { + cancel() + return nil, err + } + + return in, nil +} + +// Listen handles incoming connections on given Multiaddr. +// func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {} + +// DialPeer attempts to establish a connection to a given peer +func (n *IpfsNetwork) DialPeer(p *peer.Peer) error { + _, err := n.swarm.Dial(p) + return err +} + +// ClosePeer connection to peer +func (n *IpfsNetwork) ClosePeer(p *peer.Peer) error { + return n.swarm.CloseConnection(p) +} + +// IsConnected returns whether a connection to given peer exists. +func (n *IpfsNetwork) IsConnected(p *peer.Peer) (bool, error) { + return n.swarm.GetConnection(p.ID) != nil, nil +} + +// GetProtocols returns the protocols registered in the network. +func (n *IpfsNetwork) GetProtocols() *mux.ProtocolMap { + // copy over because this map should be read only. + pmap := mux.ProtocolMap{} + for id, proto := range n.muxer.Protocols { + pmap[id] = proto + } + return &pmap +} + +// SendMessage sends given Message out +func (n *IpfsNetwork) SendMessage(m *msg.Message) error { + n.swarm.Outgoing <- m + return nil +} + +// Close terminates all network operation +func (n *IpfsNetwork) Close() error { + if n.cancel == nil { + return errors.New("Network already closed.") + } + + n.swarm.Close() + n.muxer.Stop() + + n.cancel() + n.cancel = nil + return nil +} diff --git a/net/net_test.go b/net/net_test.go new file mode 100644 index 000000000..9d9f1a11e --- /dev/null +++ b/net/net_test.go @@ -0,0 +1 @@ +package net diff --git a/net/service/service.go b/net/service/service.go index 586b3ce81..d67355526 100644 --- a/net/service/service.go +++ b/net/service/service.go @@ -68,6 +68,11 @@ func (s *Service) Stop() { s.cancel = context.CancelFunc(nil) } +// GetPipe implements the mux.Protocol interface +func (s *Service) GetPipe() *msg.Pipe { + return s.Pipe +} + // SendMessage sends a message out func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error { diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 27c1a8770..7ef4ce234 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -110,15 +110,8 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) { return nil, errors.New("Attempted connection to self!") } - k := peer.Key() - // check if we already have an open connection first - s.connsLock.RLock() - c, found := s.conns[k] - s.connsLock.RUnlock() - if found { - return c, nil - } + c := s.GetConnection(peer.ID) // open connection to peer c, err := conn.Dial("tcp", peer) @@ -158,40 +151,22 @@ func (s *Swarm) DialAddr(addr *ma.Multiaddr) (*conn.Conn, error) { return c, err } -// GetPeer returns the peer in the swarm with given key id. -func (s *Swarm) GetPeer(key u.Key) *peer.Peer { +// GetConnection returns the connection in the swarm to given peer.ID +func (s *Swarm) GetConnection(pid peer.ID) *conn.Conn { s.connsLock.RLock() - conn, found := s.conns[key] + c, found := s.conns[u.Key(pid)] s.connsLock.RUnlock() if !found { return nil } - return conn.Peer -} - -// GetConnection will check if we are already connected to the peer in question -// and only open a new connection if we arent already -func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) { - p := &peer.Peer{ - ID: id, - Addresses: []*ma.Multiaddr{addr}, - } - - c, err := s.Dial(p) - if err != nil { - return nil, err - } - - return c.Peer, nil + return c } // CloseConnection removes a given peer from swarm + closes the connection func (s *Swarm) CloseConnection(p *peer.Peer) error { - s.connsLock.RLock() - conn, found := s.conns[u.Key(p.ID)] - s.connsLock.RUnlock() - if !found { + c := s.GetConnection(p.ID) + if c == nil { return u.ErrNotFound } @@ -199,7 +174,7 @@ func (s *Swarm) CloseConnection(p *peer.Peer) error { delete(s.conns, u.Key(p.ID)) s.connsLock.Unlock() - return conn.Close() + return c.Close() } func (s *Swarm) Error(e error) { -- GitLab