core.go 26.6 KB
Newer Older
1 2 3 4
/*
Package core implements the IpfsNode object and related methods.

Packages underneath core/ provide a (relatively) stable, low-level API
to carry out most IPFS-related tasks.  For more details on the other
interfaces and how core/... fits into the bigger IPFS picture, see:

  $ godoc github.com/ipfs/go-ipfs
*/
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10 11
package core

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
import (
Jakub Sztandera's avatar
Jakub Sztandera committed
13
	"bytes"
14
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	"errors"
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
16
	"fmt"
17
	"io"
18 19 20
	"io/ioutil"
	"os"
	"strings"
21
	"time"
22

23
	version "github.com/ipfs/go-ipfs"
24
	rp "github.com/ipfs/go-ipfs/exchange/reprovide"
25
	filestore "github.com/ipfs/go-ipfs/filestore"
26 27 28 29
	mount "github.com/ipfs/go-ipfs/fuse/mount"
	mfs "github.com/ipfs/go-ipfs/mfs"
	namesys "github.com/ipfs/go-ipfs/namesys"
	ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
Łukasz Magiera's avatar
Łukasz Magiera committed
30
	p2p "github.com/ipfs/go-ipfs/p2p"
31 32
	pin "github.com/ipfs/go-ipfs/pin"
	repo "github.com/ipfs/go-ipfs/repo"
33

Steven Allen's avatar
Steven Allen committed
34 35 36
	bitswap "gx/ipfs/QmNQQEYL3Vpj4beteqyeRpVpivuX1wBP6Q5GZMdBPPTV3S/go-bitswap"
	bsnet "gx/ipfs/QmNQQEYL3Vpj4beteqyeRpVpivuX1wBP6Q5GZMdBPPTV3S/go-bitswap/network"
	circuit "gx/ipfs/QmPMRK5yTc2KhnaxQN4R7vRqEfZo5hW1aF5x6W97RKnXZq/go-libp2p-circuit"
37
	u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util"
Łukasz Magiera's avatar
Łukasz Magiera committed
38
	"gx/ipfs/QmPqCBrmkm7jNfYi7xFS7mUZsrN6DEumBMrxLnL7axNJx1/go-path/resolver"
Steven Allen's avatar
Steven Allen committed
39 40 41 42 43
	ic "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
	p2phost "gx/ipfs/QmQ1hwb95uSSZR8jSPJysnfHxBDQAykSXsmz5TwTzxjq2Z/go-libp2p-host"
	bstore "gx/ipfs/QmRNFh4wm6FgTDrtsWmnvEP9NTuEa3Ykf72y1LXCyevbGW/go-ipfs-blockstore"
	logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
	floodsub "gx/ipfs/QmRXefkwjreRT6XfYh3Ag4hsVnWBbpcUicGJPcg8TWbhBK/go-libp2p-floodsub"
Łukasz Magiera's avatar
Łukasz Magiera committed
44
	config "gx/ipfs/QmRwCaRYotCqXsVZAXwWhEJ8A74iAaKnY7MUe6sDgFjrE5/go-ipfs-config"
45
	goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
Jeromy's avatar
Jeromy committed
46
	mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
Steven Allen's avatar
Steven Allen committed
47
	mafilter "gx/ipfs/QmSW4uNHbvQia8iZDXzbwjiyHQtnyo9aFqfQAMasj3TJ6Y/go-maddr-filter"
Steven Allen's avatar
Steven Allen committed
48 49 50 51 52 53 54 55 56
	libp2p "gx/ipfs/QmUDzeFgYrRmHL2hUB6NZmqcBVQtUzETwmFRUc9onfSSHr/go-libp2p"
	discovery "gx/ipfs/QmUDzeFgYrRmHL2hUB6NZmqcBVQtUzETwmFRUc9onfSSHr/go-libp2p/p2p/discovery"
	p2pbhost "gx/ipfs/QmUDzeFgYrRmHL2hUB6NZmqcBVQtUzETwmFRUc9onfSSHr/go-libp2p/p2p/host/basic"
	rhost "gx/ipfs/QmUDzeFgYrRmHL2hUB6NZmqcBVQtUzETwmFRUc9onfSSHr/go-libp2p/p2p/host/routed"
	identify "gx/ipfs/QmUDzeFgYrRmHL2hUB6NZmqcBVQtUzETwmFRUc9onfSSHr/go-libp2p/p2p/protocol/identify"
	ping "gx/ipfs/QmUDzeFgYrRmHL2hUB6NZmqcBVQtUzETwmFRUc9onfSSHr/go-libp2p/p2p/protocol/ping"
	record "gx/ipfs/QmUTQSGgjs8CHm9yBcUHicpRs7C9abhyZiBwjzCUp1pNgX/go-libp2p-record"
	rhelpers "gx/ipfs/QmVgeNSg3ycA5BEKXEx5Pgy73iwVDEa8pQ5GKWrXUr4YED/go-libp2p-routing-helpers"
	metrics "gx/ipfs/QmWne2EKHBvVpSTYuWuWch3D9KqAx78Te83UXWFKQDcksJ/go-libp2p-metrics"
Łukasz Magiera's avatar
Łukasz Magiera committed
57
	merkledag "gx/ipfs/QmXkZeJmx4c3ddjw81DQMUpM1e5LjAack5idzZYWUb2qAJ/go-merkledag"
Steven Allen's avatar
Steven Allen committed
58
	connmgr "gx/ipfs/QmY6ujWdgPoEnYPCTNYBBGD6gAj9fPfRZsDgKm9awpM1Tv/go-libp2p-connmgr"
Steven Allen's avatar
Steven Allen committed
59
	smux "gx/ipfs/QmY9JXR3FupnYAYJWK9aMr9bCpqWKcToQ1tz8DVGTrHpHw/go-stream-muxer"
Steven Allen's avatar
Steven Allen committed
60
	pstore "gx/ipfs/QmYLXCWN2myozZpx8Wx4UjrRuQuhY3YtWoMi6SHaXii6aM/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
61
	cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
Steven Allen's avatar
Steven Allen committed
62
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
Steven Allen's avatar
Steven Allen committed
63 64 65 66
	dht "gx/ipfs/QmZAsayEQakfFbHyakgHRKHwBTWrwuSBTfaMyxJZUG97VC/go-libp2p-kad-dht"
	dhtopts "gx/ipfs/QmZAsayEQakfFbHyakgHRKHwBTWrwuSBTfaMyxJZUG97VC/go-libp2p-kad-dht/opts"
	psrouter "gx/ipfs/QmZXeeaAaSUCCizUfvZHK4BM8po7xJHFGodfYAQuDfHvwt/go-libp2p-pubsub-router"
	pnet "gx/ipfs/QmZaQ3K9PRd5sYYoG1xbTGPtd3N7TYiKBRmcBUTsx8HVET/go-libp2p-pnet"
Steven Allen's avatar
Steven Allen committed
67
	ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
Steven Allen's avatar
Steven Allen committed
68 69
	nilrouting "gx/ipfs/Qma19TdQ7W26jbfuPgdo9Zi4qtjks1zeXzX86mtEYWYCiw/go-ipfs-routing/none"
	offroute "gx/ipfs/Qma19TdQ7W26jbfuPgdo9Zi4qtjks1zeXzX86mtEYWYCiw/go-ipfs-routing/offline"
Steven Allen's avatar
Steven Allen committed
70
	exchange "gx/ipfs/Qmc2faLf7URkHpsbfYM4EMbr8iSAcGAe8VPgVi64HVnwji/go-ipfs-exchange-interface"
Steven Allen's avatar
Steven Allen committed
71
	peer "gx/ipfs/QmcZSzKEM5yDfpZbeEEZaVmaZ1zXm6JWTbrQZSB8hCVPzk/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
72
	yamux "gx/ipfs/QmcsgrV3nCAKjiHKZhKVXWc4oY3WBECJCqahXEMpHeMrev/go-smux-yamux"
Steven Allen's avatar
Steven Allen committed
73
	mplex "gx/ipfs/QmdiBZzwGtN2yHJrWD9ojQ7ASS48nv7BcojWLkYd1ZtrV2/go-smux-multiplex"
74
	ft "gx/ipfs/Qmdqe1sKBpz6W8xFDptGfmzgCPQ5CXNuQPhZeELqMowgsQ/go-unixfs"
Steven Allen's avatar
Steven Allen committed
75
	ifconnmgr "gx/ipfs/QmeJbAMK4cZc1RMChb68h9t2jqvK8miqE8oQiwGAf4EdQq/go-libp2p-interface-connmgr"
Łukasz Magiera's avatar
Łukasz Magiera committed
76
	bserv "gx/ipfs/QmeZMtdkNG7u2CohGSL8mzAdZY2c3B1coYE91wvbzip1pF/go-blockservice"
Steven Allen's avatar
Steven Allen committed
77
	ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
Steven Allen's avatar
Steven Allen committed
78
	routing "gx/ipfs/QmewrvpGvgK9qkCtXsGNwXiQzyux4jcHNjoyVrGdsgtNK5/go-libp2p-routing"
Łukasz Magiera's avatar
Łukasz Magiera committed
79
)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80

