package swarm

import (
	"errors"
	"fmt"
	"net"
	"sync"

	proto "code.google.com/p/goprotobuf/proto"
	ident "github.com/jbenet/go-ipfs/identify"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
	ma "github.com/jbenet/go-multiaddr"
)

var ErrAlreadyOpen = errors.New("Error: Connection to this peer already open.")

// Message represents a packet of information sent to or received from a
// particular Peer.
type Message struct {
	// To or from, depending on direction.
	Peer *peer.Peer

	// Opaque data
	Data []byte
}

// Cleaner looking helper function to make a new message struct
func NewMessage(p *peer.Peer, data proto.Message) *Message {
	bytes, err := proto.Marshal(data)
	if err != nil {
		u.PErr("%v\n", err.Error())
		return nil
	}
	return &Message{
		Peer: p,
		Data: bytes,
	}
}

// Chan is a swarm channel, which provides duplex communication and errors.
type Chan struct {
	Outgoing chan *Message
	Incoming chan *Message
	Errors   chan error
	Close    chan bool
}

// NewChan constructs a Chan instance, with given buffer size bufsize.
func NewChan(bufsize int) *Chan {
	return &Chan{
		Outgoing: make(chan *Message, bufsize),
		Incoming: make(chan *Message, bufsize),
		Errors:   make(chan error, bufsize),
		Close:    make(chan bool, bufsize),
	}
}

// Contains a set of errors mapping to each of the swarms addresses
// that were listened on
type SwarmListenErr struct {
	Errors []error
}

func (se *SwarmListenErr) Error() string {
	if se == nil {
		return "<nil error>"
	}
	var out string
	for i, v := range se.Errors {
		if v != nil {
			out += fmt.Sprintf("%d: %s\n", i, v)
		}
	}
	return out
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {
	Chan      *Chan
	conns     ConnMap
	connsLock sync.RWMutex

	filterChans map[PBWrapper_MessageType]*Chan
	toFilter    chan *Message
	newFilters  chan *newFilterInfo

	local     *peer.Peer
	listeners []net.Listener
	haltroute chan struct{}
}

// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(local *peer.Peer) *Swarm {
	s := &Swarm{
		Chan:        NewChan(10),
		conns:       ConnMap{},
		local:       local,
		filterChans: make(map[PBWrapper_MessageType]*Chan),
		toFilter:    make(chan *Message, 32),
		newFilters:  make(chan *newFilterInfo),
		haltroute:   make(chan struct{}),
	}
	go s.routeMessages()
	go s.fanOut()
	return s
}

// Open listeners for each network the swarm should listen on
func (s *Swarm) Listen() error {
	var ret_err *SwarmListenErr
	for i, addr := range s.local.Addresses {
		err := s.connListen(addr)
		if err != nil {
			if ret_err == nil {
				ret_err = new(SwarmListenErr)
				ret_err.Errors = make([]error, len(s.local.Addresses))
			}
			ret_err.Errors[i] = err
			u.PErr("Failed to listen on: %s [%s]", addr, err)
		}
	}
	if ret_err == nil {
		return nil
	}
	return ret_err
}

// Listen for new connections on the given multiaddr
func (s *Swarm) connListen(maddr *ma.Multiaddr) error {
	netstr, addr, err := maddr.DialArgs()
	if err != nil {
		return err
	}

	list, err := net.Listen(netstr, addr)
	if err != nil {
		return err
	}

	// NOTE: this may require a lock around it later. currently, only run on setup
	s.listeners = append(s.listeners, list)

	// Accept and handle new connections on this listener until it errors
	go func() {
		for {
			nconn, err := list.Accept()
			if err != nil {
				e := fmt.Errorf("Failed to accept connection: %s - %s [%s]",
					netstr, addr, err)
				go func() { s.Chan.Errors <- e }()
				return
			}
			go s.handleNewConn(nconn)
		}
	}()

	return nil
}

// Handle getting ID from this peer and adding it into the map
func (s *Swarm) handleNewConn(nconn net.Conn) {
	p := new(peer.Peer)

	conn := &Conn{
		Peer: p,
		Addr: nil,
		Conn: nconn,
	}
	newConnChans(conn)

	_, _, err := ident.Handshake(s.local, p, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
	if err != nil {
		u.PErr("%v\n", err.Error())
		conn.Close()
		return
	}

	// Get address to contact remote peer from
	addr := <-conn.Incoming.MsgChan
	maddr, err := ma.NewMultiaddr(string(addr))
	if err != nil {
		u.PErr("Got invalid address from peer.")
		s.Error(err)
		return
	}
	p.AddAddress(maddr)

	err = s.StartConn(conn)
	if err != nil {
		s.Error(err)
	}
}

// Close closes a swarm.
func (s *Swarm) Close() {
	s.connsLock.RLock()
	l := len(s.conns)
	s.connsLock.RUnlock()

	for i := 0; i < l; i++ {
		s.Chan.Close <- true // fan ins
	}
	s.Chan.Close <- true // fan out
	s.Chan.Close <- true // listener

	for _, list := range s.listeners {
		list.Close()
	}

	s.haltroute <- struct{}{}

	for _, filter := range s.filterChans {
		filter.Close <- true
	}
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
//
// For now, Dial uses only TCP. This will be extended.
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) {
	k := peer.Key()

	// check if we already have an open connection first
	s.connsLock.RLock()
	conn, found := s.conns[k]
	s.connsLock.RUnlock()
	if found {
		return conn, nil, true
	}

	// open connection to peer
	conn, err := Dial("tcp", peer)
	if err != nil {
		return nil, err, false
	}

	return conn, nil, false
}

// StartConn adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
func (s *Swarm) StartConn(conn *Conn) error {
	if conn == nil {
		return errors.New("Tried to start nil connection.")
	}

	u.DOut("Starting connection: %s\n", conn.Peer.Key().Pretty())
	// add to conns
	s.connsLock.Lock()
	if _, ok := s.conns[conn.Peer.Key()]; ok {
		s.connsLock.Unlock()
		return ErrAlreadyOpen
	}
	s.conns[conn.Peer.Key()] = conn
	s.connsLock.Unlock()

	// kick off reader goroutine
	go s.fanIn(conn)
	return nil
}

// Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() {
	for {
		select {
		case <-s.Chan.Close:
			return // told to close.
		case msg, ok := <-s.Chan.Outgoing:
			if !ok {
				return
			}

			if len(msg.Data) > MaxMessageSize {
				s.Error(fmt.Errorf("Exceeded max message size! (tried to send len = %d)", len(msg.Data)))
			}

			s.connsLock.RLock()
			conn, found := s.conns[msg.Peer.Key()]
			s.connsLock.RUnlock()

			if !found {
				e := fmt.Errorf("Sent msg to peer without open conn: %v",
					msg.Peer)
				s.Chan.Errors <- e
				continue
			}

			// queue it in the connection's buffer
			conn.Outgoing.MsgChan <- msg.Data
		}
	}
}

// Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanIn(conn *Conn) {
	for {
		select {
		case <-s.Chan.Close:
			// close Conn.
			conn.Close()
			goto out

		case <-conn.Closed:
			goto out

		case data, ok := <-conn.Incoming.MsgChan:
			if !ok {
				e := fmt.Errorf("Error retrieving from conn: %v", conn.Peer.Key().Pretty())
				s.Chan.Errors <- e
				goto out
			}

			msg := &Message{Peer: conn.Peer, Data: data}
			s.toFilter <- msg
		}
	}
out:

	s.connsLock.Lock()
	delete(s.conns, conn.Peer.Key())
	s.connsLock.Unlock()
}

type newFilterInfo struct {
	Type PBWrapper_MessageType
	resp chan *Chan
}

func (s *Swarm) routeMessages() {
	for {
		select {
		case mes, ok := <-s.toFilter:
			if !ok {
				return
			}
			wrapper, err := Unwrap(mes.Data)
			if err != nil {
				u.PErr("error in route messages: %s\n", err)
			}

			ch, ok := s.filterChans[PBWrapper_MessageType(wrapper.GetType())]
			if !ok {
				u.PErr("Received message with invalid type: %d\n", wrapper.GetType())
				continue
			}

			mes.Data = wrapper.GetMessage()
			ch.Incoming <- mes
		case gchan := <-s.newFilters:
			nch, ok := s.filterChans[gchan.Type]
			if !ok {
				nch = NewChan(16)
				s.filterChans[gchan.Type] = nch
				go s.muxChan(nch, gchan.Type)
			}
			gchan.resp <- nch
		case <-s.haltroute:
			return
		}
	}
}

func (s *Swarm) muxChan(ch *Chan, typ PBWrapper_MessageType) {
	for {
		select {
		case <-ch.Close:
			return
		case mes := <-ch.Outgoing:
			data, err := Wrap(mes.Data, typ)
			if err != nil {
				u.PErr("muxChan error: %s\n", err)
				continue
			}
			mes.Data = data
			s.Chan.Outgoing <- mes
		}
	}
}

func (s *Swarm) Find(key u.Key) *peer.Peer {
	s.connsLock.RLock()
	defer s.connsLock.RUnlock()
	conn, found := s.conns[key]
	if !found {
		return nil
	}
	return conn.Peer
}

// GetConnection will check if we are already connected to the peer in question
// and only open a new connection if we arent already
func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) {
	p := &peer.Peer{
		ID:        id,
		Addresses: []*ma.Multiaddr{addr},
	}

	if id.Equal(s.local.ID) {
		panic("Attempted connection to self!")
	}

	conn, err, reused := s.Dial(p)
	if err != nil {
		return nil, err
	}

	if reused {
		return p, nil
	}

	err = s.handleDialedCon(conn)
	return conn.Peer, err
}

// Handle performing a handshake on a new connection and ensuring proper forward communication
func (s *Swarm) handleDialedCon(conn *Conn) error {
	_, _, err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
	if err != nil {
		return err
	}

	// Send node an address that you can be reached on
	myaddr := s.local.NetAddress("tcp")
	mastr, err := myaddr.String()
	if err != nil {
		errors.New("No local address to send to peer.")
	}

	conn.Outgoing.MsgChan <- []byte(mastr)

	s.StartConn(conn)

	return nil
}

// ConnectNew is for connecting to a peer when you dont know their ID,
// Should only be used when you are sure that you arent already connected to peer in question
func (s *Swarm) ConnectNew(addr *ma.Multiaddr) (*peer.Peer, error) {
	if addr == nil {
		return nil, errors.New("nil Multiaddr passed to swarm.Connect()")
	}
	npeer := new(peer.Peer)
	npeer.AddAddress(addr)

	conn, err := Dial("tcp", npeer)
	if err != nil {
		return nil, err
	}

	err = s.handleDialedCon(conn)
	return npeer, err
}

// Removes a given peer from the swarm and closes connections to it
func (s *Swarm) Drop(p *peer.Peer) error {
	u.DOut("Dropping peer: [%s]\n", p.ID.Pretty())
	s.connsLock.RLock()
	conn, found := s.conns[u.Key(p.ID)]
	s.connsLock.RUnlock()
	if !found {
		return u.ErrNotFound
	}

	s.connsLock.Lock()
	delete(s.conns, u.Key(p.ID))
	s.connsLock.Unlock()

	return conn.Close()
}

func (s *Swarm) Error(e error) {
	s.Chan.Errors <- e
}

func (s *Swarm) GetErrChan() chan error {
	return s.Chan.Errors
}

func (s *Swarm) GetChannel(typ PBWrapper_MessageType) *Chan {
	nfi := &newFilterInfo{
		Type: typ,
		resp: make(chan *Chan),
	}
	s.newFilters <- nfi

	return <-nfi.resp
}

// Temporary to ensure that the Swarm always matches the Network interface as we are changing it
var _ Network = &Swarm{}