swarm_dial.go 8.73 KB
Newer Older
1 2 3 4 5
package swarm

import (
	"errors"
	"fmt"
6
	"net"
7
	"sync"
8
	"time"
9

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	conn "github.com/jbenet/go-ipfs/p2p/net/conn"
11
	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
	peer "github.com/jbenet/go-ipfs/p2p/peer"
13 14 15
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
16
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
17
	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
18 19
)

20 21 22
// dialAttempts governs how many times a goroutine will try to dial a given peer.
const dialAttempts = 3

23 24 25
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial)
26
var DialTimeout time.Duration = time.Second * 10
27

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
//
// this interface is so would-be dialers can just:
//
//  for {
//  	c := findConnectionToPeer(peer)
//  	if c != nil {
//  		return c
//  	}
//
//  	// ok, no connections. should we dial?
//  	if ok, wait := dialsync.Lock(peer); !ok {
//  		<-wait // can optionally wait
//  		continue
//  	}
//  	defer dialsync.Unlock(peer)
//
//  	c := actuallyDial(peer)
//  	return c
//  }
//
type dialsync struct {
	// ongoing is a map of tickets for the current peers being dialed.
	// this way, we dont kick off N dials simultaneously.
	ongoing map[peer.ID]chan struct{}
	lock    sync.Mutex
}

// Lock governs the beginning of a dial attempt.
// If there are no ongoing dials, it returns true, and the client is now
// scheduled to dial. Every other goroutine that calls startDial -- with
//the same dst -- will block until client is done. The client MUST call
// ds.Unlock(p) when it is done, to unblock the other callers.
// The client is not reponsible for achieving a successful dial, only for
// reporting the end of the attempt (calling ds.Unlock(p)).
//
// see the example below `dialsync`
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
	ds.lock.Lock()
	if ds.ongoing == nil { // init if not ready
		ds.ongoing = make(map[peer.ID]chan struct{})
	}
	wait, found := ds.ongoing[dst]
	if !found {
		ds.ongoing[dst] = make(chan struct{})
	}
	ds.lock.Unlock()

	if found {
		return false, wait
	}

	// ok! you're signed up to dial!
	return true, nil
}

// Unlock releases waiters to a dial attempt. see Lock.
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
	ds.lock.Lock()
	wait, found := ds.ongoing[dst]
	if !found {
		panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
	}
	delete(ds.ongoing, dst) // remove ongoing dial
	close(wait)             // release everyone else
	ds.lock.Unlock()
}

99 100 101 102 103 104
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
105 106
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
107 108 109
		return nil, errors.New("Attempted connection to self!")
	}

110 111 112 113 114
	// this loop is here because dials take time, and we should not be dialing
	// the same peer concurrently (silly waste). Additonally, it's structured
	// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we
	// may have received an incoming connection! if so, we no longer must dial.
	//
115
	// During the dial attempts, we may be doing the dialing. if not, we wait.
116 117
	var err error
	var conn *Conn
118
	for i := 0; i < dialAttempts; i++ {
119 120
		// check if we already have an open connection first
		cs := s.ConnectionsToPeer(p)
121 122 123
		for _, conn = range cs {
			if conn != nil { // dump out the first one we find. (TODO pick better)
				return conn, nil
124
			}
125
		}
126 127

		// check if there's an ongoing dial to this peer
128
		if ok, wait := s.dsync.Lock(p); !ok {
129
			log.Debugf("swarm %s dialing %s -- waiting for ongoing dial", s.local, p)
130
			select {
131 132 133
			case <-wait: // wait for that dial to finish.
				continue // and see if it worked (loop), OR we got an incoming dial.
			case <-ctx.Done(): // or we may have to bail...
134 135 136 137
				return nil, ctx.Err()
			}
		}

138
		// ok, we have been charged to dial! let's do it.
139
		// if it succeeds, dial will add the conn to the swarm itself.
140 141 142
		log.Debugf("swarm %s dialing %s -- dial start", s.local, p)
		ctxT, _ := context.WithTimeout(ctx, DialTimeout)
		conn, err = s.dial(ctxT, p)
143
		s.dsync.Unlock(p)
144
		log.Debugf("swarm %s dialing %s -- dial end %s", s.local, p, conn)
145 146 147 148 149
		if err != nil {
			continue // ok, we failed. try again. (if loop is done, our error is output)
		}
		return conn, nil
	}
150 151 152
	if err == nil {
		err = fmt.Errorf("%s failed to dial %s after %d attempts", s.local, p, dialAttempts)
	}
153 154 155 156 157 158 159
	return nil, err
}

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
160 161
	}

162 163 164 165
	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
166 167
	}

168 169 170 171 172 173 174
	// get our own addrs
	localAddrs := s.peers.Addresses(s.local)
	if len(localAddrs) == 0 {
		log.Debug("Dialing out with no local addresses.")
	}

	// get remote peer addrs
175
	remoteAddrs := s.peers.Addresses(p)
176
	// make sure we can use the addresses.
177
	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
178 179
	// drop out any addrs that would just dial ourselves. use ListenAddresses
	// as that is a more authoritative view than localAddrs.
180 181 182 183
	ila, _ := InterfaceListenAddresses(s)
	remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
	remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local))
	log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses())
184 185 186
	if len(remoteAddrs) == 0 {
		return nil, errors.New("peer has no addresses")
	}
187

188 189
	// open connection to peer
	d := &conn.Dialer{
190 191 192 193 194
		Dialer: manet.Dialer{
			Dialer: net.Dialer{
				Timeout: DialTimeout,
			},
		},
195 196 197
		LocalPeer:  s.local,
		LocalAddrs: localAddrs,
		PrivateKey: sk,
198 199
	}

200 201
	// try to get a connection to any addr
	connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
202 203 204 205 206 207 208 209 210
	if err != nil {
		return nil, err
	}

	// ok try to setup the new connection.
	swarmC, err := dialConnSetup(ctx, s, connC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
211
		connC.Close() // close the connection. didn't work out :(
212 213 214 215 216 217 218
		return nil, err
	}

	log.Event(ctx, "dial", p)
	return swarmC, nil
}

219 220 221 222 223
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {

	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
224 225
	log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
	var err error
226
	for _, addr := range remoteAddrs {
227 228 229
		log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
		var connC conn.Conn
		connC, err = d.Dial(ctx, addr, p)
230
		if err != nil {
231
			log.Info("%s --> %s dial attempt failed: %s", s.local, p, err)
232 233 234 235 236
			continue
		}

		// if the connection is not to whom we thought it would be...
		if connC.RemotePeer() != p {
237
			log.Infof("misdial to %s through %s (got %s)", p, addr, connC.RemotePeer())
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
			connC.Close()
			continue
		}

		// if the connection is to ourselves...
		// this can happen TONS when Loopback addrs are advertized.
		// (this should be caught by two checks above, but let's just make sure.)
		if connC.RemotePeer() == s.local {
			log.Infof("misdial to %s through %s", p, addr)
			connC.Close()
			continue
		}

		// success! we got one!
		return connC, nil
	}
254 255 256
	if err != nil {
		return nil, err
	}
257 258 259
	return nil, fmt.Errorf("failed to dial %s", p)
}

260 261
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
263 264 265 266 267 268 269 270 271 272 273 274

	psC, err := s.swarm.AddConn(connC)
	if err != nil {
		// connC is closed by caller if we fail.
		return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
	}

	// ok try to setup the new connection. (newConnSetup will add to group)
	swarmC, err := s.newConnSetup(ctx, psC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
275
		psC.Close() // we need to make sure psC is Closed.
276 277 278 279 280
		return nil, err
	}

	return swarmC, err
}