Jeromy's avatar
Jeromy committed
81
const IpnsValidatorTag = "ipns"
82

83
const kReprovideFrequency = time.Hour * 12
84
const discoveryConnTimeout = time.Second * 30
Jeromy's avatar
Jeromy committed
85

Jeromy's avatar
Jeromy committed
86
var log = logging.Logger("core")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87

88 89 90 91
// mode enumerates the operating modes of an IpfsNode.
type mode int

const (
	// NOTE: localMode is the zero value of mode, so an unset mode field is
	// indistinguishable from an explicit localMode. LocalMode therefore
	// relies on the separate localModeSet flag to detect a node whose mode
	// was never configured.
	localMode   mode = iota // 0
	offlineMode             // 1
	onlineMode              // 2
)

97
func init() {
98
	identify.ClientVersion = "go-ipfs/" + version.CurrentVersionNumber + "/" + version.CurrentCommit
99 100
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
101
// IpfsNode is IPFS Core module. It represents an IPFS instance.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102 103
type IpfsNode struct {

104
	// Self
105
	Identity peer.ID // the local node's identity
106

107
	Repo repo.Repo
108 109

	// Local node
110 111 112 113
	Pinning         pin.Pinner // the pinning manager
	Mounts          Mounts     // current mount state, if any.
	PrivateKey      ic.PrivKey // the local node's private Key
	PNetFingerprint []byte     // fingerprint of private network
114 115

	// Services
116 117 118 119 120 121 122 123 124 125 126 127
	Peerstore       pstore.Peerstore     // storage for other Peer instances
	Blockstore      bstore.GCBlockstore  // the block store (lower level)
	Filestore       *filestore.Filestore // the filestore blockstore
	BaseBlocks      bstore.Blockstore    // the raw blockstore, no filestore wrapping
	GCLocker        bstore.GCLocker      // the locker used to protect the blockstore during gc
	Blocks          bserv.BlockService   // the block service, get/add blocks.
	DAG             ipld.DAGService      // the merkle dag service, get/add objects.
	Resolver        *resolver.Resolver   // the path resolution system
	Reporter        metrics.Reporter
	Discovery       discovery.Service
	FilesRoot       *mfs.Root
	RecordValidator record.Validator
128 129

	// Online
130 131 132 133 134
	PeerHost     p2phost.Host        // the network host (server+client)
	Bootstrapper io.Closer           // the periodic bootstrapper
	Routing      routing.IpfsRouting // the routing system. recommend ipfs-dht
	Exchange     exchange.Interface  // the block exchange + strategy (bitswap)
	Namesys      namesys.NameSystem  // the name system, resolves paths to hashes
Jeromy's avatar
Jeromy committed
135 136
	Ping         *ping.PingService
	Reprovider   *rp.Reprovider // the value reprovider system
Jeromy's avatar
Jeromy committed
137
	IpnsRepub    *ipnsrp.Republisher
138

Jeromy's avatar
Jeromy committed
139
	Floodsub *floodsub.PubSub
140
	PSRouter *psrouter.PubsubValueStore
141
	DHT      *dht.IpfsDHT
Łukasz Magiera's avatar
Łukasz Magiera committed
142
	P2P      *p2p.P2P
Jeromy's avatar
Jeromy committed
143

144 145
	proc goprocess.Process
	ctx  context.Context
146

Jeromy's avatar
Jeromy committed
147
	mode         mode
148
	localModeSet bool
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150
}

151 152 153 154 155 156 157 158
// Mounts describes the node's mount state. It arguably belongs in the daemon
// or the mount package, but lives here because it has to be reachable across
// daemon requests.
type Mounts struct {
	// Ipfs and Ipns are the mounts for the two filesystems; either may be nil.
	Ipfs, Ipns mount.Mount
}

vyzo's avatar
vyzo committed
159
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error {
160
	if n.PeerHost != nil { // already online.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161
		return errors.New("node already online")
162 163 164
	}

	// load private key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	if err := n.LoadPrivateKey(); err != nil {
166 167 168
		return err
	}

Jeromy's avatar
Jeromy committed
169
	// get undialable addrs from config
170 171 172 173
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
174 175

	var libp2pOpts []libp2p.Option
176
	for _, s := range cfg.Swarm.AddrFilters {
Jeromy's avatar
Jeromy committed
177 178
		f, err := mamask.NewMask(s)
		if err != nil {
179
			return fmt.Errorf("incorrectly formatted address filter in config: %s", s)
Jeromy's avatar
Jeromy committed
180
		}
Steven Allen's avatar
Steven Allen committed
181
		libp2pOpts = append(libp2pOpts, libp2p.FilterAddresses(f))
Jeromy's avatar
Jeromy committed
182 183
	}

184 185 186
	if !cfg.Swarm.DisableBandwidthMetrics {
		// Set reporter
		n.Reporter = metrics.NewBandwidthCounter()
Steven Allen's avatar
Steven Allen committed
187
		libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(n.Reporter))
188 189
	}

Jakub Sztandera's avatar
Jakub Sztandera committed
190 191 192 193 194 195
	swarmkey, err := n.Repo.SwarmKey()
	if err != nil {
		return err
	}

	if swarmkey != nil {
Steven Allen's avatar
Steven Allen committed
196
		protec, err := pnet.NewProtector(bytes.NewReader(swarmkey))
Jakub Sztandera's avatar
Jakub Sztandera committed
197
		if err != nil {
198
			return fmt.Errorf("failed to configure private network: %s", err)
Jakub Sztandera's avatar
Jakub Sztandera committed
199
		}
200
		n.PNetFingerprint = protec.Fingerprint()
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
		go func() {
			t := time.NewTicker(30 * time.Second)
			<-t.C // swallow one tick
			for {
				select {
				case <-t.C:
					if ph := n.PeerHost; ph != nil {
						if len(ph.Network().Peers()) == 0 {
							log.Warning("We are in private network and have no peers.")
							log.Warning("This might be configuration mistake.")
						}
					}
				case <-n.Process().Closing():
					t.Stop()
					return
				}
			}
		}()
Steven Allen's avatar
Steven Allen committed
219 220

		libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(protec))
Jakub Sztandera's avatar
Jakub Sztandera committed
221 222
	}

223 224 225 226
	addrsFactory, err := makeAddrsFactory(cfg.Addresses)
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
227 228 229 230
	if !cfg.Swarm.DisableRelay {
		addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs)
	}
	libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrsFactory))
231

Steven Allen's avatar
Steven Allen committed
232
	connm, err := constructConnMgr(cfg.Swarm.ConnMgr)
Jeromy's avatar
Jeromy committed
233 234 235
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
236 237 238
	libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connm))

	libp2pOpts = append(libp2pOpts, makeSmuxTransportOption(mplex))
Jeromy's avatar
Jeromy committed
239

Steven Allen's avatar
Steven Allen committed
240 241
	if !cfg.Swarm.DisableNatPortMap {
		libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
242
	}
Steven Allen's avatar
Steven Allen committed
243 244 245 246 247 248 249 250 251
	if !cfg.Swarm.DisableRelay {
		var opts []circuit.RelayOpt
		if cfg.Swarm.EnableRelayHop {
			opts = append(opts, circuit.OptHop)
		}
		libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(opts...))
	}

	peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, libp2pOpts...)
vyzo's avatar
vyzo committed
252

253
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
254
		return err
255 256
	}

257
	if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption, pubsub, ipnsps); err != nil {
258
		return err
259 260 261
	}

	// Ok, now we're ready to listen.
Łukasz Magiera's avatar
Łukasz Magiera committed
262
	if err := startListening(n.PeerHost, cfg); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263
		return err
264
	}
265

Łukasz Magiera's avatar
Łukasz Magiera committed
266
	n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)
267

268
	// setup local discovery
Jeromy's avatar
Jeromy committed
269
	if do != nil {
Jeromy's avatar
Jeromy committed
270
		service, err := do(ctx, n.PeerHost)
Jeromy's avatar
Jeromy committed
271
		if err != nil {
Jeromy's avatar
Jeromy committed
272 273 274 275
			log.Error("mdns error: ", err)
		} else {
			service.RegisterNotifee(n)
			n.Discovery = service
Jeromy's avatar
Jeromy committed
276
		}
277 278
	}

279
	return n.Bootstrap(DefaultBootstrapConfig)
280 281
}

Jeromy's avatar
Jeromy committed
282 283
func constructConnMgr(cfg config.ConnMgr) (ifconnmgr.ConnManager, error) {
	switch cfg.Type {
284 285 286 287
	case "":
		// 'default' value is the basic connection manager
		return connmgr.NewConnManager(config.DefaultConnMgrLowWater, config.DefaultConnMgrHighWater, config.DefaultConnMgrGracePeriod), nil
	case "none":
Jeromy's avatar
Jeromy committed
288 289 290 291 292 293 294 295 296 297 298 299 300
		return nil, nil
	case "basic":
		grace, err := time.ParseDuration(cfg.GracePeriod)
		if err != nil {
			return nil, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)
		}

		return connmgr.NewConnManager(cfg.LowWater, cfg.HighWater, grace), nil
	default:
		return nil, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Type)
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
301 302 303 304 305 306
func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

307
	var keyProvider rp.KeyChanFunc
Łukasz Magiera's avatar
Łukasz Magiera committed
308 309 310 311 312 313 314 315 316 317 318

	switch cfg.Reprovider.Strategy {
	case "all":
		fallthrough
	case "":
		keyProvider = rp.NewBlockstoreProvider(n.Blockstore)
	case "roots":
		keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true)
	case "pinned":
		keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
	default:
319
		return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
Łukasz Magiera's avatar
Łukasz Magiera committed
320
	}
321
	n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)
Łukasz Magiera's avatar
Łukasz Magiera committed
322

323 324 325 326 327
	reproviderInterval := kReprovideFrequency
	if cfg.Reprovider.Interval != "" {
		dur, err := time.ParseDuration(cfg.Reprovider.Interval)
		if err != nil {
			return err
Łukasz Magiera's avatar
Łukasz Magiera committed
328 329
		}

330
		reproviderInterval = dur
Łukasz Magiera's avatar
Łukasz Magiera committed
331 332
	}

333 334
	go n.Reprovider.Run(reproviderInterval)

Łukasz Magiera's avatar
Łukasz Magiera committed
335 336 337
	return nil
}

338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
// makeAddrsFactory builds the function that decides which addresses are
// announced.
//
// If cfg.Announce is non-empty those addresses replace the ones passed to
// the factory. Every candidate is then dropped if it matches a cfg.NoAnnounce
// entry, which may be either an address mask (checked through the filter set)
// or an exact multiaddr (checked by string equality). An Announce or
// NoAnnounce entry that cannot be parsed results in an error.
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
	var announce []ma.Multiaddr
	for _, s := range cfg.Announce {
		a, err := ma.NewMultiaddr(s)
		if err != nil {
			return nil, err
		}
		announce = append(announce, a)
	}

	maskFilters := mafilter.NewFilters()
	exact := make(map[string]bool)
	for _, s := range cfg.NoAnnounce {
		// Try the mask form first, then fall back to a plain multiaddr.
		if mask, err := mamask.NewMask(s); err == nil {
			maskFilters.AddDialFilter(mask)
			continue
		}
		a, err := ma.NewMultiaddr(s)
		if err != nil {
			return nil, err
		}
		exact[a.String()] = true
	}

	factory := func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
		candidates := allAddrs
		if len(announce) > 0 {
			candidates = announce
		}

		var out []ma.Multiaddr
		for _, a := range candidates {
			// exact matches first, then /ipcidr matches
			if exact[a.String()] || maskFilters.AddrBlocked(a) {
				continue
			}
			out = append(out, a)
		}
		return out
	}
	return factory, nil
}

Steven Allen's avatar
Steven Allen committed
384 385 386
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
	const yamuxID = "/yamux/1.0.0"
	const mplexID = "/mplex/6.7.0"
387 388

	ymxtpt := &yamux.Transport{
Steven Allen's avatar
Steven Allen committed
389
		AcceptBacklog:          512,
390 391 392 393 394 395 396
		ConnectionWriteTimeout: time.Second * 10,
		KeepAliveInterval:      time.Second * 30,
		EnableKeepAlive:        true,
		MaxStreamWindowSize:    uint32(1024 * 512),
		LogOutput:              ioutil.Discard,
	}

397 398 399 400
	if os.Getenv("YAMUX_DEBUG") != "" {
		ymxtpt.LogOutput = os.Stderr
	}

Steven Allen's avatar
Steven Allen committed
401
	muxers := map[string]smux.Transport{yamuxID: ymxtpt}
402
	if mplexExp {
Steven Allen's avatar
Steven Allen committed
403
		muxers[mplexID] = mplex.DefaultTransport
404 405 406
	}

	// Allow muxer preference order overriding
Steven Allen's avatar
Steven Allen committed
407
	order := []string{yamuxID, mplexID}
408
	if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
Steven Allen's avatar
Steven Allen committed
409 410 411 412 413 414 415 416 417 418 419 420
		order = strings.Fields(prefs)
	}

	opts := make([]libp2p.Option, 0, len(order))
	for _, id := range order {
		tpt, ok := muxers[id]
		if !ok {
			log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
			continue
		}
		delete(muxers, id)
		opts = append(opts, libp2p.Muxer(id, tpt))
421 422
	}

Steven Allen's avatar
Steven Allen committed
423
	return libp2p.ChainOptions(opts...)
424 425
}

Jeromy's avatar
Jeromy committed
426 427
func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
	if d.MDNS.Enabled {
Jeromy's avatar
Jeromy committed
428
		return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) {
Jeromy's avatar
Jeromy committed
429 430 431
			if d.MDNS.Interval == 0 {
				d.MDNS.Interval = 5
			}
Jeromy's avatar
Jeromy committed
432
			return discovery.NewMdnsService(ctx, h, time.Duration(d.MDNS.Interval)*time.Second, discovery.ServiceTag)
Jeromy's avatar
Jeromy committed
433 434 435 436 437
		}
	}
	return nil
}

438 439
// HandlePeerFound attempts to connect to peer from `PeerInfo`, if it fails
// logs a warning log.
Jeromy's avatar
Jeromy committed
440
func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) {
441
	log.Warning("trying peer info: ", p)
442
	ctx, cancel := context.WithTimeout(n.Context(), discoveryConnTimeout)
rht's avatar
rht committed
443
	defer cancel()
444
	if err := n.PeerHost.Connect(ctx, p); err != nil {
445 446 447 448
		log.Warning("Failed to connect to peer found by discovery: ", err)
	}
}

449 450
// startOnlineServicesWithHost  is the set of services which need to be
// initialized with the host and _before_ we start listening.
451
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption, pubsub bool, ipnsps bool) error {
452
	// setup diagnostics service
Jeromy's avatar
Jeromy committed
453
	n.Ping = ping.NewPingService(host)
454

455 456 457 458 459 460 461 462
	if pubsub || ipnsps {
		service, err := floodsub.NewFloodSub(ctx, host)
		if err != nil {
			return err
		}
		n.Floodsub = service
	}

463
	// setup routing service
464
	r, err := routingOption(ctx, host, n.Repo.Datastore(), n.RecordValidator)
Jeromy's avatar
Jeromy committed
465
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466
		return err
467
	}
Jeromy's avatar
Jeromy committed
468
	n.Routing = r
469

470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
	// TODO: I'm not a fan of type assertions like this but the
	// `RoutingOption` system doesn't currently provide access to the
	// IpfsNode.
	//
	// Ideally, we'd do something like:
	//
	// 1. Add some fancy method to introspect into tiered routers to extract
	//    things like the pubsub router or the DHT (complicated, messy,
	//    probably not worth it).
	// 2. Pass the IpfsNode into the RoutingOption (would also remove the
	//    PSRouter case below.
	// 3. Introduce some kind of service manager? (my personal favorite but
	//    that requires a fair amount of work).
	if dht, ok := r.(*dht.IpfsDHT); ok {
		n.DHT = dht
	}

487 488 489 490 491 492
	if ipnsps {
		n.PSRouter = psrouter.NewPubsubValueStore(
			ctx,
			host,
			n.Routing,
			n.Floodsub,
493
			n.RecordValidator,
494 495 496 497 498 499 500 501 502 503 504 505 506
		)
		n.Routing = rhelpers.Tiered{
			// Always check pubsub first.
			&rhelpers.Compose{
				ValueStore: &rhelpers.LimitedValueStore{
					ValueStore: n.PSRouter,
					Namespaces: []string{"ipns"},
				},
			},
			n.Routing,
		}
	}

507 508 509
	// Wrap standard peer host with routing system to allow unknown peer lookups
	n.PeerHost = rhost.Wrap(host, n.Routing)

510
	// setup exchange service
511
	bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
Łukasz Magiera's avatar
Łukasz Magiera committed
512
	n.Exchange = bitswap.New(ctx, bitswapNetwork, n.Blockstore)
513

514 515 516 517 518
	size, err := n.getCacheSize()
	if err != nil {
		return err
	}

519
	// setup name system
520
	n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
521

Jeromy's avatar
Jeromy committed
522
	// setup ipns republishing
523
	return n.setupIpnsRepublisher()
524 525
}

526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
// getCacheSize returns the IPNS resolve cache size from the repo config.
// A configured size of zero yields 128; a negative size is an error.
func (n *IpfsNode) getCacheSize() (int, error) {
	cfg, err := n.Repo.Config()
	if err != nil {
		return 0, err
	}

	switch size := cfg.Ipns.ResolveCacheSize; {
	case size < 0:
		return 0, fmt.Errorf("cannot specify negative resolve cache size")
	case size == 0:
		return 128, nil
	default:
		return size, nil
	}
}

543
func (n *IpfsNode) setupIpnsRepublisher() error {
Jeromy's avatar
Jeromy committed
544 545 546 547
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}
548

549
	n.IpnsRepub = ipnsrp.NewRepublisher(n.Namesys, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore())
550

Jeromy's avatar
Jeromy committed
551 552 553 554 555 556
	if cfg.Ipns.RepublishPeriod != "" {
		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
		if err != nil {
			return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)
		}

557
		if !u.Debug && (d < time.Minute || d > (time.Hour*24)) {
Jeromy's avatar
Jeromy committed
558 559 560 561 562 563
			return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)
		}

		n.IpnsRepub.Interval = d
	}

564 565 566 567 568 569 570 571 572
	if cfg.Ipns.RecordLifetime != "" {
		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
		if err != nil {
			return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)
		}

		n.IpnsRepub.RecordLifetime = d
	}

Jeromy's avatar
Jeromy committed
573 574
	n.Process().Go(n.IpnsRepub.Run)

575 576 577
	return nil
}

578 579 580 581 582 583 584 585 586 587 588 589
// Process returns the goprocess.Process object held by the node. Other code
// in this file uses it to watch for shutdown (Closing) and to spawn child
// work (Go).
func (n *IpfsNode) Process() goprocess.Process {
	return n.proc
}

// Close calls Close() on the node's Process object and returns its error.
// NOTE(review): presumably closing the process is what triggers teardown —
// confirm where n.proc is constructed.
func (n *IpfsNode) Close() error {
	return n.proc.Close()
}

// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
590 591 592
	if n.ctx == nil {
		n.ctx = context.TODO()
	}
593 594 595
	return n.ctx
}

596 597
// teardown closes owned children. If any errors occur, this function returns
// the first error.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
598
func (n *IpfsNode) teardown() error {
599
	log.Debug("core is shutting down...")
600 601
	// owned objects are closed in this teardown to ensure that they're closed
	// regardless of which constructor was used to add them to the node.
Jeromy's avatar
Jeromy committed
602 603
	var closers []io.Closer

604
	// NOTE: The order that objects are added(closed) matters, if an object
Jeromy's avatar
Jeromy committed
605 606 607 608 609
	// needs to use another during its shutdown/cleanup process, it should be
	// closed before that other object

	if n.FilesRoot != nil {
		closers = append(closers, n.FilesRoot)
Jeromy's avatar
Jeromy committed
610
	}
611

612 613 614 615
	if n.Exchange != nil {
		closers = append(closers, n.Exchange)
	}

616
	if n.Mounts.Ipfs != nil && !n.Mounts.Ipfs.IsActive() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
617 618
		closers = append(closers, mount.Closer(n.Mounts.Ipfs))
	}
619
	if n.Mounts.Ipns != nil && !n.Mounts.Ipns.IsActive() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
620 621 622
		closers = append(closers, mount.Closer(n.Mounts.Ipns))
	}

623 624
	if n.DHT != nil {
		closers = append(closers, n.DHT.Process())
Jeromy's avatar
Jeromy committed
625 626
	}

Jeromy's avatar
Jeromy committed
627 628 629 630
	if n.Blocks != nil {
		closers = append(closers, n.Blocks)
	}

Jeromy's avatar
Jeromy committed
631 632
	if n.Bootstrapper != nil {
		closers = append(closers, n.Bootstrapper)
633 634
	}

Jeromy's avatar
Jeromy committed
635 636
	if n.PeerHost != nil {
		closers = append(closers, n.PeerHost)
637
	}
638

Jeromy's avatar
Jeromy committed
639 640 641
	// Repo closed last, most things need to preserve state here
	closers = append(closers, n.Repo)

642
	var errs []error
643
	for _, closer := range closers {
644 645
		if err := closer.Close(); err != nil {
			errs = append(errs, err)
646 647 648 649
		}
	}
	if len(errs) > 0 {
		return errs[0]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
650 651
	}
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
652 653
}

654
// OnlineMode returns whether or not the IpfsNode is in OnlineMode.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
655
func (n *IpfsNode) OnlineMode() bool {
656
	return n.mode == onlineMode
Brian Tiger Chow's avatar
Brian Tiger Chow committed
657 658
}

659
// SetLocal will set the IpfsNode to local mode
660 661 662 663 664 665 666
func (n *IpfsNode) SetLocal(isLocal bool) {
	if isLocal {
		n.mode = localMode
	}
	n.localModeSet = true
}

667
// LocalMode returns whether or not the IpfsNode is in LocalMode
668 669 670 671 672
func (n *IpfsNode) LocalMode() bool {
	if !n.localModeSet {
		// programmer error should not happen
		panic("local mode not set")
	}
673
	return n.mode == localMode
674 675
}

676
// Bootstrap will set and call the IpfsNodes bootstrap function.
677
func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
678
	// TODO what should return value be when in offlineMode?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
679 680 681 682
	if n.Routing == nil {
		return nil
	}

683 684 685 686 687 688 689
	if n.Bootstrapper != nil {
		n.Bootstrapper.Close() // stop previous bootstrap process.
	}

	// if the caller did not specify a bootstrap peer function, get the
	// freshest bootstrap peers from config. this responds to live changes.
	if cfg.BootstrapPeers == nil {
Jeromy's avatar
Jeromy committed
690
		cfg.BootstrapPeers = func() []pstore.PeerInfo {
691
			ps, err := n.loadBootstrapPeers()
692
			if err != nil {
693
				log.Warning("failed to parse bootstrap peers from config")
694 695 696 697 698 699 700 701 702
				return nil
			}
			return ps
		}
	}

	var err error
	n.Bootstrapper, err = Bootstrap(n, cfg)
	return err
703 704
}

705 706
func (n *IpfsNode) loadID() error {
	if n.Identity != "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
707
		return errors.New("identity already loaded")
708 709
	}

710 711 712 713 714 715
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	cid := cfg.Identity.PeerID
716
	if cid == "" {
717
		return errors.New("identity was not set in config (was 'ipfs init' run?)")
718 719
	}
	if len(cid) == 0 {
720
		return errors.New("no peer ID in config! (was 'ipfs init' run?)")
721 722
	}

Steven Allen's avatar
Steven Allen committed
723 724 725 726 727 728
	id, err := peer.IDB58Decode(cid)
	if err != nil {
		return fmt.Errorf("peer ID invalid: %s", err)
	}

	n.Identity = id
729 730
	return nil
}
731

732
// GetKey will return a key from the Keystore with name `name`.
733 734 735 736 737 738 739 740
func (n *IpfsNode) GetKey(name string) (ic.PrivKey, error) {
	if name == "self" {
		return n.PrivateKey, nil
	} else {
		return n.Repo.Keystore().Get(name)
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
741
func (n *IpfsNode) LoadPrivateKey() error {
742
	if n.Identity == "" || n.Peerstore == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
743
		return errors.New("loaded private key out of order")
744 745
	}

746
	if n.PrivateKey != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
747
		return errors.New("private key already loaded")
748 749
	}

750 751 752 753 754 755
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	sk, err := loadPrivateKey(&cfg.Identity, n.Identity)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
756
	if err != nil {
757
		return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
758
	}
759

760 761
	n.PrivateKey = sk
	n.Peerstore.AddPrivKey(n.Identity, n.PrivateKey)
Jeromy's avatar
Jeromy committed
762 763 764 765
	n.Peerstore.AddPubKey(n.Identity, sk.GetPublic())
	return nil
}

Jeromy's avatar
Jeromy committed
766
func (n *IpfsNode) loadBootstrapPeers() ([]pstore.PeerInfo, error) {
767 768 769 770 771 772
	cfg, err := n.Repo.Config()
	if err != nil {
		return nil, err
	}

	parsed, err := cfg.BootstrapPeers()
773 774 775 776 777 778
	if err != nil {
		return nil, err
	}
	return toPeerInfos(parsed), nil
}

Jeromy's avatar
Jeromy committed
779
func (n *IpfsNode) loadFilesRoot() error {
Jeromy's avatar
Jeromy committed
780
	dsk := ds.NewKey("/local/filesroot")
Jeromy's avatar
Jeromy committed
781 782
	pf := func(ctx context.Context, c *cid.Cid) error {
		return n.Repo.Datastore().Put(dsk, c.Bytes())
Jeromy's avatar
Jeromy committed
783 784
	}

785
	var nd *merkledag.ProtoNode
Jeromy's avatar
Jeromy committed
786 787 788 789
	val, err := n.Repo.Datastore().Get(dsk)

	switch {
	case err == ds.ErrNotFound || val == nil:
790
		nd = ft.EmptyDirNode()
791
		err := n.DAG.Add(n.Context(), nd)
Jeromy's avatar
Jeromy committed
792 793 794 795
		if err != nil {
			return fmt.Errorf("failure writing to dagstore: %s", err)
		}
	case err == nil:
Jeromy's avatar
Jeromy committed
796 797 798 799 800
		c, err := cid.Cast(val.([]byte))
		if err != nil {
			return err
		}

801
		rnd, err := n.DAG.Get(n.Context(), c)
Jeromy's avatar
Jeromy committed
802 803 804
		if err != nil {
			return fmt.Errorf("error loading filesroot from DAG: %s", err)
		}
805 806 807 808 809 810 811

		pbnd, ok := rnd.(*merkledag.ProtoNode)
		if !ok {
			return merkledag.ErrNotProtobuf
		}

		nd = pbnd
Jeromy's avatar
Jeromy committed
812 813 814 815 816 817 818 819 820 821 822 823 824
	default:
		return err
	}

	mr, err := mfs.NewRoot(n.Context(), n.DAG, nd, pf)
	if err != nil {
		return err
	}

	n.FilesRoot = mr
	return nil
}

825 826
// SetupOfflineRouting instantiates a routing system in offline mode. This is
// primarily used for offline ipns modifications.
Jeromy's avatar
Jeromy committed
827
func (n *IpfsNode) SetupOfflineRouting() error {
828 829 830 831
	if n.Routing != nil {
		// Routing was already set up
		return nil
	}
832 833

	// TODO: move this somewhere else.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
834
	err := n.LoadPrivateKey()
Jeromy's avatar
Jeromy committed
835 836 837 838
	if err != nil {
		return err
	}

839
	n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.RecordValidator)
840

841 842 843 844 845 846
	size, err := n.getCacheSize()
	if err != nil {
		return err
	}

	n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
847

848
	return nil
849 850 851 852
}

func loadPrivateKey(cfg *config.Identity, id peer.ID) (ic.PrivKey, error) {
	sk, err := cfg.DecodePrivateKey("passphrase todo!")
853 854 855
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
856

857 858 859 860
	id2, err := peer.IDFromPrivateKey(sk)
	if err != nil {
		return nil, err
	}
861

862 863
	if id2 != id {
		return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
864 865
	}

866
	return sk, nil
867
}
868

869
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
870 871 872
	var listen []ma.Multiaddr
	for _, addr := range cfg.Addresses.Swarm {
		maddr, err := ma.NewMultiaddr(addr)
873
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
874
			return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
875
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
876
		listen = append(listen, maddr)
877 878 879 880
	}

	return listen, nil
}
881

Kevin Atkinson's avatar
Kevin Atkinson committed
882
type ConstructPeerHostOpts struct {
883
	AddrsFactory      p2pbhost.AddrsFactory
vyzo's avatar
vyzo committed
884 885 886
	DisableNatPortMap bool
	DisableRelay      bool
	EnableRelayHop    bool
Jeromy's avatar
Jeromy committed
887
	ConnectionManager ifconnmgr.ConnManager
Kevin Atkinson's avatar
Kevin Atkinson committed
888 889
}

Steven Allen's avatar
Steven Allen committed
890
// HostOption is the signature of a constructor that builds a p2phost.Host
// from a context, a peer ID, a peerstore and optional libp2p options.
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error)
Jeromy's avatar
Jeromy committed
891 892 893

// DefaultHostOption is the HostOption backed by constructPeerHost.
var DefaultHostOption HostOption = constructPeerHost

894
// isolates the complex initialization steps
Steven Allen's avatar
Steven Allen committed
895 896 897 898
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
	pkey := ps.PrivKey(id)
	if pkey == nil {
		return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
899
	}
Steven Allen's avatar
Steven Allen committed
900 901
	options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
	return libp2p.New(ctx, options...)
902 903
}

904 905 906 907 908 909 910 911 912 913 914 915
// filterRelayAddrs returns, in order, the entries of addrs for which looking
// up a value for circuit.P_CIRCUIT fails; entries where that lookup succeeds
// are dropped. The result is nil when nothing is kept.
func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
	var kept []ma.Multiaddr
	for i := range addrs {
		if _, err := addrs[i].ValueForProtocol(circuit.P_CIRCUIT); err != nil {
			kept = append(kept, addrs[i])
		}
	}
	return kept
}

916 917 918 919 920 921
// composeAddrsFactory returns an address factory that applies g first and
// then feeds g's result to f.
func composeAddrsFactory(f, g p2pbhost.AddrsFactory) p2pbhost.AddrsFactory {
	composed := func(addrs []ma.Multiaddr) []ma.Multiaddr {
		inner := g(addrs)
		return f(inner)
	}
	return composed
}

922
// startListening on the network addresses
Łukasz Magiera's avatar
Łukasz Magiera committed
923
func startListening(host p2phost.Host, cfg *config.Config) error {
924 925
	listenAddrs, err := listenAddresses(cfg)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
926
		return err
927 928 929
	}

	// Actually start listening:
Steven Allen's avatar
Steven Allen committed
930
	if err := host.Network().Listen(listenAddrs...); err != nil {
931
		return err
932 933
	}

934
	// list out our addresses
935
	addrs, err := host.Network().InterfaceListenAddresses()
936
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
937
		return err
938
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
939
	log.Infof("Swarm listening at: %s", addrs)
940
	return nil
941
}
942

943 944 945 946 947 948
// constructDHTRouting creates a DHT with dht.New on the given host, passing
// dstore as the datastore option and validator as the validator option.
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	dstoreOpt := dhtopts.Datastore(dstore)
	validatorOpt := dhtopts.Validator(validator)
	return dht.New(ctx, host, dstoreOpt, validatorOpt)
}
Jeromy's avatar
Jeromy committed
950

951 952 953 954 955 956 957
func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	return dht.New(
		ctx, host,
		dhtopts.Client(true),
		dhtopts.Datastore(dstore),
		dhtopts.Validator(validator),
	)
Jeromy's avatar
Jeromy committed
958 959
}

960
// RoutingOption is the signature of a constructor that builds a
// routing.IpfsRouting from a context, a host, a batching datastore and a
// record validator.
type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error)
Jeromy's avatar
Jeromy committed
961

Jeromy's avatar
Jeromy committed
962
// DiscoveryOption is the signature of a constructor that builds a
// discovery.Service from a context and a host.
type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error)
Jeromy's avatar
Jeromy committed
963

964
// DHTOption is the RoutingOption backed by constructDHTRouting.
var DHTOption RoutingOption = constructDHTRouting
Jeromy's avatar
Jeromy committed
965
// DHTClientOption is the RoutingOption backed by constructClientDHTRouting.
var DHTClientOption RoutingOption = constructClientDHTRouting
966
// NilRouterOption is the RoutingOption backed by nilrouting.ConstructNilRouting.
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting