Commit 2b03664a authored by Juan Batiz-Benet's avatar Juan Batiz-Benet Committed by Brian Tiger Chow

net interface

parent d6e8e55f
package net
import (
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
)
// Network is the interface IPFS uses for connecting to the world.
type Network interface {
// Listen handles incoming connections on given Multiaddr.
// Listen(*ma.Muliaddr) error
// TODO: for now, only listen on addrs in local peer when initializing.
// DialPeer attempts to establish a connection to a given peer
DialPeer(*peer.Peer) error
// ClosePeer connection to peer
ClosePeer(*peer.Peer) error
// IsConnected returns whether a connection to given peer exists.
IsConnected(*peer.Peer) (bool, error)
// GetProtocols returns the protocols registered in the network.
GetProtocols() *mux.ProtocolMap
// SendMessage sends given Message out
SendMessage(*msg.Message) error
// Close terminates all network operation
Close() error
}
package net
import (
"errors"
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
swarm "github.com/jbenet/go-ipfs/net/swarm"
peer "github.com/jbenet/go-ipfs/peer"
context "code.google.com/p/go.net/context"
)
// IpfsNetwork implements the Network interface,
type IpfsNetwork struct {
// local peer
local *peer.Peer
// protocol multiplexing
muxer *mux.Muxer
// peer connection multiplexing
swarm *swarm.Swarm
// network context
ctx context.Context
cancel context.CancelFunc
}
// NewIpfsNetwork is the structure that implements the network interface
func NewIpfsNetwork(ctx context.Context, local *peer.Peer,
pmap *mux.ProtocolMap) (*IpfsNetwork, error) {
ctx, cancel := context.WithCancel(ctx)
in := &IpfsNetwork{
local: local,
muxer: &mux.Muxer{Protocols: *pmap},
ctx: ctx,
cancel: cancel,
}
err := in.muxer.Start(ctx)
if err != nil {
cancel()
return nil, err
}
in.swarm, err = swarm.NewSwarm(ctx, local)
if err != nil {
cancel()
return nil, err
}
return in, nil
}
// Listen handles incoming connections on given Multiaddr.
// func (n *IpfsNetwork) Listen(*ma.Muliaddr) error {}
// DialPeer attempts to establish a connection to a given peer
func (n *IpfsNetwork) DialPeer(p *peer.Peer) error {
_, err := n.swarm.Dial(p)
return err
}
// ClosePeer connection to peer
func (n *IpfsNetwork) ClosePeer(p *peer.Peer) error {
return n.swarm.CloseConnection(p)
}
// IsConnected returns whether a connection to given peer exists.
func (n *IpfsNetwork) IsConnected(p *peer.Peer) (bool, error) {
return n.swarm.GetConnection(p.ID) != nil, nil
}
// GetProtocols returns the protocols registered in the network.
func (n *IpfsNetwork) GetProtocols() *mux.ProtocolMap {
// copy over because this map should be read only.
pmap := mux.ProtocolMap{}
for id, proto := range n.muxer.Protocols {
pmap[id] = proto
}
return &pmap
}
// SendMessage sends given Message out
func (n *IpfsNetwork) SendMessage(m *msg.Message) error {
n.swarm.Outgoing <- m
return nil
}
// Close terminates all network operation
func (n *IpfsNetwork) Close() error {
if n.cancel == nil {
return errors.New("Network already closed.")
}
n.swarm.Close()
n.muxer.Stop()
n.cancel()
n.cancel = nil
return nil
}
......@@ -68,6 +68,11 @@ func (s *Service) Stop() {
s.cancel = context.CancelFunc(nil)
}
// GetPipe implements the mux.Protocol interface
func (s *Service) GetPipe() *msg.Pipe {
return s.Pipe
}
// SendMessage sends a message out
func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error {
......
......@@ -110,15 +110,8 @@ func (s *Swarm) Dial(peer *peer.Peer) (*conn.Conn, error) {
return nil, errors.New("Attempted connection to self!")
}
k := peer.Key()
// check if we already have an open connection first
s.connsLock.RLock()
c, found := s.conns[k]
s.connsLock.RUnlock()
if found {
return c, nil
}
c := s.GetConnection(peer.ID)
// open connection to peer
c, err := conn.Dial("tcp", peer)
......@@ -158,40 +151,22 @@ func (s *Swarm) DialAddr(addr *ma.Multiaddr) (*conn.Conn, error) {
return c, err
}
// GetPeer returns the peer in the swarm with given key id.
func (s *Swarm) GetPeer(key u.Key) *peer.Peer {
// GetConnection returns the connection in the swarm to given peer.ID
func (s *Swarm) GetConnection(pid peer.ID) *conn.Conn {
s.connsLock.RLock()
conn, found := s.conns[key]
c, found := s.conns[u.Key(pid)]
s.connsLock.RUnlock()
if !found {
return nil
}
return conn.Peer
}
// GetConnection will check if we are already connected to the peer in question
// and only open a new connection if we arent already
func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) {
p := &peer.Peer{
ID: id,
Addresses: []*ma.Multiaddr{addr},
}
c, err := s.Dial(p)
if err != nil {
return nil, err
}
return c.Peer, nil
return c
}
// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p *peer.Peer) error {
s.connsLock.RLock()
conn, found := s.conns[u.Key(p.ID)]
s.connsLock.RUnlock()
if !found {
c := s.GetConnection(p.ID)
if c == nil {
return u.ErrNotFound
}
......@@ -199,7 +174,7 @@ func (s *Swarm) CloseConnection(p *peer.Peer) error {
delete(s.conns, u.Key(p.ID))
s.connsLock.Unlock()
return conn.Close()
return c.Close()
}
func (s *Swarm) Error(e error) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment