core.go 25.8 KB
Newer Older
1 2 3 4
/*
Package core implements the IpfsNode object and related methods.

Packages underneath core/ provide a (relatively) stable, low-level API
to carry out most IPFS-related tasks.  For more details on the other
interfaces and how core/... fits into the bigger IPFS picture, see:

  $ godoc github.com/ipfs/go-ipfs
*/
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10 11
package core

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
import (
Jakub Sztandera's avatar
Jakub Sztandera committed
13
	"bytes"
14
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	"errors"
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
16
	"fmt"
17
	"io"
18 19 20
	"io/ioutil"
	"os"
	"strings"
21
	"time"
22

23 24 25 26
	bserv "github.com/ipfs/go-ipfs/blockservice"
	bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	rp "github.com/ipfs/go-ipfs/exchange/reprovide"
27
	filestore "github.com/ipfs/go-ipfs/filestore"
28 29 30 31 32
	mount "github.com/ipfs/go-ipfs/fuse/mount"
	merkledag "github.com/ipfs/go-ipfs/merkledag"
	mfs "github.com/ipfs/go-ipfs/mfs"
	namesys "github.com/ipfs/go-ipfs/namesys"
	ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
Łukasz Magiera's avatar
Łukasz Magiera committed
33
	p2p "github.com/ipfs/go-ipfs/p2p"
34
	"github.com/ipfs/go-ipfs/path/resolver"
35 36 37 38
	pin "github.com/ipfs/go-ipfs/pin"
	repo "github.com/ipfs/go-ipfs/repo"
	config "github.com/ipfs/go-ipfs/repo/config"
	ft "github.com/ipfs/go-ipfs/unixfs"
Jeromy's avatar
Jeromy committed
39

Steven Allen's avatar
Steven Allen committed
40
	circuit "gx/ipfs/QmNXLcLAcfo8yp59FxFQJNa7pDbUUw97QN9GwefWWFK4hk/go-libp2p-circuit"
Steven Allen's avatar
Steven Allen committed
41
	u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util"
Steven Allen's avatar
Steven Allen committed
42 43 44 45 46 47 48 49 50 51
	metrics "gx/ipfs/QmNnaGds3p4hfTqSH4KURKh8pBRcisAWYbNDEGeMZ7c3Hv/go-libp2p-metrics"
	nilrouting "gx/ipfs/QmPFAxh9UwfqwseVcWkj1Lz1gCHyQ6QuCk5m5XUp6vifkL/go-ipfs-routing/none"
	offroute "gx/ipfs/QmPFAxh9UwfqwseVcWkj1Lz1gCHyQ6QuCk5m5XUp6vifkL/go-ipfs-routing/offline"
	pnet "gx/ipfs/QmRHCy6EKtr6uKBhEt6AmvScH87mHSgKcHocbv9djjaqwQ/go-libp2p-pnet"
	libp2p "gx/ipfs/QmRvoAami8AAf5Yy6jcPq5KqQT1ZCaoi9dF1vdKAghmq9X/go-libp2p"
	discovery "gx/ipfs/QmRvoAami8AAf5Yy6jcPq5KqQT1ZCaoi9dF1vdKAghmq9X/go-libp2p/p2p/discovery"
	p2pbhost "gx/ipfs/QmRvoAami8AAf5Yy6jcPq5KqQT1ZCaoi9dF1vdKAghmq9X/go-libp2p/p2p/host/basic"
	rhost "gx/ipfs/QmRvoAami8AAf5Yy6jcPq5KqQT1ZCaoi9dF1vdKAghmq9X/go-libp2p/p2p/host/routed"
	identify "gx/ipfs/QmRvoAami8AAf5Yy6jcPq5KqQT1ZCaoi9dF1vdKAghmq9X/go-libp2p/p2p/protocol/identify"
	ping "gx/ipfs/QmRvoAami8AAf5Yy6jcPq5KqQT1ZCaoi9dF1vdKAghmq9X/go-libp2p/p2p/protocol/ping"
52
	goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
Jeromy's avatar
Jeromy committed
53
	mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
Steven Allen's avatar
Steven Allen committed
54
	logging "gx/ipfs/QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7/go-log"
55
	record "gx/ipfs/QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT/go-libp2p-record"
Steven Allen's avatar
Steven Allen committed
56 57
	psrouter "gx/ipfs/QmUGRKzMT37Kinmse8K7Cr7cBUgifGz13qp1Pw33DPHdZc/go-libp2p-pubsub-router"
	ifconnmgr "gx/ipfs/QmWCWsDQnnQ9Mo9V3GK8TSR91662FdFxjjqPX8YbHC8Ltz/go-libp2p-interface-connmgr"
Steven Allen's avatar
Steven Allen committed
58
	ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
Steven Allen's avatar
Steven Allen committed
59
	routing "gx/ipfs/QmXijJ3T9MjB2v8xpFDoEX6FqR9u8PkJkzu49TgwJ8Ndr5/go-libp2p-routing"
Steven Allen's avatar
Steven Allen committed
60
	smux "gx/ipfs/QmY9JXR3FupnYAYJWK9aMr9bCpqWKcToQ1tz8DVGTrHpHw/go-stream-muxer"
Steven Allen's avatar
Steven Allen committed
61 62 63 64
	mafilter "gx/ipfs/QmYF6XMumtqvXtZfQKrCokXEdFvfy2i7tbpvSRzRrHyY8c/go-maddr-filter"
	dht "gx/ipfs/QmYyonQoGb5Gw5VnGqgjKPPm1x3rY9QSquWCZqGKdiwuTw/go-libp2p-kad-dht"
	dhtopts "gx/ipfs/QmYyonQoGb5Gw5VnGqgjKPPm1x3rY9QSquWCZqGKdiwuTw/go-libp2p-kad-dht/opts"
	pstore "gx/ipfs/QmZb7hAgQEhW9dBbzBudU39gCeD4zbe6xafD52LUuF4cUN/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
65
	mplex "gx/ipfs/QmZeGmoJ3bEwEe6Huz6GKcHENWZCx7DReuAS5va4zP24PB/go-smux-multiplex"
Steven Allen's avatar
Steven Allen committed
66 67
	rhelpers "gx/ipfs/QmZyur8nJBKxXAmu2zxuN3hXgvKVekCVCmNAKTAWSLDcLN/go-libp2p-routing-helpers"
	floodsub "gx/ipfs/QmaWsab8a1KQgoxWP3RjK7mBhSi5PB9pR6NwZUrSXvVd1i/go-libp2p-floodsub"
Steven Allen's avatar
Steven Allen committed
68
	bstore "gx/ipfs/QmayRSLCiM2gWR7Kay8vqu3Yy5mf7yPqocF9ZRgDUPYMcc/go-ipfs-blockstore"
Steven Allen's avatar
Steven Allen committed
69
	connmgr "gx/ipfs/QmcFKz3A8XMGx7sirMKUya49dHqCCJ978aAzpnipnhu5kd/go-libp2p-connmgr"
Steven Allen's avatar
Steven Allen committed
70
	peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
71
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
Steven Allen's avatar
Steven Allen committed
72
	yamux "gx/ipfs/QmcsgrV3nCAKjiHKZhKVXWc4oY3WBECJCqahXEMpHeMrev/go-smux-yamux"
Steven Allen's avatar
Steven Allen committed
73
	p2phost "gx/ipfs/QmdHyfNVTZ5VtUx4Xz23z8wtnioSrFQ28XSfpVkdhQBkGA/go-libp2p-host"
74
	exchange "gx/ipfs/QmdcAXgEHUueP4A7b5hjabKn2EooeHgMreMvFC249dGCgc/go-ipfs-exchange-interface"
Steven Allen's avatar
Steven Allen committed
75
	ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
76
	ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
Steven Allen's avatar
Steven Allen committed
77
	ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79
)

Jeromy's avatar
Jeromy committed
80
const (
	// IpnsValidatorTag is the tag string associated with the IPNS record
	// validator.
	// NOTE(review): nothing in this file references the constant; the
	// validator map in startOnlineServicesWithHost spells the "ipns" key
	// literally — confirm they are meant to agree.
	IpnsValidatorTag = "ipns"

	// kReprovideFrequency is the reprovide interval used when the config
	// does not set Reprovider.Interval.
	kReprovideFrequency = time.Hour * 12

	// discoveryConnTimeout bounds how long HandlePeerFound waits when
	// connecting to a peer found by discovery.
	discoveryConnTimeout = time.Second * 30
)
Jeromy's avatar
Jeromy committed
84

Jeromy's avatar
Jeromy committed
85
// log is the package-level logger, registered under the name "core".
var log = logging.Logger("core")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86

87 88 89 90
// mode identifies which mode an IpfsNode is running in.
type mode int

const (
	// NOTE: localMode is the zero value of mode, so an IpfsNode whose mode
	// was never assigned compares equal to localMode. IpfsNode carries a
	// separate localModeSet flag (see SetLocal / LocalMode) to tell
	// "explicitly set" apart from "never set".
	localMode mode = iota
	offlineMode
	onlineMode
)

96 97 98 99
func init() {
	identify.ClientVersion = "go-ipfs/" + config.CurrentVersionNumber + "/" + config.CurrentCommit
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
100
// IpfsNode is IPFS Core module. It represents an IPFS instance.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101 102
type IpfsNode struct {

103
	// Self
104
	Identity peer.ID // the local node's identity
105

106
	Repo repo.Repo
107 108

	// Local node
109 110 111 112
	Pinning         pin.Pinner // the pinning manager
	Mounts          Mounts     // current mount state, if any.
	PrivateKey      ic.PrivKey // the local node's private Key
	PNetFingerprint []byte     // fingerprint of private network
113 114

	// Services
Jeromy's avatar
Jeromy committed
115
	Peerstore  pstore.Peerstore     // storage for other Peer instances
116
	Blockstore bstore.GCBlockstore  // the block store (lower level)
117 118 119
	Filestore  *filestore.Filestore // the filestore blockstore
	BaseBlocks bstore.Blockstore    // the raw blockstore, no filestore wrapping
	GCLocker   bstore.GCLocker      // the locker used to protect the blockstore during gc
120
	Blocks     bserv.BlockService   // the block service, get/add blocks.
121
	DAG        ipld.DAGService      // the merkle dag service, get/add objects.
122
	Resolver   *resolver.Resolver   // the path resolution system
Jeromy's avatar
Jeromy committed
123
	Reporter   metrics.Reporter
124
	Discovery  discovery.Service
Jeromy's avatar
Jeromy committed
125
	FilesRoot  *mfs.Root
126 127

	// Online
128 129 130 131 132
	PeerHost     p2phost.Host        // the network host (server+client)
	Bootstrapper io.Closer           // the periodic bootstrapper
	Routing      routing.IpfsRouting // the routing system. recommend ipfs-dht
	Exchange     exchange.Interface  // the block exchange + strategy (bitswap)
	Namesys      namesys.NameSystem  // the name system, resolves paths to hashes
Jeromy's avatar
Jeromy committed
133 134
	Ping         *ping.PingService
	Reprovider   *rp.Reprovider // the value reprovider system
Jeromy's avatar
Jeromy committed
135
	IpnsRepub    *ipnsrp.Republisher
136

Jeromy's avatar
Jeromy committed
137
	Floodsub *floodsub.PubSub
138
	PSRouter *psrouter.PubsubValueStore
Łukasz Magiera's avatar
Łukasz Magiera committed
139
	P2P      *p2p.P2P
Jeromy's avatar
Jeromy committed
140

141 142
	proc goprocess.Process
	ctx  context.Context
143

Jeromy's avatar
Jeromy committed
144
	mode         mode
145
	localModeSet bool
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146 147
}

148 149 150 151 152 153 154 155
// Mounts holds the node's current mount points, one for IPFS and one for
// IPNS. It lives on the node because the mount state must be reachable
// across daemon requests; it might be a better fit for the daemon or the
// mount package.
type Mounts struct {
	Ipfs, Ipns mount.Mount
}

vyzo's avatar
vyzo committed
156
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error {
157 158

	if n.PeerHost != nil { // already online.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
		return errors.New("node already online")
160 161 162
	}

	// load private key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
	if err := n.LoadPrivateKey(); err != nil {
164 165 166
		return err
	}

Jeromy's avatar
Jeromy committed
167
	// get undialable addrs from config
168 169 170 171
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
172 173

	var libp2pOpts []libp2p.Option
174
	for _, s := range cfg.Swarm.AddrFilters {
Jeromy's avatar
Jeromy committed
175 176
		f, err := mamask.NewMask(s)
		if err != nil {
177
			return fmt.Errorf("incorrectly formatted address filter in config: %s", s)
Jeromy's avatar
Jeromy committed
178
		}
Steven Allen's avatar
Steven Allen committed
179
		libp2pOpts = append(libp2pOpts, libp2p.FilterAddresses(f))
Jeromy's avatar
Jeromy committed
180 181
	}

182 183 184
	if !cfg.Swarm.DisableBandwidthMetrics {
		// Set reporter
		n.Reporter = metrics.NewBandwidthCounter()
Steven Allen's avatar
Steven Allen committed
185
		libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(n.Reporter))
186 187
	}

Jakub Sztandera's avatar
Jakub Sztandera committed
188 189 190 191 192 193
	swarmkey, err := n.Repo.SwarmKey()
	if err != nil {
		return err
	}

	if swarmkey != nil {
Steven Allen's avatar
Steven Allen committed
194
		protec, err := pnet.NewProtector(bytes.NewReader(swarmkey))
Jakub Sztandera's avatar
Jakub Sztandera committed
195
		if err != nil {
196
			return fmt.Errorf("failed to configure private network: %s", err)
Jakub Sztandera's avatar
Jakub Sztandera committed
197
		}
198
		n.PNetFingerprint = protec.Fingerprint()
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
		go func() {
			t := time.NewTicker(30 * time.Second)
			<-t.C // swallow one tick
			for {
				select {
				case <-t.C:
					if ph := n.PeerHost; ph != nil {
						if len(ph.Network().Peers()) == 0 {
							log.Warning("We are in private network and have no peers.")
							log.Warning("This might be configuration mistake.")
						}
					}
				case <-n.Process().Closing():
					t.Stop()
					return
				}
			}
		}()
Steven Allen's avatar
Steven Allen committed
217 218

		libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(protec))
Jakub Sztandera's avatar
Jakub Sztandera committed
219 220
	}

221 222 223 224
	addrsFactory, err := makeAddrsFactory(cfg.Addresses)
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
225 226 227 228
	if !cfg.Swarm.DisableRelay {
		addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs)
	}
	libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrsFactory))
229

Steven Allen's avatar
Steven Allen committed
230
	connm, err := constructConnMgr(cfg.Swarm.ConnMgr)
Jeromy's avatar
Jeromy committed
231 232 233
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
234 235 236
	libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connm))

	libp2pOpts = append(libp2pOpts, makeSmuxTransportOption(mplex))
Jeromy's avatar
Jeromy committed
237

Steven Allen's avatar
Steven Allen committed
238 239
	if !cfg.Swarm.DisableNatPortMap {
		libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
240
	}
Steven Allen's avatar
Steven Allen committed
241 242 243 244 245 246 247 248 249
	if !cfg.Swarm.DisableRelay {
		var opts []circuit.RelayOpt
		if cfg.Swarm.EnableRelayHop {
			opts = append(opts, circuit.OptHop)
		}
		libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(opts...))
	}

	peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, libp2pOpts...)
vyzo's avatar
vyzo committed
250

251
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
		return err
253 254
	}

255
	if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption, pubsub, ipnsps); err != nil {
256
		return err
257 258 259
	}

	// Ok, now we're ready to listen.
Łukasz Magiera's avatar
Łukasz Magiera committed
260
	if err := startListening(n.PeerHost, cfg); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261
		return err
262
	}
263

Łukasz Magiera's avatar
Łukasz Magiera committed
264
	n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)
265

266
	// setup local discovery
Jeromy's avatar
Jeromy committed
267
	if do != nil {
Jeromy's avatar
Jeromy committed
268
		service, err := do(ctx, n.PeerHost)
Jeromy's avatar
Jeromy committed
269
		if err != nil {
Jeromy's avatar
Jeromy committed
270 271 272 273
			log.Error("mdns error: ", err)
		} else {
			service.RegisterNotifee(n)
			n.Discovery = service
Jeromy's avatar
Jeromy committed
274
		}
275 276
	}

277
	return n.Bootstrap(DefaultBootstrapConfig)
278 279
}

Jeromy's avatar
Jeromy committed
280 281
func constructConnMgr(cfg config.ConnMgr) (ifconnmgr.ConnManager, error) {
	switch cfg.Type {
282 283 284 285
	case "":
		// 'default' value is the basic connection manager
		return connmgr.NewConnManager(config.DefaultConnMgrLowWater, config.DefaultConnMgrHighWater, config.DefaultConnMgrGracePeriod), nil
	case "none":
Jeromy's avatar
Jeromy committed
286 287 288 289 290 291 292 293 294 295 296 297 298
		return nil, nil
	case "basic":
		grace, err := time.ParseDuration(cfg.GracePeriod)
		if err != nil {
			return nil, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)
		}

		return connmgr.NewConnManager(cfg.LowWater, cfg.HighWater, grace), nil
	default:
		return nil, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Type)
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
299 300 301 302 303 304
func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

305
	var keyProvider rp.KeyChanFunc
Łukasz Magiera's avatar
Łukasz Magiera committed
306 307 308 309 310 311 312 313 314 315 316

	switch cfg.Reprovider.Strategy {
	case "all":
		fallthrough
	case "":
		keyProvider = rp.NewBlockstoreProvider(n.Blockstore)
	case "roots":
		keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true)
	case "pinned":
		keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
	default:
317
		return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
Łukasz Magiera's avatar
Łukasz Magiera committed
318
	}
319
	n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)
Łukasz Magiera's avatar
Łukasz Magiera committed
320

321 322 323 324 325
	reproviderInterval := kReprovideFrequency
	if cfg.Reprovider.Interval != "" {
		dur, err := time.ParseDuration(cfg.Reprovider.Interval)
		if err != nil {
			return err
Łukasz Magiera's avatar
Łukasz Magiera committed
326 327
		}

328
		reproviderInterval = dur
Łukasz Magiera's avatar
Łukasz Magiera committed
329 330
	}

331 332
	go n.Reprovider.Run(reproviderInterval)

Łukasz Magiera's avatar
Łukasz Magiera committed
333 334 335
	return nil
}

336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
// makeAddrsFactory turns the Announce / NoAnnounce address config into an
// address factory.
//
// Every Announce entry must parse as a multiaddr. Each NoAnnounce entry is
// first tried as an address mask (added as a filter); if that fails it must
// parse as a multiaddr, which is then excluded by exact string match.
//
// The returned factory starts from the Announce list when it is non-empty
// (otherwise from the addresses it is given) and drops every address that is
// an exact NoAnnounce match or is blocked by one of the filters.
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
	var announce []ma.Multiaddr
	for _, raw := range cfg.Announce {
		parsed, err := ma.NewMultiaddr(raw)
		if err != nil {
			return nil, err
		}
		announce = append(announce, parsed)
	}

	maskFilters := mafilter.NewFilters()
	exactNoAnnounce := map[string]bool{}
	for _, raw := range cfg.NoAnnounce {
		if mask, err := mamask.NewMask(raw); err == nil {
			maskFilters.AddDialFilter(mask)
			continue
		}
		parsed, err := ma.NewMultiaddr(raw)
		if err != nil {
			return nil, err
		}
		exactNoAnnounce[parsed.String()] = true
	}

	factory := func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
		candidates := allAddrs
		if len(announce) > 0 {
			candidates = announce
		}

		var kept []ma.Multiaddr
		for _, candidate := range candidates {
			// exact matches first, then /ipcidr-style filter matches
			if exactNoAnnounce[candidate.String()] || maskFilters.AddrBlocked(candidate) {
				continue
			}
			kept = append(kept, candidate)
		}
		return kept
	}
	return factory, nil
}

Steven Allen's avatar
Steven Allen committed
382 383 384
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
	const yamuxID = "/yamux/1.0.0"
	const mplexID = "/mplex/6.7.0"
385 386

	ymxtpt := &yamux.Transport{
Steven Allen's avatar
Steven Allen committed
387
		AcceptBacklog:          512,
388 389 390 391 392 393 394
		ConnectionWriteTimeout: time.Second * 10,
		KeepAliveInterval:      time.Second * 30,
		EnableKeepAlive:        true,
		MaxStreamWindowSize:    uint32(1024 * 512),
		LogOutput:              ioutil.Discard,
	}

395 396 397 398
	if os.Getenv("YAMUX_DEBUG") != "" {
		ymxtpt.LogOutput = os.Stderr
	}

Steven Allen's avatar
Steven Allen committed
399
	muxers := map[string]smux.Transport{yamuxID: ymxtpt}
400
	if mplexExp {
Steven Allen's avatar
Steven Allen committed
401
		muxers[mplexID] = mplex.DefaultTransport
402 403 404
	}

	// Allow muxer preference order overriding
Steven Allen's avatar
Steven Allen committed
405
	order := []string{yamuxID, mplexID}
406
	if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
Steven Allen's avatar
Steven Allen committed
407 408 409 410 411 412 413 414 415 416 417 418
		order = strings.Fields(prefs)
	}

	opts := make([]libp2p.Option, 0, len(order))
	for _, id := range order {
		tpt, ok := muxers[id]
		if !ok {
			log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
			continue
		}
		delete(muxers, id)
		opts = append(opts, libp2p.Muxer(id, tpt))
419 420
	}

Steven Allen's avatar
Steven Allen committed
421
	return libp2p.ChainOptions(opts...)
422 423
}

Jeromy's avatar
Jeromy committed
424 425
func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
	if d.MDNS.Enabled {
Jeromy's avatar
Jeromy committed
426
		return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) {
Jeromy's avatar
Jeromy committed
427 428 429
			if d.MDNS.Interval == 0 {
				d.MDNS.Interval = 5
			}
Jeromy's avatar
Jeromy committed
430
			return discovery.NewMdnsService(ctx, h, time.Duration(d.MDNS.Interval)*time.Second, discovery.ServiceTag)
Jeromy's avatar
Jeromy committed
431 432 433 434 435
		}
	}
	return nil
}

436 437
// HandlePeerFound attempts to connect to peer from `PeerInfo`, if it fails
// logs a warning log.
Jeromy's avatar
Jeromy committed
438
func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) {
439
	log.Warning("trying peer info: ", p)
440
	ctx, cancel := context.WithTimeout(n.Context(), discoveryConnTimeout)
rht's avatar
rht committed
441
	defer cancel()
442
	if err := n.PeerHost.Connect(ctx, p); err != nil {
443 444 445 446
		log.Warning("Failed to connect to peer found by discovery: ", err)
	}
}

447 448
// startOnlineServicesWithHost  is the set of services which need to be
// initialized with the host and _before_ we start listening.
449
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption, pubsub bool, ipnsps bool) error {
450
	// setup diagnostics service
Jeromy's avatar
Jeromy committed
451
	n.Ping = ping.NewPingService(host)
452

453 454 455 456 457 458 459 460 461 462 463 464 465
	if pubsub || ipnsps {
		service, err := floodsub.NewFloodSub(ctx, host)
		if err != nil {
			return err
		}
		n.Floodsub = service
	}

	validator := record.NamespacedValidator{
		"pk":   record.PublicKeyValidator{},
		"ipns": namesys.IpnsValidator{KeyBook: host.Peerstore()},
	}

466
	// setup routing service
467
	r, err := routingOption(ctx, host, n.Repo.Datastore(), validator)
Jeromy's avatar
Jeromy committed
468
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
469
		return err
470
	}
Jeromy's avatar
Jeromy committed
471
	n.Routing = r
472

473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
	if ipnsps {
		n.PSRouter = psrouter.NewPubsubValueStore(
			ctx,
			host,
			n.Routing,
			n.Floodsub,
			validator,
		)
		n.Routing = rhelpers.Tiered{
			// Always check pubsub first.
			&rhelpers.Compose{
				ValueStore: &rhelpers.LimitedValueStore{
					ValueStore: n.PSRouter,
					Namespaces: []string{"ipns"},
				},
			},
			n.Routing,
		}
	}

493 494 495
	// Wrap standard peer host with routing system to allow unknown peer lookups
	n.PeerHost = rhost.Wrap(host, n.Routing)

496
	// setup exchange service
497
	bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
Łukasz Magiera's avatar
Łukasz Magiera committed
498
	n.Exchange = bitswap.New(ctx, bitswapNetwork, n.Blockstore)
499

500 501 502 503 504
	size, err := n.getCacheSize()
	if err != nil {
		return err
	}

505
	// setup name system
506
	n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
507

Jeromy's avatar
Jeromy committed
508
	// setup ipns republishing
509
	return n.setupIpnsRepublisher()
510 511
}

512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528
// getCacheSize reads Ipns.ResolveCacheSize from the repo config. A value of 0
// is replaced by 128; a negative value is rejected with an error.
func (n *IpfsNode) getCacheSize() (int, error) {
	cfg, err := n.Repo.Config()
	if err != nil {
		return 0, err
	}

	switch size := cfg.Ipns.ResolveCacheSize; {
	case size == 0:
		return 128, nil
	case size < 0:
		return 0, fmt.Errorf("cannot specify negative resolve cache size")
	default:
		return size, nil
	}
}

529
func (n *IpfsNode) setupIpnsRepublisher() error {
Jeromy's avatar
Jeromy committed
530 531 532 533
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}
534

535
	n.IpnsRepub = ipnsrp.NewRepublisher(n.Namesys, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore())
536

Jeromy's avatar
Jeromy committed
537 538 539 540 541 542
	if cfg.Ipns.RepublishPeriod != "" {
		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
		if err != nil {
			return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)
		}

543
		if !u.Debug && (d < time.Minute || d > (time.Hour*24)) {
Jeromy's avatar
Jeromy committed
544 545 546 547 548 549
			return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)
		}

		n.IpnsRepub.Interval = d
	}

550 551 552 553 554 555 556 557 558
	if cfg.Ipns.RecordLifetime != "" {
		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
		if err != nil {
			return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)
		}

		n.IpnsRepub.RecordLifetime = d
	}

Jeromy's avatar
Jeromy committed
559 560
	n.Process().Go(n.IpnsRepub.Run)

561 562 563
	return nil
}

564 565 566 567 568 569 570 571 572 573 574 575
// Process returns the goprocess.Process held by the node (its proc field).
func (n *IpfsNode) Process() goprocess.Process {
	return n.proc
}

// Close calls Close() on the node's Process object and returns its error.
func (n *IpfsNode) Close() error {
	return n.proc.Close()
}

// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
576 577 578
	if n.ctx == nil {
		n.ctx = context.TODO()
	}
579 580 581
	return n.ctx
}

582 583
// teardown closes owned children. If any errors occur, this function returns
// the first error.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
584
func (n *IpfsNode) teardown() error {
585
	log.Debug("core is shutting down...")
586 587
	// owned objects are closed in this teardown to ensure that they're closed
	// regardless of which constructor was used to add them to the node.
Jeromy's avatar
Jeromy committed
588 589
	var closers []io.Closer

590
	// NOTE: The order that objects are added(closed) matters, if an object
Jeromy's avatar
Jeromy committed
591 592 593 594 595
	// needs to use another during its shutdown/cleanup process, it should be
	// closed before that other object

	if n.FilesRoot != nil {
		closers = append(closers, n.FilesRoot)
Jeromy's avatar
Jeromy committed
596
	}
597

598 599 600 601
	if n.Exchange != nil {
		closers = append(closers, n.Exchange)
	}

602
	if n.Mounts.Ipfs != nil && !n.Mounts.Ipfs.IsActive() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
603 604
		closers = append(closers, mount.Closer(n.Mounts.Ipfs))
	}
605
	if n.Mounts.Ipns != nil && !n.Mounts.Ipns.IsActive() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
606 607 608
		closers = append(closers, mount.Closer(n.Mounts.Ipns))
	}

Jeromy's avatar
Jeromy committed
609 610 611 612
	if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
		closers = append(closers, dht.Process())
	}

Jeromy's avatar
Jeromy committed
613 614 615 616
	if n.Blocks != nil {
		closers = append(closers, n.Blocks)
	}

Jeromy's avatar
Jeromy committed
617 618
	if n.Bootstrapper != nil {
		closers = append(closers, n.Bootstrapper)
619 620
	}

Jeromy's avatar
Jeromy committed
621 622
	if n.PeerHost != nil {
		closers = append(closers, n.PeerHost)
623
	}
624

Jeromy's avatar
Jeromy committed
625 626 627
	// Repo closed last, most things need to preserve state here
	closers = append(closers, n.Repo)

628
	var errs []error
629
	for _, closer := range closers {
630 631
		if err := closer.Close(); err != nil {
			errs = append(errs, err)
632 633 634 635
		}
	}
	if len(errs) > 0 {
		return errs[0]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
636 637
	}
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
638 639
}

640
// OnlineMode returns whether or not the IpfsNode is in OnlineMode.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
641
func (n *IpfsNode) OnlineMode() bool {
642
	return n.mode == onlineMode
Brian Tiger Chow's avatar
Brian Tiger Chow committed
643 644
}

645
// SetLocal will set the IpfsNode to local mode
646 647 648 649 650 651 652
func (n *IpfsNode) SetLocal(isLocal bool) {
	if isLocal {
		n.mode = localMode
	}
	n.localModeSet = true
}

653
// LocalMode returns whether or not the IpfsNode is in LocalMode
654 655 656 657 658
func (n *IpfsNode) LocalMode() bool {
	if !n.localModeSet {
		// programmer error should not happen
		panic("local mode not set")
	}
659
	return n.mode == localMode
660 661
}

662
// Bootstrap will set and call the IpfsNodes bootstrap function.
663
func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
664
	// TODO what should return value be when in offlineMode?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
665 666 667 668
	if n.Routing == nil {
		return nil
	}

669 670 671 672 673 674 675
	if n.Bootstrapper != nil {
		n.Bootstrapper.Close() // stop previous bootstrap process.
	}

	// if the caller did not specify a bootstrap peer function, get the
	// freshest bootstrap peers from config. this responds to live changes.
	if cfg.BootstrapPeers == nil {
Jeromy's avatar
Jeromy committed
676
		cfg.BootstrapPeers = func() []pstore.PeerInfo {
677
			ps, err := n.loadBootstrapPeers()
678
			if err != nil {
679
				log.Warning("failed to parse bootstrap peers from config")
680 681 682 683 684 685 686 687 688
				return nil
			}
			return ps
		}
	}

	var err error
	n.Bootstrapper, err = Bootstrap(n, cfg)
	return err
689 690
}

691 692
func (n *IpfsNode) loadID() error {
	if n.Identity != "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
693
		return errors.New("identity already loaded")
694 695
	}

696 697 698 699 700 701
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	cid := cfg.Identity.PeerID
702
	if cid == "" {
703
		return errors.New("identity was not set in config (was 'ipfs init' run?)")
704 705
	}
	if len(cid) == 0 {
706
		return errors.New("no peer ID in config! (was 'ipfs init' run?)")
707 708
	}

Steven Allen's avatar
Steven Allen committed
709 710 711 712 713 714
	id, err := peer.IDB58Decode(cid)
	if err != nil {
		return fmt.Errorf("peer ID invalid: %s", err)
	}

	n.Identity = id
715 716
	return nil
}
717

718
// GetKey will return a key from the Keystore with name `name`.
719 720 721 722 723 724 725 726
func (n *IpfsNode) GetKey(name string) (ic.PrivKey, error) {
	if name == "self" {
		return n.PrivateKey, nil
	} else {
		return n.Repo.Keystore().Get(name)
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
727
func (n *IpfsNode) LoadPrivateKey() error {
728
	if n.Identity == "" || n.Peerstore == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
729
		return errors.New("loaded private key out of order")
730 731
	}

732
	if n.PrivateKey != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
733
		return errors.New("private key already loaded")
734 735
	}

736 737 738 739 740 741
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	sk, err := loadPrivateKey(&cfg.Identity, n.Identity)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
742
	if err != nil {
743
		return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
744
	}
745

746 747
	n.PrivateKey = sk
	n.Peerstore.AddPrivKey(n.Identity, n.PrivateKey)
Jeromy's avatar
Jeromy committed
748 749 750 751
	n.Peerstore.AddPubKey(n.Identity, sk.GetPublic())
	return nil
}

Jeromy's avatar
Jeromy committed
752
func (n *IpfsNode) loadBootstrapPeers() ([]pstore.PeerInfo, error) {
753 754 755 756 757 758
	cfg, err := n.Repo.Config()
	if err != nil {
		return nil, err
	}

	parsed, err := cfg.BootstrapPeers()
759 760 761 762 763 764
	if err != nil {
		return nil, err
	}
	return toPeerInfos(parsed), nil
}

Jeromy's avatar
Jeromy committed
765
func (n *IpfsNode) loadFilesRoot() error {
Jeromy's avatar
Jeromy committed
766
	dsk := ds.NewKey("/local/filesroot")
Jeromy's avatar
Jeromy committed
767 768
	pf := func(ctx context.Context, c *cid.Cid) error {
		return n.Repo.Datastore().Put(dsk, c.Bytes())
Jeromy's avatar
Jeromy committed
769 770
	}

771
	var nd *merkledag.ProtoNode
Jeromy's avatar
Jeromy committed
772 773 774 775
	val, err := n.Repo.Datastore().Get(dsk)

	switch {
	case err == ds.ErrNotFound || val == nil:
776
		nd = ft.EmptyDirNode()
777
		err := n.DAG.Add(n.Context(), nd)
Jeromy's avatar
Jeromy committed
778 779 780 781
		if err != nil {
			return fmt.Errorf("failure writing to dagstore: %s", err)
		}
	case err == nil:
Jeromy's avatar
Jeromy committed
782 783 784 785 786
		c, err := cid.Cast(val.([]byte))
		if err != nil {
			return err
		}

787
		rnd, err := n.DAG.Get(n.Context(), c)
Jeromy's avatar
Jeromy committed
788 789 790
		if err != nil {
			return fmt.Errorf("error loading filesroot from DAG: %s", err)
		}
791 792 793 794 795 796 797

		pbnd, ok := rnd.(*merkledag.ProtoNode)
		if !ok {
			return merkledag.ErrNotProtobuf
		}

		nd = pbnd
Jeromy's avatar
Jeromy committed
798 799 800 801 802 803 804 805 806 807 808 809 810
	default:
		return err
	}

	mr, err := mfs.NewRoot(n.Context(), n.DAG, nd, pf)
	if err != nil {
		return err
	}

	n.FilesRoot = mr
	return nil
}

Jeromy's avatar
Jeromy committed
811 812 813
// SetupOfflineRouting loads the local nodes private key and
// uses it to instantiate a routing system in offline mode.
// This is primarily used for offline ipns modifications.
Jeromy's avatar
Jeromy committed
814
func (n *IpfsNode) SetupOfflineRouting() error {
815 816 817 818
	if n.Routing != nil {
		// Routing was already set up
		return nil
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
819
	err := n.LoadPrivateKey()
Jeromy's avatar
Jeromy committed
820 821 822 823 824
	if err != nil {
		return err
	}

	n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.PrivateKey)
825

826 827 828 829 830 831
	size, err := n.getCacheSize()
	if err != nil {
		return err
	}

	n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
832

833
	return nil
834 835 836 837
}

func loadPrivateKey(cfg *config.Identity, id peer.ID) (ic.PrivKey, error) {
	sk, err := cfg.DecodePrivateKey("passphrase todo!")
838 839 840
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
841

842 843 844 845
	id2, err := peer.IDFromPrivateKey(sk)
	if err != nil {
		return nil, err
	}
846

847 848
	if id2 != id {
		return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
849 850
	}

851
	return sk, nil
852
}
853

854
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
855 856 857
	var listen []ma.Multiaddr
	for _, addr := range cfg.Addresses.Swarm {
		maddr, err := ma.NewMultiaddr(addr)
858
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
859
			return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
860
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
861
		listen = append(listen, maddr)
862 863 864 865
	}

	return listen, nil
}
866

Kevin Atkinson's avatar
Kevin Atkinson committed
867
type ConstructPeerHostOpts struct {
868
	AddrsFactory      p2pbhost.AddrsFactory
vyzo's avatar
vyzo committed
869 870 871
	DisableNatPortMap bool
	DisableRelay      bool
	EnableRelayHop    bool
Jeromy's avatar
Jeromy committed
872
	ConnectionManager ifconnmgr.ConnManager
Kevin Atkinson's avatar
Kevin Atkinson committed
873 874
}

Steven Allen's avatar
Steven Allen committed
875
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error)
Jeromy's avatar
Jeromy committed
876 877 878

var DefaultHostOption HostOption = constructPeerHost

879
// isolates the complex initialization steps
Steven Allen's avatar
Steven Allen committed
880 881 882 883
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
	pkey := ps.PrivKey(id)
	if pkey == nil {
		return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
884
	}
Steven Allen's avatar
Steven Allen committed
885 886
	options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
	return libp2p.New(ctx, options...)
887 888
}

889 890 891 892 893 894 895 896 897 898 899 900
// filterRelayAddrs returns the entries of addrs for which looking up the
// circuit.P_CIRCUIT protocol value fails, i.e. it drops every address that
// carries that protocol. The result is nil when nothing is kept.
func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
	var kept []ma.Multiaddr
	for i := range addrs {
		if _, err := addrs[i].ValueForProtocol(circuit.P_CIRCUIT); err != nil {
			kept = append(kept, addrs[i])
		}
	}
	return kept
}

901 902 903 904 905 906
// composeAddrsFactory returns an AddrsFactory that first applies g to its
// input and then applies f to g's result.
func composeAddrsFactory(f, g p2pbhost.AddrsFactory) p2pbhost.AddrsFactory {
	return func(addrs []ma.Multiaddr) []ma.Multiaddr {
		inner := g(addrs)
		return f(inner)
	}
}

907
// startListening on the network addresses
Łukasz Magiera's avatar
Łukasz Magiera committed
908
func startListening(host p2phost.Host, cfg *config.Config) error {
909 910
	listenAddrs, err := listenAddresses(cfg)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
911
		return err
912 913 914
	}

	// Actually start listening:
Steven Allen's avatar
Steven Allen committed
915
	if err := host.Network().Listen(listenAddrs...); err != nil {
916
		return err
917 918
	}

919
	// list out our addresses
920
	addrs, err := host.Network().InterfaceListenAddresses()
921
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
922
		return err
923
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
924
	log.Infof("Swarm listening at: %s", addrs)
925
	return nil
926
}
927

928 929 930 931 932 933
// constructDHTRouting creates a DHT on host, configured with dstore as its
// datastore and validator as its record validator.
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	return dht.New(ctx, host, dhtopts.Datastore(dstore), dhtopts.Validator(validator))
}
Jeromy's avatar
Jeromy committed
935

936 937 938 939 940 941 942
func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	return dht.New(
		ctx, host,
		dhtopts.Client(true),
		dhtopts.Datastore(dstore),
		dhtopts.Validator(validator),
	)
Jeromy's avatar
Jeromy committed
943 944
}

945
type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error)
Jeromy's avatar
Jeromy committed
946

Jeromy's avatar
Jeromy committed
947
type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error)
Jeromy's avatar
Jeromy committed
948

949
var DHTOption RoutingOption = constructDHTRouting
Jeromy's avatar
Jeromy committed
950
var DHTClientOption RoutingOption = constructClientDHTRouting
951
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting