Commit 453a6670 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet Committed by Brian Tiger Chow

moved stuff

parent 0ac4a2ba
......@@ -7,10 +7,72 @@ import (
ident "github.com/jbenet/go-ipfs/identify"
conn "github.com/jbenet/go-ipfs/net/conn"
msg "github.com/jbenet/go-ipfs/net/message"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-multiaddr"
)
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen() error {
hasErr := false
retErr := &ListenErr{
Errors: make([]error, len(s.local.Addresses)),
}
// listen on every address
for i, addr := range s.local.Addresses {
err := s.connListen(addr)
if err != nil {
hasErr = true
retErr.Errors[i] = err
u.PErr("Failed to listen on: %s [%s]", addr, err)
}
}
if hasErr {
return retErr
}
return nil
}
// Listen for new connections on the given multiaddr
func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
netstr, addr, err := maddr.DialArgs()
if err != nil {
return err
}
list, err := net.Listen(netstr, addr)
if err != nil {
return err
}
// NOTE: this may require a lock around it later. currently, only run on setup
s.listeners = append(s.listeners, list)
// Accept and handle new connections on this listener until it errors
go func() {
for {
nconn, err := list.Accept()
if err != nil {
e := fmt.Errorf("Failed to accept connection: %s - %s [%s]",
netstr, addr, err)
s.errChan <- e
// if cancel is nil, we're closed.
if s.cancel == nil {
return
}
} else {
go s.handleIncomingConn(nconn)
}
}
}()
return nil
}
// Handle getting ID from this peer, handshake, and adding it into the map
func (s *Swarm) handleIncomingConn(nconn net.Conn) {
......@@ -67,3 +129,63 @@ func (s *Swarm) connHandshake(c *conn.Conn) error {
// needs cleanup. needs context. use msg.Pipe.
return ident.Handshake(s.local, c.Peer, c.Incoming.MsgChan, c.Outgoing.MsgChan)
}
// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
for {
select {
case <-s.ctx.Done():
return // told to close.
case msg, ok := <-s.Outgoing:
if !ok {
return
}
s.connsLock.RLock()
conn, found := s.conns[msg.Peer.Key()]
s.connsLock.RUnlock()
if !found {
e := fmt.Errorf("Sent msg to peer without open conn: %v",
msg.Peer)
s.errChan <- e
continue
}
// queue it in the connection's buffer
conn.Outgoing.MsgChan <- msg.Data
}
}
}
// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(c *conn.Conn) {
for {
select {
case <-s.ctx.Done():
// close Conn.
c.Close()
goto out
case <-c.Closed:
goto out
case data, ok := <-c.Incoming.MsgChan:
if !ok {
e := fmt.Errorf("Error retrieving from conn: %v", c.Peer.Key().Pretty())
s.errChan <- e
goto out
}
msg := &msg.Message{Peer: c.Peer, Data: data}
s.Incoming <- msg
}
}
out:
s.connsLock.Lock()
delete(s.conns, c.Peer.Key())
s.connsLock.Unlock()
}
......@@ -78,66 +78,6 @@ func NewSwarm(ctx context.Context, local *peer.Peer) (*Swarm, error) {
return s, s.listen()
}
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen() error {
hasErr := false
retErr := &ListenErr{
Errors: make([]error, len(s.local.Addresses)),
}
// listen on every address
for i, addr := range s.local.Addresses {
err := s.connListen(addr)
if err != nil {
hasErr = true
retErr.Errors[i] = err
u.PErr("Failed to listen on: %s [%s]", addr, err)
}
}
if hasErr {
return retErr
}
return nil
}
// Listen for new connections on the given multiaddr
func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
netstr, addr, err := maddr.DialArgs()
if err != nil {
return err
}
list, err := net.Listen(netstr, addr)
if err != nil {
return err
}
// NOTE: this may require a lock around it later. currently, only run on setup
s.listeners = append(s.listeners, list)
// Accept and handle new connections on this listener until it errors
go func() {
for {
nconn, err := list.Accept()
if err != nil {
e := fmt.Errorf("Failed to accept connection: %s - %s [%s]",
netstr, addr, err)
s.errChan <- e
// if cancel is nil, we're closed.
if s.cancel == nil {
return
}
} else {
go s.handleIncomingConn(nconn)
}
}
}()
return nil
}
// Close stops a swarm.
func (s *Swarm) Close() error {
if s.cancel == nil {
......@@ -218,66 +158,6 @@ func (s *Swarm) DialAddr(addr *ma.Multiaddr) (*conn.Conn, error) {
return c, err
}
// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
for {
select {
case <-s.ctx.Done():
return // told to close.
case msg, ok := <-s.Outgoing:
if !ok {
return
}
s.connsLock.RLock()
conn, found := s.conns[msg.Peer.Key()]
s.connsLock.RUnlock()
if !found {
e := fmt.Errorf("Sent msg to peer without open conn: %v",
msg.Peer)
s.errChan <- e
continue
}
// queue it in the connection's buffer
conn.Outgoing.MsgChan <- msg.Data
}
}
}
// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(c *conn.Conn) {
for {
select {
case <-s.ctx.Done():
// close Conn.
c.Close()
goto out
case <-c.Closed:
goto out
case data, ok := <-c.Incoming.MsgChan:
if !ok {
e := fmt.Errorf("Error retrieving from conn: %v", c.Peer.Key().Pretty())
s.errChan <- e
goto out
}
msg := &msg.Message{Peer: c.Peer, Data: data}
s.Incoming <- msg
}
}
out:
s.connsLock.Lock()
delete(s.conns, c.Peer.Key())
s.connsLock.Unlock()
}
// GetPeer returns the peer in the swarm with given key id.
func (s *Swarm) GetPeer(key u.Key) *peer.Peer {
s.connsLock.RLock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment