Commit a4e49234 authored by Juan Batiz-Benet

added multiconn

parent 68b85c99
package conn

import (
	"sync"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// Duplex is a simple duplex channel
type Duplex struct {
	In  chan []byte
	Out chan []byte
}

// MultiConn represents a single connection to another Peer (IPFS Node).
type MultiConn struct {

	// connections, mapped by a string, which uniquely identifies the connection.
	// this string is: /addr1/peer1/addr2/peer2 (peers ordered lexicographically)
	conns map[string]Conn

	local  *peer.Peer
	remote *peer.Peer

	// fan-in/fan-out
	duplex Duplex

	// for adding/removing connections concurrently
	sync.RWMutex

	ContextCloser
}
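
// For illustration only (a hedged sketch, not taken from this commit): with
// hypothetical multiaddrs and peer IDs, a key in the conns map would look
// roughly like:
//
//	/ip4/127.0.0.1/tcp/1234/QmAAA.../ip4/127.0.0.1/tcp/5678/QmBBB...
//
// Because the peers are ordered lexicographically, both sides of the
// connection derive the same key for the same underlying Conn.
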
// NewMultiConn constructs a new MultiConn from the given single connections.
func NewMultiConn(ctx context.Context, local, remote *peer.Peer, conns []Conn) (Conn, error) {

	c := &MultiConn{
		local:  local,
		remote: remote,
		conns:  map[string]Conn{},
		duplex: Duplex{
			In:  make(chan []byte, 10),
			Out: make(chan []byte, 10),
		},
	}

	// must happen before Adds / fanOut
	c.ContextCloser = NewContextCloser(ctx, c.close)

	log.Info("adding %d...", len(conns))
	if len(conns) > 0 { // len of a nil slice is 0, so no nil check needed
		c.Add(conns...)
	}

	go c.fanOut()

	log.Info("newMultiConn: %v to %v", local, remote)
	return c, nil
}
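
// A minimal usage sketch (hypothetical; assumes localPeer/remotePeer and two
// already-established single Conns c1, c2 to the same remote are in hand):
//
//	mc, err := NewMultiConn(context.Background(), localPeer, remotePeer, []Conn{c1, c2})
//	if err != nil {
//		// handle error
//	}
//	mc.Out() <- []byte("hello") // fanned out over the current BestConn
//	msg := <-mc.In()            // fanned in from whichever child receives
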
// Add adds the given Conn instances to this MultiConn.
func (c *MultiConn) Add(conns ...Conn) {
	c.Lock()
	defer c.Unlock()

	for _, c2 := range conns {
		log.Info("MultiConn: adding %s", c2)
		if c.LocalPeer() != c2.LocalPeer() || c.RemotePeer() != c2.RemotePeer() {
			log.Error("%s", c2)
			panic("connection addresses mismatch")
		}

		c.conns[c2.ID()] = c2
		go c.fanInSingle(c2)
		log.Info("MultiConn: added %s", c2)
	}
}

// Remove removes the given Conn instances from this MultiConn.
func (c *MultiConn) Remove(conns ...Conn) {

	// first remove them from the map, to avoid sending any more messages through them.
	{
		c.Lock()
		for _, c1 := range conns {
			c2, found := c.conns[c1.ID()]
			if !found {
				panic("Conn not in MultiConn")
			}
			if c1 != c2 {
				panic("different Conn objects for same id.")
			}

			delete(c.conns, c2.ID())
		}
		c.Unlock()
	}

	// close all in parallel, but wait for all to be done closing.
	CloseConns(conns)
}

// CloseConns closes multiple connections in parallel, and waits for all
// to finish closing.
func CloseConns(conns []Conn) {
	var wg sync.WaitGroup
	for _, child := range conns {

		// non-blocking check: skip children that are already closed.
		select {
		case <-child.Closed():
			continue
		default:
		}

		wg.Add(1)
		go func(child Conn) {
			child.Close()
			wg.Done()
		}(child)
	}
	wg.Wait()
}
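
// A hedged usage sketch: MultiConn.Remove and MultiConn.close both rely on
// this helper, but it also works standalone, e.g.:
//
//	CloseConns([]Conn{c1, c2}) // returns only once both children have closed
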
// fanOut is the multiplexor out -- it sends outgoing messages over the
// underlying single connections.
func (c *MultiConn) fanOut() {
	c.Children().Add(1)
	defer c.Children().Done()

	for {
		select {
		case <-c.Closing():
			return

		// send data out through our "best connection"
		case m, more := <-c.duplex.Out:
			if !more {
				return
			}
			sc := c.BestConn()
			if sc == nil {
				// maybe this should be a logged error, not a panic.
				panic("sending out multiconn without any live connection")
			}
			sc.Out() <- m
		}
	}
}

// fanInSingle is the multiplexor in -- it receives incoming messages from one
// underlying single connection and forwards them to the shared In channel.
func (c *MultiConn) fanInSingle(child Conn) {
	c.Children().Add(1)
	child.Children().Add(1) // yep, on the child too.

	// cleanup all data associated with this child Connection.
	defer func() {
		// in case it is still in the map, remove it.
		c.Lock()
		delete(c.conns, child.ID())
		c.Unlock()

		c.Children().Done()
		child.Children().Done()
	}()

	for {
		select {
		case <-c.Closing(): // multiconn closing
			return

		case <-child.Closing(): // child closing
			return

		case m, more := <-child.In(): // receiving data
			if !more {
				return // closed
			}
			c.duplex.In <- m
		}
	}
}
// close is the internal close function, called by ContextCloser.Close
func (c *MultiConn) close() error {
	log.Debug("%s closing Conn with %s", c.local, c.remote)

	// get connections
	c.RLock()
	conns := make([]Conn, 0, len(c.conns))
	for _, child := range c.conns {
		conns = append(conns, child)
	}
	c.RUnlock()

	// close underlying connections
	CloseConns(conns)
	return nil
}

// BestConn returns the best connection in this MultiConn.
// For now, this is simply the child whose ID is lexicographically largest.
func (c *MultiConn) BestConn() Conn {
	c.RLock()
	defer c.RUnlock()

	var id1 string
	var c1 Conn
	for id2, c2 := range c.conns {
		if id1 == "" || id1 < id2 {
			id1 = id2
			c1 = c2
		}
	}
	return c1
}

// ID is an identifier unique to this connection.
// In MultiConn, this is all the children's IDs XORed together.
func (c *MultiConn) ID() string {
	c.RLock()
	defer c.RUnlock()

	ids := []byte(nil)
	for i := range c.conns {
		if ids == nil {
			ids = []byte(i)
		} else {
			ids = u.XOR(ids, []byte(i))
		}
	}

	return string(ids)
}
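
// Since XOR is commutative and associative, the resulting ID is independent
// of map iteration order. A hedged illustration (u.XOR is the byte-wise XOR
// helper from go-ipfs/util; bytes.Equal is from the standard library; the two
// equal-length IDs here are made up):
//
//	a, b := []byte("id-one"), []byte("id-two")
//	bytes.Equal(u.XOR(a, b), u.XOR(b, a)) // true, whichever child is seen first
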
func (c *MultiConn) String() string {
	return String(c, "MultiConn")
}

// LocalMultiaddr is the Multiaddr on this side
func (c *MultiConn) LocalMultiaddr() ma.Multiaddr {
	return c.BestConn().LocalMultiaddr()
}

// RemoteMultiaddr is the Multiaddr on the remote side
func (c *MultiConn) RemoteMultiaddr() ma.Multiaddr {
	return c.BestConn().RemoteMultiaddr()
}

// LocalPeer is the Peer on this side
func (c *MultiConn) LocalPeer() *peer.Peer {
	return c.local
}

// RemotePeer is the Peer on the remote side
func (c *MultiConn) RemotePeer() *peer.Peer {
	return c.remote
}

// In returns a readable message channel
func (c *MultiConn) In() <-chan []byte {
	return c.duplex.In
}

// Out returns a writable message channel
func (c *MultiConn) Out() chan<- []byte {
	return c.duplex.Out
}

package conn

import (
	"fmt"
	"sync"
	"testing"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

	peer "github.com/jbenet/go-ipfs/peer"
)

func tcpAddr(t *testing.T, port int) ma.Multiaddr {
	tcp, err := ma.NewMultiaddr(tcpAddrString(port))
	if err != nil {
		t.Fatal(err)
	}
	return tcp
}

func tcpAddrString(port int) string {
	return fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)
}
type msg struct {
	sent     bool
	received bool
	payload  string
}

func (m *msg) Sent(t *testing.T) {
	if m.sent {
		t.Fatal("sent msg at incorrect state:", m)
	}
	m.sent = true
}

func (m *msg) Received(t *testing.T) {
	if m.received {
		t.Fatal("received msg at incorrect state:", m)
	}
	m.received = true
}

type msgMap struct {
	sent int
	recv int
	msgs map[string]*msg
}

func (mm *msgMap) Sent(t *testing.T, payload string) {
	mm.msgs[payload].Sent(t)
	mm.sent++
}

func (mm *msgMap) Received(t *testing.T, payload string) {
	mm.msgs[payload].Received(t)
	mm.recv++
}

func (mm *msgMap) CheckDone(t *testing.T) {
	if mm.sent != len(mm.msgs) {
		t.Fatal("failed to send all msgs", mm.sent, len(mm.msgs))
	}

	if mm.recv != len(mm.msgs) {
		t.Fatal("failed to receive all msgs", mm.recv, len(mm.msgs))
	}
}

func genMessages(num int, tag string) *msgMap {
	msgs := &msgMap{msgs: map[string]*msg{}}
	for i := 0; i < num; i++ {
		s := fmt.Sprintf("Message #%d -- %s", i, tag)
		msgs.msgs[s] = &msg{payload: s}
	}
	return msgs
}
func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) {
	log.Info("Setting up peers")
	p1, err := setupPeer(tcpAddrString(11000))
	if err != nil {
		t.Fatal("error setting up peer", err)
	}

	p2, err := setupPeer(tcpAddrString(12000))
	if err != nil {
		t.Fatal("error setting up peer", err)
	}

	// peerstores
	p1ps := peer.NewPeerstore()
	p2ps := peer.NewPeerstore()

	// listeners
	listen := func(addr ma.Multiaddr, p *peer.Peer, ps peer.Peerstore) Listener {
		l, err := Listen(ctx, addr, p, ps)
		if err != nil {
			t.Fatal(err)
		}
		return l
	}

	log.Info("Setting up listeners")
	p1l := listen(p1.Addresses[0], p1, p1ps)
	p2l := listen(p2.Addresses[0], p2, p2ps)

	// dialers
	p1d := &Dialer{Peerstore: p1ps, LocalPeer: p1}
	p2d := &Dialer{Peerstore: p2ps, LocalPeer: p2}

	dial := func(d *Dialer, dst *peer.Peer) <-chan Conn {
		cc := make(chan Conn)
		go func() {
			c, err := d.Dial(ctx, "tcp", dst)
			if err != nil {
				// t.Fatal must only be called from the test goroutine,
				// so record the failure with t.Error instead.
				t.Error("error dialing peer", err)
			}
			cc <- c
		}()
		return cc
	}

	// connect simultaneously
	log.Info("Connecting...")
	p1dc := dial(p1d, p2)
	p2dc := dial(p2d, p1)

	c12a := <-p1l.Accept()
	c12b := <-p1dc

	c21a := <-p2l.Accept()
	c21b := <-p2dc

	log.Info("Ok, making multiconns")
	c1, err := NewMultiConn(ctx, p1, p2, []Conn{c12a, c12b})
	if err != nil {
		t.Fatal(err)
	}

	c2, err := NewMultiConn(ctx, p2, p1, []Conn{c21a, c21b})
	if err != nil {
		t.Fatal(err)
	}

	log.Info("did you make multiconns?")
	return c1.(*MultiConn), c2.(*MultiConn)
}
func TestMulticonnSend(t *testing.T) {
	log.Info("TestMulticonnSend")

	ctx := context.Background()
	ctxC, cancel := context.WithCancel(ctx)

	c1, c2 := setupMultiConns(t, ctx)

	log.Info("gen msgs")
	num := 100
	msgsFrom1 := genMessages(num, "from p1 to p2")
	msgsFrom2 := genMessages(num, "from p2 to p1")

	var wg sync.WaitGroup

	send := func(c *MultiConn, msgs *msgMap) {
		defer wg.Done()
		for _, m := range msgs.msgs {
			log.Info("send: %s", m.payload)
			c.Out() <- []byte(m.payload)
			msgs.Sent(t, m.payload)
			<-time.After(time.Microsecond * 10)
		}
	}

	recv := func(ctx context.Context, c *MultiConn, msgs *msgMap) {
		defer wg.Done()
		for {
			select {
			case payload := <-c.In():
				msgs.Received(t, string(payload))
				log.Info("recv: %s", payload)
				if msgs.recv == len(msgs.msgs) {
					return
				}

			case <-ctx.Done():
				return
			}
		}
	}

	log.Info("msg send + recv")

	wg.Add(4)
	go send(c1, msgsFrom1)
	go send(c2, msgsFrom2)
	go recv(ctxC, c1, msgsFrom2)
	go recv(ctxC, c2, msgsFrom1)
	wg.Wait()
	cancel()

	c1.Close()
	c2.Close()

	msgsFrom1.CheckDone(t)
	msgsFrom2.CheckDone(t)

	<-time.After(100 * time.Millisecond)
}
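
// To run just this test (assuming the files live in a conn/ package
// directory, which this commit view does not show):
//
//	go test -run TestMulticonnSend ./conn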