swarm_dial.go 8.55 KB
Newer Older
1 2 3 4 5
package swarm

import (
	"errors"
	"fmt"
6
	"sync"
7
	"time"
8

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	conn "github.com/jbenet/go-ipfs/p2p/net/conn"
10
	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
	peer "github.com/jbenet/go-ipfs/p2p/peer"
12 13 14
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
15
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
16 17
)

18 19 20
// dialAttempts governs how many times a goroutine will try to dial a given peer.
const dialAttempts = 3

21 22 23 24 25
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial)
var DialTimeout time.Duration = time.Second * 30

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
//
// this interface is so would-be dialers can just:
//
//  for {
//  	c := findConnectionToPeer(peer)
//  	if c != nil {
//  		return c
//  	}
//
//  	// ok, no connections. should we dial?
//  	if ok, wait := dialsync.Lock(peer); !ok {
//  		<-wait // can optionally wait
//  		continue
//  	}
//  	defer dialsync.Unlock(peer)
//
//  	c := actuallyDial(peer)
//  	return c
//  }
//
type dialsync struct {
	// ongoing is a map of tickets for the current peers being dialed.
	// this way, we dont kick off N dials simultaneously.
	ongoing map[peer.ID]chan struct{}
	lock    sync.Mutex
}

// Lock governs the beginning of a dial attempt.
// If there are no ongoing dials, it returns true, and the client is now
// scheduled to dial. Every other goroutine that calls startDial -- with
//the same dst -- will block until client is done. The client MUST call
// ds.Unlock(p) when it is done, to unblock the other callers.
// The client is not reponsible for achieving a successful dial, only for
// reporting the end of the attempt (calling ds.Unlock(p)).
//
// see the example below `dialsync`
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
	ds.lock.Lock()
	if ds.ongoing == nil { // init if not ready
		ds.ongoing = make(map[peer.ID]chan struct{})
	}
	wait, found := ds.ongoing[dst]
	if !found {
		ds.ongoing[dst] = make(chan struct{})
	}
	ds.lock.Unlock()

	if found {
		return false, wait
	}

	// ok! you're signed up to dial!
	return true, nil
}

// Unlock releases waiters to a dial attempt. see Lock.
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
	ds.lock.Lock()
	wait, found := ds.ongoing[dst]
	if !found {
		panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
	}
	delete(ds.ongoing, dst) // remove ongoing dial
	close(wait)             // release everyone else
	ds.lock.Unlock()
}

97 98 99 100 101 102
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
103 104
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
105 106 107
		return nil, errors.New("Attempted connection to self!")
	}

108 109 110 111 112
	// this loop is here because dials take time, and we should not be dialing
	// the same peer concurrently (silly waste). Additonally, it's structured
	// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we
	// may have received an incoming connection! if so, we no longer must dial.
	//
113
	// During the dial attempts, we may be doing the dialing. if not, we wait.
114 115
	var err error
	var conn *Conn
116
	for i := 0; i < dialAttempts; i++ {
117 118
		// check if we already have an open connection first
		cs := s.ConnectionsToPeer(p)
119 120 121
		for _, conn = range cs {
			if conn != nil { // dump out the first one we find. (TODO pick better)
				return conn, nil
122
			}
123
		}
124 125

		// check if there's an ongoing dial to this peer
126
		if ok, wait := s.dsync.Lock(p); !ok {
127
			log.Debugf("swarm %s dialing %s -- waiting for ongoing dial", s.local, p)
128
			select {
129 130 131
			case <-wait: // wait for that dial to finish.
				continue // and see if it worked (loop), OR we got an incoming dial.
			case <-ctx.Done(): // or we may have to bail...
132 133 134 135
				return nil, ctx.Err()
			}
		}

136
		// ok, we have been charged to dial! let's do it.
137
		// if it succeeds, dial will add the conn to the swarm itself.
138 139 140
		log.Debugf("swarm %s dialing %s -- dial start", s.local, p)
		ctxT, _ := context.WithTimeout(ctx, DialTimeout)
		conn, err = s.dial(ctxT, p)
141
		s.dsync.Unlock(p)
142
		log.Debugf("swarm %s dialing %s -- dial end %s", s.local, p, conn)
143 144 145 146 147
		if err != nil {
			continue // ok, we failed. try again. (if loop is done, our error is output)
		}
		return conn, nil
	}
148 149 150
	if err == nil {
		err = fmt.Errorf("%s failed to dial %s after %d attempts", s.local, p, dialAttempts)
	}
151 152 153 154 155 156 157
	return nil, err
}

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
158 159
	}

160 161 162 163
	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
164 165
	}

166 167 168 169 170 171 172
	// get our own addrs
	localAddrs := s.peers.Addresses(s.local)
	if len(localAddrs) == 0 {
		log.Debug("Dialing out with no local addresses.")
	}

	// get remote peer addrs
173
	remoteAddrs := s.peers.Addresses(p)
174
	// make sure we can use the addresses.
175
	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
176 177
	// drop out any addrs that would just dial ourselves. use ListenAddresses
	// as that is a more authoritative view than localAddrs.
178 179 180 181
	ila, _ := InterfaceListenAddresses(s)
	remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
	remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local))
	log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses())
182 183 184
	if len(remoteAddrs) == 0 {
		return nil, errors.New("peer has no addresses")
	}
185

186 187 188 189 190
	// open connection to peer
	d := &conn.Dialer{
		LocalPeer:  s.local,
		LocalAddrs: localAddrs,
		PrivateKey: sk,
191 192
	}

193 194
	// try to get a connection to any addr
	connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
195 196 197 198 199 200 201 202 203
	if err != nil {
		return nil, err
	}

	// ok try to setup the new connection.
	swarmC, err := dialConnSetup(ctx, s, connC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
204
		connC.Close() // close the connection. didn't work out :(
205 206 207 208 209 210 211
		return nil, err
	}

	log.Event(ctx, "dial", p)
	return swarmC, nil
}

212 213 214 215 216
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {

	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
217 218
	log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
	var err error
219
	for _, addr := range remoteAddrs {
220 221 222
		log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
		var connC conn.Conn
		connC, err = d.Dial(ctx, addr, p)
223
		if err != nil {
224
			log.Info("%s --> %s dial attempt failed: %s", s.local, p, err)
225 226 227 228 229
			continue
		}

		// if the connection is not to whom we thought it would be...
		if connC.RemotePeer() != p {
230
			log.Infof("misdial to %s through %s (got %s)", p, addr, connC.RemotePeer())
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
			connC.Close()
			continue
		}

		// if the connection is to ourselves...
		// this can happen TONS when Loopback addrs are advertized.
		// (this should be caught by two checks above, but let's just make sure.)
		if connC.RemotePeer() == s.local {
			log.Infof("misdial to %s through %s", p, addr)
			connC.Close()
			continue
		}

		// success! we got one!
		return connC, nil
	}
247 248 249
	if err != nil {
		return nil, err
	}
250 251 252
	return nil, fmt.Errorf("failed to dial %s", p)
}

253 254
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
255
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
256 257 258 259 260 261 262 263 264 265 266 267

	psC, err := s.swarm.AddConn(connC)
	if err != nil {
		// connC is closed by caller if we fail.
		return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
	}

	// ok try to setup the new connection. (newConnSetup will add to group)
	swarmC, err := s.newConnSetup(ctx, psC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
268
		psC.Close() // we need to make sure psC is Closed.
269 270 271 272 273
		return nil, err
	}

	return swarmC, err
}