core.go 26.6 KB
Newer Older
1 2 3 4
/*
Package core implements the IpfsNode object and related methods.

Packages underneath core/ provide a (relatively) stable, low-level API
to carry out most IPFS-related tasks.  For more details on the other
interfaces and how core/... fits into the bigger IPFS picture, see:

  $ godoc github.com/ipfs/go-ipfs
*/
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10 11
package core

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
import (
Jakub Sztandera's avatar
Jakub Sztandera committed
13
	"bytes"
14
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
	"errors"
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
16
	"fmt"
17
	"io"
18 19 20
	"io/ioutil"
	"os"
	"strings"
21
	"time"
22

23
	rp "github.com/ipfs/go-ipfs/exchange/reprovide"
24
	filestore "github.com/ipfs/go-ipfs/filestore"
25 26 27 28
	mount "github.com/ipfs/go-ipfs/fuse/mount"
	mfs "github.com/ipfs/go-ipfs/mfs"
	namesys "github.com/ipfs/go-ipfs/namesys"
	ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
Łukasz Magiera's avatar
Łukasz Magiera committed
29
	p2p "github.com/ipfs/go-ipfs/p2p"
30 31 32
	pin "github.com/ipfs/go-ipfs/pin"
	repo "github.com/ipfs/go-ipfs/repo"
	config "github.com/ipfs/go-ipfs/repo/config"
Steven Allen's avatar
Steven Allen committed
33 34 35 36 37 38
	bserv "gx/ipfs/QmPkMDBc7pSitAf2uixsNyZ53uheBjcwFTGLtXKpgdNcP4/go-blockservice"
	ft "gx/ipfs/QmVxjT67BU1QZUPzSLNZT6DkDzVNfPfkzqNyJYFXxSH2hA/go-unixfs"
	"gx/ipfs/QmY2QaawxgJw2rn7WsFNkWEYph3z2azpyYdrhAc1JctDmE/go-path/resolver"
	bitswap "gx/ipfs/QmaYxvpkdEd5anuSzWiivJdeZV1W1NYW52xJQB1khSqH1b/go-bitswap"
	bsnet "gx/ipfs/QmaYxvpkdEd5anuSzWiivJdeZV1W1NYW52xJQB1khSqH1b/go-bitswap/network"
	merkledag "gx/ipfs/QmfKKGzisaoP4oiHQSHz1zLbXDCTeXe7NVfX1FAMKzcHmt/go-merkledag"
Jeromy's avatar
Jeromy committed
39

40
	u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util"
Steven Allen's avatar
Steven Allen committed
41
	rhelpers "gx/ipfs/QmQpvpeXa8rBfDmt3bdh2ckw2867vsYN1ozf79X7U5rij9/go-libp2p-routing-helpers"
42
	pnet "gx/ipfs/QmRGvSwDpN4eunxgDNfmQhayZ6Z9F5a2v31V2D7y77osLg/go-libp2p-pnet"
Steven Allen's avatar
Steven Allen committed
43
	psrouter "gx/ipfs/QmS3QZaaGMLwYQD1cWaGnM4san4zzT7z31tmhuXJMZ91dh/go-libp2p-pubsub-router"
44
	goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
Jeromy's avatar
Jeromy committed
45
	mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
Steven Allen's avatar
Steven Allen committed
46
	mafilter "gx/ipfs/QmSW4uNHbvQia8iZDXzbwjiyHQtnyo9aFqfQAMasj3TJ6Y/go-maddr-filter"
Steven Allen's avatar
Steven Allen committed
47
	bstore "gx/ipfs/QmTCHqj6s51pDu1GaPGyBW2VdmCUvtzLCF6nWykfX9ZYRt/go-ipfs-blockstore"
Steven Allen's avatar
Steven Allen committed
48 49
	dht "gx/ipfs/QmTktQYCKzQjhxF6dk5xJPRuhHn3JBiKGvMLoiDy1mYmxC/go-libp2p-kad-dht"
	dhtopts "gx/ipfs/QmTktQYCKzQjhxF6dk5xJPRuhHn3JBiKGvMLoiDy1mYmxC/go-libp2p-kad-dht/opts"
Steven Allen's avatar
Steven Allen committed
50
	record "gx/ipfs/QmVsp2KdPYE6M8ryzCk5KHLo3zprcY5hBDaYx6uPCFUdxA/go-libp2p-record"
Steven Allen's avatar
Steven Allen committed
51
	floodsub "gx/ipfs/QmXScvRbYh9X9okLuX9YMnz1HR4WgRTU2hocjBs15nmCNG/go-libp2p-floodsub"
Steven Allen's avatar
Steven Allen committed
52
	ifconnmgr "gx/ipfs/QmXuucFcuvAWYAJfhHV2h4BYreHEAsLSsiquosiXeuduTN/go-libp2p-interface-connmgr"
Steven Allen's avatar
Steven Allen committed
53 54 55 56 57 58
	libp2p "gx/ipfs/QmY51bqSM5XgxQZqsBrQcRkKTnCb8EKpJpR9K6Qax7Njco/go-libp2p"
	discovery "gx/ipfs/QmY51bqSM5XgxQZqsBrQcRkKTnCb8EKpJpR9K6Qax7Njco/go-libp2p/p2p/discovery"
	p2pbhost "gx/ipfs/QmY51bqSM5XgxQZqsBrQcRkKTnCb8EKpJpR9K6Qax7Njco/go-libp2p/p2p/host/basic"
	rhost "gx/ipfs/QmY51bqSM5XgxQZqsBrQcRkKTnCb8EKpJpR9K6Qax7Njco/go-libp2p/p2p/host/routed"
	identify "gx/ipfs/QmY51bqSM5XgxQZqsBrQcRkKTnCb8EKpJpR9K6Qax7Njco/go-libp2p/p2p/protocol/identify"
	ping "gx/ipfs/QmY51bqSM5XgxQZqsBrQcRkKTnCb8EKpJpR9K6Qax7Njco/go-libp2p/p2p/protocol/ping"
Steven Allen's avatar
Steven Allen committed
59
	smux "gx/ipfs/QmY9JXR3FupnYAYJWK9aMr9bCpqWKcToQ1tz8DVGTrHpHw/go-stream-muxer"
Steven Allen's avatar
Steven Allen committed
60
	connmgr "gx/ipfs/QmYAL9JsqVVPFWwM1ZzHNsofmTzRYQHJ2KqQaBmFJjJsNx/go-libp2p-connmgr"
Steven Allen's avatar
Steven Allen committed
61
	cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
Steven Allen's avatar
Steven Allen committed
62
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
Steven Allen's avatar
Steven Allen committed
63
	routing "gx/ipfs/QmZ383TySJVeZWzGnWui6pRcKyYZk9VkKTuW7tmKRWk5au/go-libp2p-routing"
Steven Allen's avatar
Steven Allen committed
64 65
	mplex "gx/ipfs/QmZHiqdRuNXujvSPNu1ZWxxzV6a2WhoZpfYkesdgyaKF9f/go-smux-multiplex"
	pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
66
	ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
Steven Allen's avatar
Steven Allen committed
67
	p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
Steven Allen's avatar
Steven Allen committed
68 69 70
	nilrouting "gx/ipfs/QmbFRJeEmEU16y3BmKKaD4a9fm5oHsEAMHe2vSB1UnfLMi/go-ipfs-routing/none"
	offroute "gx/ipfs/QmbFRJeEmEU16y3BmKKaD4a9fm5oHsEAMHe2vSB1UnfLMi/go-ipfs-routing/offline"
	exchange "gx/ipfs/Qmc2faLf7URkHpsbfYM4EMbr8iSAcGAe8VPgVi64HVnwji/go-ipfs-exchange-interface"
Steven Allen's avatar
Steven Allen committed
71
	circuit "gx/ipfs/QmcQ56iqKP8ZRhRGLe5EReJVvrJZDaGzkuatrPv4Z1B6cG/go-libp2p-circuit"
Steven Allen's avatar
Steven Allen committed
72 73
	logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
	metrics "gx/ipfs/QmcoBbyTiL9PFjo1GFixJwqQ8mZLJ36CribuqyKmS1okPu/go-libp2p-metrics"
Steven Allen's avatar
Steven Allen committed
74
	yamux "gx/ipfs/QmcsgrV3nCAKjiHKZhKVXWc4oY3WBECJCqahXEMpHeMrev/go-smux-yamux"
Steven Allen's avatar
Steven Allen committed
75
	peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
76 77
	ic "gx/ipfs/Qme1knMqwt1hKZbc1BmQFmnm9f36nyQGwXxPGVpVJ9rMK5/go-libp2p-crypto"
	ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79
)

Jeromy's avatar
Jeromy committed
80
const (
	// IpnsValidatorTag is the tag string ("ipns") for the IPNS validator.
	// NOTE(review): it is not referenced in this part of the file — confirm
	// its consumers before changing the value.
	IpnsValidatorTag = "ipns"

	// kReprovideFrequency is the reprovide interval used when the config
	// leaves Reprovider.Interval empty.
	kReprovideFrequency = time.Hour * 12

	// discoveryConnTimeout bounds each connection attempt made to a peer
	// reported by local discovery (see HandlePeerFound).
	discoveryConnTimeout = time.Second * 30
)
Jeromy's avatar
Jeromy committed
84

Jeromy's avatar
Jeromy committed
85
// log is the package-level logger, created with the name "core".
var log = logging.Logger("core")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86

87 88 89 90
// mode identifies the networking mode an IpfsNode operates in.
type mode int

const (
	// localMode is the first enumerator and therefore equals the zero value
	// of mode: a mode field that was never assigned compares equal to
	// localMode. IpfsNode.LocalMode guards against reading it before
	// SetLocal has been called by checking the localModeSet flag.
	localMode mode = iota
	offlineMode
	onlineMode
)

96 97 98 99
func init() {
	identify.ClientVersion = "go-ipfs/" + config.CurrentVersionNumber + "/" + config.CurrentCommit
}

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
100
// IpfsNode is IPFS Core module. It represents an IPFS instance.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101 102
type IpfsNode struct {

103
	// Self
104
	Identity peer.ID // the local node's identity
105

106
	Repo repo.Repo
107 108

	// Local node
109 110 111 112
	Pinning         pin.Pinner // the pinning manager
	Mounts          Mounts     // current mount state, if any.
	PrivateKey      ic.PrivKey // the local node's private Key
	PNetFingerprint []byte     // fingerprint of private network
113 114

	// Services
115 116 117 118 119 120 121 122 123 124 125 126
	Peerstore       pstore.Peerstore     // storage for other Peer instances
	Blockstore      bstore.GCBlockstore  // the block store (lower level)
	Filestore       *filestore.Filestore // the filestore blockstore
	BaseBlocks      bstore.Blockstore    // the raw blockstore, no filestore wrapping
	GCLocker        bstore.GCLocker      // the locker used to protect the blockstore during gc
	Blocks          bserv.BlockService   // the block service, get/add blocks.
	DAG             ipld.DAGService      // the merkle dag service, get/add objects.
	Resolver        *resolver.Resolver   // the path resolution system
	Reporter        metrics.Reporter
	Discovery       discovery.Service
	FilesRoot       *mfs.Root
	RecordValidator record.Validator
127 128

	// Online
129 130 131 132 133
	PeerHost     p2phost.Host        // the network host (server+client)
	Bootstrapper io.Closer           // the periodic bootstrapper
	Routing      routing.IpfsRouting // the routing system. recommend ipfs-dht
	Exchange     exchange.Interface  // the block exchange + strategy (bitswap)
	Namesys      namesys.NameSystem  // the name system, resolves paths to hashes
Jeromy's avatar
Jeromy committed
134 135
	Ping         *ping.PingService
	Reprovider   *rp.Reprovider // the value reprovider system
Jeromy's avatar
Jeromy committed
136
	IpnsRepub    *ipnsrp.Republisher
137

Jeromy's avatar
Jeromy committed
138
	Floodsub *floodsub.PubSub
139
	PSRouter *psrouter.PubsubValueStore
140
	DHT      *dht.IpfsDHT
Łukasz Magiera's avatar
Łukasz Magiera committed
141
	P2P      *p2p.P2P
Jeromy's avatar
Jeromy committed
142

143 144
	proc goprocess.Process
	ctx  context.Context
145

Jeromy's avatar
Jeromy committed
146
	mode         mode
147
	localModeSet bool
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148 149
}

150 151 152 153 154 155 156 157
// Mounts holds the node's mount state: one mount for /ipfs and one for
// /ipns. It arguably belongs in the daemon or mount package; it lives here
// because it must be reachable across daemon requests.
type Mounts struct {
	Ipfs, Ipns mount.Mount
}

vyzo's avatar
vyzo committed
158
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error {
159
	if n.PeerHost != nil { // already online.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
		return errors.New("node already online")
161 162 163
	}

	// load private key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164
	if err := n.LoadPrivateKey(); err != nil {
165 166 167
		return err
	}

Jeromy's avatar
Jeromy committed
168
	// get undialable addrs from config
169 170 171 172
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
173 174

	var libp2pOpts []libp2p.Option
175
	for _, s := range cfg.Swarm.AddrFilters {
Jeromy's avatar
Jeromy committed
176 177
		f, err := mamask.NewMask(s)
		if err != nil {
178
			return fmt.Errorf("incorrectly formatted address filter in config: %s", s)
Jeromy's avatar
Jeromy committed
179
		}
Steven Allen's avatar
Steven Allen committed
180
		libp2pOpts = append(libp2pOpts, libp2p.FilterAddresses(f))
Jeromy's avatar
Jeromy committed
181 182
	}

183 184 185
	if !cfg.Swarm.DisableBandwidthMetrics {
		// Set reporter
		n.Reporter = metrics.NewBandwidthCounter()
Steven Allen's avatar
Steven Allen committed
186
		libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(n.Reporter))
187 188
	}

Jakub Sztandera's avatar
Jakub Sztandera committed
189 190 191 192 193 194
	swarmkey, err := n.Repo.SwarmKey()
	if err != nil {
		return err
	}

	if swarmkey != nil {
Steven Allen's avatar
Steven Allen committed
195
		protec, err := pnet.NewProtector(bytes.NewReader(swarmkey))
Jakub Sztandera's avatar
Jakub Sztandera committed
196
		if err != nil {
197
			return fmt.Errorf("failed to configure private network: %s", err)
Jakub Sztandera's avatar
Jakub Sztandera committed
198
		}
199
		n.PNetFingerprint = protec.Fingerprint()
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
		go func() {
			t := time.NewTicker(30 * time.Second)
			<-t.C // swallow one tick
			for {
				select {
				case <-t.C:
					if ph := n.PeerHost; ph != nil {
						if len(ph.Network().Peers()) == 0 {
							log.Warning("We are in private network and have no peers.")
							log.Warning("This might be configuration mistake.")
						}
					}
				case <-n.Process().Closing():
					t.Stop()
					return
				}
			}
		}()
Steven Allen's avatar
Steven Allen committed
218 219

		libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(protec))
Jakub Sztandera's avatar
Jakub Sztandera committed
220 221
	}

222 223 224 225
	addrsFactory, err := makeAddrsFactory(cfg.Addresses)
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
226 227 228 229
	if !cfg.Swarm.DisableRelay {
		addrsFactory = composeAddrsFactory(addrsFactory, filterRelayAddrs)
	}
	libp2pOpts = append(libp2pOpts, libp2p.AddrsFactory(addrsFactory))
230

Steven Allen's avatar
Steven Allen committed
231
	connm, err := constructConnMgr(cfg.Swarm.ConnMgr)
Jeromy's avatar
Jeromy committed
232 233 234
	if err != nil {
		return err
	}
Steven Allen's avatar
Steven Allen committed
235 236 237
	libp2pOpts = append(libp2pOpts, libp2p.ConnectionManager(connm))

	libp2pOpts = append(libp2pOpts, makeSmuxTransportOption(mplex))
Jeromy's avatar
Jeromy committed
238

Steven Allen's avatar
Steven Allen committed
239 240
	if !cfg.Swarm.DisableNatPortMap {
		libp2pOpts = append(libp2pOpts, libp2p.NATPortMap())
241
	}
Steven Allen's avatar
Steven Allen committed
242 243 244 245 246 247 248 249 250
	if !cfg.Swarm.DisableRelay {
		var opts []circuit.RelayOpt
		if cfg.Swarm.EnableRelayHop {
			opts = append(opts, circuit.OptHop)
		}
		libp2pOpts = append(libp2pOpts, libp2p.EnableRelay(opts...))
	}

	peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, libp2pOpts...)
vyzo's avatar
vyzo committed
251

252
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253
		return err
254 255
	}

256
	if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption, pubsub, ipnsps); err != nil {
257
		return err
258 259 260
	}

	// Ok, now we're ready to listen.
Łukasz Magiera's avatar
Łukasz Magiera committed
261
	if err := startListening(n.PeerHost, cfg); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262
		return err
263
	}
264

Łukasz Magiera's avatar
Łukasz Magiera committed
265
	n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)
266

267
	// setup local discovery
Jeromy's avatar
Jeromy committed
268
	if do != nil {
Jeromy's avatar
Jeromy committed
269
		service, err := do(ctx, n.PeerHost)
Jeromy's avatar
Jeromy committed
270
		if err != nil {
Jeromy's avatar
Jeromy committed
271 272 273 274
			log.Error("mdns error: ", err)
		} else {
			service.RegisterNotifee(n)
			n.Discovery = service
Jeromy's avatar
Jeromy committed
275
		}
276 277
	}

278
	return n.Bootstrap(DefaultBootstrapConfig)
279 280
}

Jeromy's avatar
Jeromy committed
281 282
func constructConnMgr(cfg config.ConnMgr) (ifconnmgr.ConnManager, error) {
	switch cfg.Type {
283 284 285 286
	case "":
		// 'default' value is the basic connection manager
		return connmgr.NewConnManager(config.DefaultConnMgrLowWater, config.DefaultConnMgrHighWater, config.DefaultConnMgrGracePeriod), nil
	case "none":
Jeromy's avatar
Jeromy committed
287 288 289 290 291 292 293 294 295 296 297 298 299
		return nil, nil
	case "basic":
		grace, err := time.ParseDuration(cfg.GracePeriod)
		if err != nil {
			return nil, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)
		}

		return connmgr.NewConnManager(cfg.LowWater, cfg.HighWater, grace), nil
	default:
		return nil, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Type)
	}
}

Łukasz Magiera's avatar
Łukasz Magiera committed
300 301 302 303 304 305
func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

306
	var keyProvider rp.KeyChanFunc
Łukasz Magiera's avatar
Łukasz Magiera committed
307 308 309 310 311 312 313 314 315 316 317

	switch cfg.Reprovider.Strategy {
	case "all":
		fallthrough
	case "":
		keyProvider = rp.NewBlockstoreProvider(n.Blockstore)
	case "roots":
		keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true)
	case "pinned":
		keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
	default:
318
		return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
Łukasz Magiera's avatar
Łukasz Magiera committed
319
	}
320
	n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)
Łukasz Magiera's avatar
Łukasz Magiera committed
321

322 323 324 325 326
	reproviderInterval := kReprovideFrequency
	if cfg.Reprovider.Interval != "" {
		dur, err := time.ParseDuration(cfg.Reprovider.Interval)
		if err != nil {
			return err
Łukasz Magiera's avatar
Łukasz Magiera committed
327 328
		}

329
		reproviderInterval = dur
Łukasz Magiera's avatar
Łukasz Magiera committed
330 331
	}

332 333
	go n.Reprovider.Run(reproviderInterval)

Łukasz Magiera's avatar
Łukasz Magiera committed
334 335 336
	return nil
}

337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
// makeAddrsFactory turns the Addresses config into an address factory.
//
// cfg.Announce entries must be valid multiaddrs. Each cfg.NoAnnounce entry is
// first tried as an address mask; if that fails it must be a valid multiaddr
// and is matched exactly. The returned factory starts from the Announce list
// when it is non-empty (otherwise from the addresses it is given) and drops
// every address that matches a NoAnnounce entry.
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
	var announce []ma.Multiaddr
	for _, s := range cfg.Announce {
		parsed, err := ma.NewMultiaddr(s)
		if err != nil {
			return nil, err
		}
		announce = append(announce, parsed)
	}

	maskFilters := mafilter.NewFilters()
	exact := map[string]bool{}
	for _, s := range cfg.NoAnnounce {
		if mask, err := mamask.NewMask(s); err == nil {
			maskFilters.AddDialFilter(mask)
			continue
		}
		parsed, err := ma.NewMultiaddr(s)
		if err != nil {
			return nil, err
		}
		exact[parsed.String()] = true
	}

	factory := func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
		candidates := allAddrs
		if len(announce) > 0 {
			candidates = announce
		}

		var kept []ma.Multiaddr
		for _, a := range candidates {
			// exact matches first, then /ipcidr matches
			if exact[a.String()] || maskFilters.AddrBlocked(a) {
				continue
			}
			kept = append(kept, a)
		}
		return kept
	}
	return factory, nil
}

Steven Allen's avatar
Steven Allen committed
383 384 385
func makeSmuxTransportOption(mplexExp bool) libp2p.Option {
	const yamuxID = "/yamux/1.0.0"
	const mplexID = "/mplex/6.7.0"
386 387

	ymxtpt := &yamux.Transport{
Steven Allen's avatar
Steven Allen committed
388
		AcceptBacklog:          512,
389 390 391 392 393 394 395
		ConnectionWriteTimeout: time.Second * 10,
		KeepAliveInterval:      time.Second * 30,
		EnableKeepAlive:        true,
		MaxStreamWindowSize:    uint32(1024 * 512),
		LogOutput:              ioutil.Discard,
	}

396 397 398 399
	if os.Getenv("YAMUX_DEBUG") != "" {
		ymxtpt.LogOutput = os.Stderr
	}

Steven Allen's avatar
Steven Allen committed
400
	muxers := map[string]smux.Transport{yamuxID: ymxtpt}
401
	if mplexExp {
Steven Allen's avatar
Steven Allen committed
402
		muxers[mplexID] = mplex.DefaultTransport
403 404 405
	}

	// Allow muxer preference order overriding
Steven Allen's avatar
Steven Allen committed
406
	order := []string{yamuxID, mplexID}
407
	if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
Steven Allen's avatar
Steven Allen committed
408 409 410 411 412 413 414 415 416 417 418 419
		order = strings.Fields(prefs)
	}

	opts := make([]libp2p.Option, 0, len(order))
	for _, id := range order {
		tpt, ok := muxers[id]
		if !ok {
			log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id)
			continue
		}
		delete(muxers, id)
		opts = append(opts, libp2p.Muxer(id, tpt))
420 421
	}

Steven Allen's avatar
Steven Allen committed
422
	return libp2p.ChainOptions(opts...)
423 424
}

Jeromy's avatar
Jeromy committed
425 426
func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
	if d.MDNS.Enabled {
Jeromy's avatar
Jeromy committed
427
		return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) {
Jeromy's avatar
Jeromy committed
428 429 430
			if d.MDNS.Interval == 0 {
				d.MDNS.Interval = 5
			}
Jeromy's avatar
Jeromy committed
431
			return discovery.NewMdnsService(ctx, h, time.Duration(d.MDNS.Interval)*time.Second, discovery.ServiceTag)
Jeromy's avatar
Jeromy committed
432 433 434 435 436
		}
	}
	return nil
}

437 438
// HandlePeerFound attempts to connect to peer from `PeerInfo`, if it fails
// logs a warning log.
Jeromy's avatar
Jeromy committed
439
func (n *IpfsNode) HandlePeerFound(p pstore.PeerInfo) {
440
	log.Warning("trying peer info: ", p)
441
	ctx, cancel := context.WithTimeout(n.Context(), discoveryConnTimeout)
rht's avatar
rht committed
442
	defer cancel()
443
	if err := n.PeerHost.Connect(ctx, p); err != nil {
444 445 446 447
		log.Warning("Failed to connect to peer found by discovery: ", err)
	}
}

448 449
// startOnlineServicesWithHost  is the set of services which need to be
// initialized with the host and _before_ we start listening.
450
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption, pubsub bool, ipnsps bool) error {
451
	// setup diagnostics service
Jeromy's avatar
Jeromy committed
452
	n.Ping = ping.NewPingService(host)
453

454 455 456 457 458 459 460 461
	if pubsub || ipnsps {
		service, err := floodsub.NewFloodSub(ctx, host)
		if err != nil {
			return err
		}
		n.Floodsub = service
	}

462
	// setup routing service
463
	r, err := routingOption(ctx, host, n.Repo.Datastore(), n.RecordValidator)
Jeromy's avatar
Jeromy committed
464
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
465
		return err
466
	}
Jeromy's avatar
Jeromy committed
467
	n.Routing = r
468

469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485
	// TODO: I'm not a fan of type assertions like this but the
	// `RoutingOption` system doesn't currently provide access to the
	// IpfsNode.
	//
	// Ideally, we'd do something like:
	//
	// 1. Add some fancy method to introspect into tiered routers to extract
	//    things like the pubsub router or the DHT (complicated, messy,
	//    probably not worth it).
	// 2. Pass the IpfsNode into the RoutingOption (would also remove the
	//    PSRouter case below.
	// 3. Introduce some kind of service manager? (my personal favorite but
	//    that requires a fair amount of work).
	if dht, ok := r.(*dht.IpfsDHT); ok {
		n.DHT = dht
	}

486 487 488 489 490 491
	if ipnsps {
		n.PSRouter = psrouter.NewPubsubValueStore(
			ctx,
			host,
			n.Routing,
			n.Floodsub,
492
			n.RecordValidator,
493 494 495 496 497 498 499 500 501 502 503 504 505
		)
		n.Routing = rhelpers.Tiered{
			// Always check pubsub first.
			&rhelpers.Compose{
				ValueStore: &rhelpers.LimitedValueStore{
					ValueStore: n.PSRouter,
					Namespaces: []string{"ipns"},
				},
			},
			n.Routing,
		}
	}

506 507 508
	// Wrap standard peer host with routing system to allow unknown peer lookups
	n.PeerHost = rhost.Wrap(host, n.Routing)

509
	// setup exchange service
510
	bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
Łukasz Magiera's avatar
Łukasz Magiera committed
511
	n.Exchange = bitswap.New(ctx, bitswapNetwork, n.Blockstore)
512

513 514 515 516 517
	size, err := n.getCacheSize()
	if err != nil {
		return err
	}

518
	// setup name system
519
	n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
520

Jeromy's avatar
Jeromy committed
521
	// setup ipns republishing
522
	return n.setupIpnsRepublisher()
523 524
}

525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
// getCacheSize returns the IPNS resolve cache size from the config. A
// configured size of zero selects the default of 128; a negative size is an
// error.
func (n *IpfsNode) getCacheSize() (int, error) {
	cfg, err := n.Repo.Config()
	if err != nil {
		return 0, err
	}

	switch size := cfg.Ipns.ResolveCacheSize; {
	case size == 0:
		return 128, nil
	case size < 0:
		return 0, fmt.Errorf("cannot specify negative resolve cache size")
	default:
		return size, nil
	}
}

542
func (n *IpfsNode) setupIpnsRepublisher() error {
Jeromy's avatar
Jeromy committed
543 544 545 546
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}
547

548
	n.IpnsRepub = ipnsrp.NewRepublisher(n.Namesys, n.Repo.Datastore(), n.PrivateKey, n.Repo.Keystore())
549

Jeromy's avatar
Jeromy committed
550 551 552 553 554 555
	if cfg.Ipns.RepublishPeriod != "" {
		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
		if err != nil {
			return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)
		}

556
		if !u.Debug && (d < time.Minute || d > (time.Hour*24)) {
Jeromy's avatar
Jeromy committed
557 558 559 560 561 562
			return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)
		}

		n.IpnsRepub.Interval = d
	}

563 564 565 566 567 568 569 570 571
	if cfg.Ipns.RecordLifetime != "" {
		d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod)
		if err != nil {
			return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)
		}

		n.IpnsRepub.RecordLifetime = d
	}

Jeromy's avatar
Jeromy committed
572 573
	n.Process().Go(n.IpnsRepub.Run)

574 575 576
	return nil
}

577 578 579 580 581 582 583 584 585 586 587 588
// Process returns the goprocess.Process held by the node (its proc field).
func (n *IpfsNode) Process() goprocess.Process {
	return n.proc
}

// Close calls Close() on the node's Process object and returns its error.
func (n *IpfsNode) Close() error {
	return n.proc.Close()
}

// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
589 590 591
	if n.ctx == nil {
		n.ctx = context.TODO()
	}
592 593 594
	return n.ctx
}

595 596
// teardown closes owned children. If any errors occur, this function returns
// the first error.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
597
func (n *IpfsNode) teardown() error {
598
	log.Debug("core is shutting down...")
599 600
	// owned objects are closed in this teardown to ensure that they're closed
	// regardless of which constructor was used to add them to the node.
Jeromy's avatar
Jeromy committed
601 602
	var closers []io.Closer

603
	// NOTE: The order that objects are added(closed) matters, if an object
Jeromy's avatar
Jeromy committed
604 605 606 607 608
	// needs to use another during its shutdown/cleanup process, it should be
	// closed before that other object

	if n.FilesRoot != nil {
		closers = append(closers, n.FilesRoot)
Jeromy's avatar
Jeromy committed
609
	}
610

611 612 613 614
	if n.Exchange != nil {
		closers = append(closers, n.Exchange)
	}

615
	if n.Mounts.Ipfs != nil && !n.Mounts.Ipfs.IsActive() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
616 617
		closers = append(closers, mount.Closer(n.Mounts.Ipfs))
	}
618
	if n.Mounts.Ipns != nil && !n.Mounts.Ipns.IsActive() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
619 620 621
		closers = append(closers, mount.Closer(n.Mounts.Ipns))
	}

622 623
	if n.DHT != nil {
		closers = append(closers, n.DHT.Process())
Jeromy's avatar
Jeromy committed
624 625
	}

Jeromy's avatar
Jeromy committed
626 627 628 629
	if n.Blocks != nil {
		closers = append(closers, n.Blocks)
	}

Jeromy's avatar
Jeromy committed
630 631
	if n.Bootstrapper != nil {
		closers = append(closers, n.Bootstrapper)
632 633
	}

Jeromy's avatar
Jeromy committed
634 635
	if n.PeerHost != nil {
		closers = append(closers, n.PeerHost)
636
	}
637

Jeromy's avatar
Jeromy committed
638 639 640
	// Repo closed last, most things need to preserve state here
	closers = append(closers, n.Repo)

641
	var errs []error
642
	for _, closer := range closers {
643 644
		if err := closer.Close(); err != nil {
			errs = append(errs, err)
645 646 647 648
		}
	}
	if len(errs) > 0 {
		return errs[0]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
649 650
	}
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
651 652
}

653
// OnlineMode returns whether or not the IpfsNode is in OnlineMode.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
654
func (n *IpfsNode) OnlineMode() bool {
655
	return n.mode == onlineMode
Brian Tiger Chow's avatar
Brian Tiger Chow committed
656 657
}

658
// SetLocal will set the IpfsNode to local mode
659 660 661 662 663 664 665
func (n *IpfsNode) SetLocal(isLocal bool) {
	if isLocal {
		n.mode = localMode
	}
	n.localModeSet = true
}

666
// LocalMode returns whether or not the IpfsNode is in LocalMode
667 668 669 670 671
func (n *IpfsNode) LocalMode() bool {
	if !n.localModeSet {
		// programmer error should not happen
		panic("local mode not set")
	}
672
	return n.mode == localMode
673 674
}

675
// Bootstrap will set and call the IpfsNodes bootstrap function.
676
func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
677
	// TODO what should return value be when in offlineMode?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
678 679 680 681
	if n.Routing == nil {
		return nil
	}

682 683 684 685 686 687 688
	if n.Bootstrapper != nil {
		n.Bootstrapper.Close() // stop previous bootstrap process.
	}

	// if the caller did not specify a bootstrap peer function, get the
	// freshest bootstrap peers from config. this responds to live changes.
	if cfg.BootstrapPeers == nil {
Jeromy's avatar
Jeromy committed
689
		cfg.BootstrapPeers = func() []pstore.PeerInfo {
690
			ps, err := n.loadBootstrapPeers()
691
			if err != nil {
692
				log.Warning("failed to parse bootstrap peers from config")
693 694 695 696 697 698 699 700 701
				return nil
			}
			return ps
		}
	}

	var err error
	n.Bootstrapper, err = Bootstrap(n, cfg)
	return err
702 703
}

704 705
func (n *IpfsNode) loadID() error {
	if n.Identity != "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
706
		return errors.New("identity already loaded")
707 708
	}

709 710 711 712 713 714
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	cid := cfg.Identity.PeerID
715
	if cid == "" {
716
		return errors.New("identity was not set in config (was 'ipfs init' run?)")
717 718
	}
	if len(cid) == 0 {
719
		return errors.New("no peer ID in config! (was 'ipfs init' run?)")
720 721
	}

Steven Allen's avatar
Steven Allen committed
722 723 724 725 726 727
	id, err := peer.IDB58Decode(cid)
	if err != nil {
		return fmt.Errorf("peer ID invalid: %s", err)
	}

	n.Identity = id
728 729
	return nil
}
730

731
// GetKey will return a key from the Keystore with name `name`.
732 733 734 735 736 737 738 739
func (n *IpfsNode) GetKey(name string) (ic.PrivKey, error) {
	if name == "self" {
		return n.PrivateKey, nil
	} else {
		return n.Repo.Keystore().Get(name)
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
740
func (n *IpfsNode) LoadPrivateKey() error {
741
	if n.Identity == "" || n.Peerstore == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
742
		return errors.New("loaded private key out of order")
743 744
	}

745
	if n.PrivateKey != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
746
		return errors.New("private key already loaded")
747 748
	}

749 750 751 752 753 754
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	sk, err := loadPrivateKey(&cfg.Identity, n.Identity)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
755
	if err != nil {
756
		return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
757
	}
758

759 760
	n.PrivateKey = sk
	n.Peerstore.AddPrivKey(n.Identity, n.PrivateKey)
Jeromy's avatar
Jeromy committed
761 762 763 764
	n.Peerstore.AddPubKey(n.Identity, sk.GetPublic())
	return nil
}

Jeromy's avatar
Jeromy committed
765
func (n *IpfsNode) loadBootstrapPeers() ([]pstore.PeerInfo, error) {
766 767 768 769 770 771
	cfg, err := n.Repo.Config()
	if err != nil {
		return nil, err
	}

	parsed, err := cfg.BootstrapPeers()
772 773 774 775 776 777
	if err != nil {
		return nil, err
	}
	return toPeerInfos(parsed), nil
}

Jeromy's avatar
Jeromy committed
778
func (n *IpfsNode) loadFilesRoot() error {
Jeromy's avatar
Jeromy committed
779
	dsk := ds.NewKey("/local/filesroot")
Jeromy's avatar
Jeromy committed
780 781
	pf := func(ctx context.Context, c *cid.Cid) error {
		return n.Repo.Datastore().Put(dsk, c.Bytes())
Jeromy's avatar
Jeromy committed
782 783
	}

784
	var nd *merkledag.ProtoNode
Jeromy's avatar
Jeromy committed
785 786 787 788
	val, err := n.Repo.Datastore().Get(dsk)

	switch {
	case err == ds.ErrNotFound || val == nil:
789
		nd = ft.EmptyDirNode()
790
		err := n.DAG.Add(n.Context(), nd)
Jeromy's avatar
Jeromy committed
791 792 793 794
		if err != nil {
			return fmt.Errorf("failure writing to dagstore: %s", err)
		}
	case err == nil:
Jeromy's avatar
Jeromy committed
795 796 797 798 799
		c, err := cid.Cast(val.([]byte))
		if err != nil {
			return err
		}

800
		rnd, err := n.DAG.Get(n.Context(), c)
Jeromy's avatar
Jeromy committed
801 802 803
		if err != nil {
			return fmt.Errorf("error loading filesroot from DAG: %s", err)
		}
804 805 806 807 808 809 810

		pbnd, ok := rnd.(*merkledag.ProtoNode)
		if !ok {
			return merkledag.ErrNotProtobuf
		}

		nd = pbnd
Jeromy's avatar
Jeromy committed
811 812 813 814 815 816 817 818 819 820 821 822 823
	default:
		return err
	}

	mr, err := mfs.NewRoot(n.Context(), n.DAG, nd, pf)
	if err != nil {
		return err
	}

	n.FilesRoot = mr
	return nil
}

824 825
// SetupOfflineRouting instantiates a routing system in offline mode. This is
// primarily used for offline ipns modifications.
Jeromy's avatar
Jeromy committed
826
func (n *IpfsNode) SetupOfflineRouting() error {
827 828 829 830
	if n.Routing != nil {
		// Routing was already set up
		return nil
	}
831 832

	// TODO: move this somewhere else.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
833
	err := n.LoadPrivateKey()
Jeromy's avatar
Jeromy committed
834 835 836 837
	if err != nil {
		return err
	}

838
	n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.RecordValidator)
839

840 841 842 843 844 845
	size, err := n.getCacheSize()
	if err != nil {
		return err
	}

	n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), size)
846

847
	return nil
848 849 850 851
}

func loadPrivateKey(cfg *config.Identity, id peer.ID) (ic.PrivKey, error) {
	sk, err := cfg.DecodePrivateKey("passphrase todo!")
852 853 854
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
855

856 857 858 859
	id2, err := peer.IDFromPrivateKey(sk)
	if err != nil {
		return nil, err
	}
860

861 862
	if id2 != id {
		return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
863 864
	}

865
	return sk, nil
866
}
867

868
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
869 870 871
	var listen []ma.Multiaddr
	for _, addr := range cfg.Addresses.Swarm {
		maddr, err := ma.NewMultiaddr(addr)
872
		if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
873
			return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
874
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
875
		listen = append(listen, maddr)
876 877 878 879
	}

	return listen, nil
}
880

Kevin Atkinson's avatar
Kevin Atkinson committed
881
type ConstructPeerHostOpts struct {
882
	AddrsFactory      p2pbhost.AddrsFactory
vyzo's avatar
vyzo committed
883 884 885
	DisableNatPortMap bool
	DisableRelay      bool
	EnableRelayHop    bool
Jeromy's avatar
Jeromy committed
886
	ConnectionManager ifconnmgr.ConnManager
Kevin Atkinson's avatar
Kevin Atkinson committed
887 888
}

Steven Allen's avatar
Steven Allen committed
889
// HostOption is the signature of a function that builds a p2phost.Host for a
// given peer ID and peerstore, optionally taking extra libp2p options.
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error)
Jeromy's avatar
Jeromy committed
890 891 892

// DefaultHostOption is the stock HostOption, backed by constructPeerHost.
var DefaultHostOption HostOption = constructPeerHost

893
// isolates the complex initialization steps
Steven Allen's avatar
Steven Allen committed
894 895 896 897
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
	pkey := ps.PrivKey(id)
	if pkey == nil {
		return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
898
	}
Steven Allen's avatar
Steven Allen committed
899 900
	options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
	return libp2p.New(ctx, options...)
901 902
}

903 904 905 906 907 908 909 910 911 912 913 914
// filterRelayAddrs returns addrs minus every entry for which
// ValueForProtocol(circuit.P_CIRCUIT) succeeds, keeping the remaining
// entries in their original order.
func filterRelayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
	var kept []ma.Multiaddr
	for i := range addrs {
		// An error here means the circuit protocol lookup failed for this
		// address, so it is retained.
		if _, err := addrs[i].ValueForProtocol(circuit.P_CIRCUIT); err != nil {
			kept = append(kept, addrs[i])
		}
	}
	return kept
}

915 916 917 918 919 920
// composeAddrsFactory chains two address factories: the returned factory
// runs g on its input first and then feeds g's output to f.
func composeAddrsFactory(f, g p2pbhost.AddrsFactory) p2pbhost.AddrsFactory {
	return func(addrs []ma.Multiaddr) []ma.Multiaddr {
		inner := g(addrs)
		return f(inner)
	}
}

921
// startListening on the network addresses
Łukasz Magiera's avatar
Łukasz Magiera committed
922
func startListening(host p2phost.Host, cfg *config.Config) error {
923 924
	listenAddrs, err := listenAddresses(cfg)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
925
		return err
926 927 928
	}

	// Actually start listening:
Steven Allen's avatar
Steven Allen committed
929
	if err := host.Network().Listen(listenAddrs...); err != nil {
930
		return err
931 932
	}

933
	// list out our addresses
934
	addrs, err := host.Network().InterfaceListenAddresses()
935
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
936
		return err
937
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
938
	log.Infof("Swarm listening at: %s", addrs)
939
	return nil
940
}
941

942 943 944 945 946 947
// constructDHTRouting creates a router via dht.New, configured with the
// given datastore and record validator.
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	withStore := dhtopts.Datastore(dstore)
	withValidator := dhtopts.Validator(validator)
	return dht.New(ctx, host, withStore, withValidator)
}
Jeromy's avatar
Jeromy committed
949

950 951 952 953 954 955 956
func constructClientDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching, validator record.Validator) (routing.IpfsRouting, error) {
	return dht.New(
		ctx, host,
		dhtopts.Client(true),
		dhtopts.Datastore(dstore),
		dhtopts.Validator(validator),
	)
Jeromy's avatar
Jeromy committed
957 958
}

959
// RoutingOption is the signature of a function that builds a
// routing.IpfsRouting from a context, a host, a batching datastore and a
// record validator.
type RoutingOption func(context.Context, p2phost.Host, ds.Batching, record.Validator) (routing.IpfsRouting, error)
Jeromy's avatar
Jeromy committed
960

Jeromy's avatar
Jeromy committed
961
// DiscoveryOption is the signature of a function that builds a
// discovery.Service from a context and a host.
type DiscoveryOption func(context.Context, p2phost.Host) (discovery.Service, error)
Jeromy's avatar
Jeromy committed
962

963
// DHTOption is the RoutingOption backed by constructDHTRouting.
var DHTOption RoutingOption = constructDHTRouting
Jeromy's avatar
Jeromy committed
964
// DHTClientOption is the RoutingOption backed by constructClientDHTRouting,
// which passes dhtopts.Client(true) to the DHT.
var DHTClientOption RoutingOption = constructClientDHTRouting
965
// NilRouterOption is the RoutingOption backed by nilrouting.ConstructNilRouting.
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting