core.go 13.8 KB
Newer Older
1 2 3 4
/*
Package core implements the IpfsNode object and related methods.

Packages underneath core/ provide a (relatively) stable, low-level API
5 6 7 8
to carry out most IPFS-related tasks.  For more details on the other
interfaces and how core/... fits into the bigger IPFS picture, see:

  $ godoc github.com/ipfs/go-ipfs
9
*/
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10 11
package core

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	"errors"
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
14
	"fmt"
15
	"io"
Jeromy's avatar
Jeromy committed
16
	"net"
17
	"time"
18

19 20 21
	b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
22
	goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
Jeromy's avatar
Jeromy committed
23
	mamask "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter"
24 25
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	diag "github.com/ipfs/go-ipfs/diagnostics"
Jeromy's avatar
Jeromy committed
26
	metrics "github.com/ipfs/go-ipfs/metrics"
27
	ic "github.com/ipfs/go-ipfs/p2p/crypto"
28
	discovery "github.com/ipfs/go-ipfs/p2p/discovery"
29 30 31 32 33 34
	p2phost "github.com/ipfs/go-ipfs/p2p/host"
	p2pbhost "github.com/ipfs/go-ipfs/p2p/host/basic"
	rhost "github.com/ipfs/go-ipfs/p2p/host/routed"
	swarm "github.com/ipfs/go-ipfs/p2p/net/swarm"
	addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
Jeromy's avatar
Jeromy committed
35
	ping "github.com/ipfs/go-ipfs/p2p/protocol/ping"
Jeromy's avatar
Jeromy committed
36
	logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
37

38 39
	routing "github.com/ipfs/go-ipfs/routing"
	dht "github.com/ipfs/go-ipfs/routing/dht"
40
	nilrouting "github.com/ipfs/go-ipfs/routing/none"
41
	offroute "github.com/ipfs/go-ipfs/routing/offline"
42

43 44 45 46 47 48
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	bserv "github.com/ipfs/go-ipfs/blockservice"
	exchange "github.com/ipfs/go-ipfs/exchange"
	bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	rp "github.com/ipfs/go-ipfs/exchange/reprovide"
49

50 51 52 53 54 55 56 57
	mount "github.com/ipfs/go-ipfs/fuse/mount"
	ipnsfs "github.com/ipfs/go-ipfs/ipnsfs"
	merkledag "github.com/ipfs/go-ipfs/merkledag"
	namesys "github.com/ipfs/go-ipfs/namesys"
	path "github.com/ipfs/go-ipfs/path"
	pin "github.com/ipfs/go-ipfs/pin"
	repo "github.com/ipfs/go-ipfs/repo"
	config "github.com/ipfs/go-ipfs/repo/config"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59
)

Jeromy's avatar
Jeromy committed
60
const IpnsValidatorTag = "ipns"
61
const kSizeBlockstoreWriteCache = 100
62
const kReprovideFrequency = time.Hour * 12
63
const discoveryConnTimeout = time.Second * 30
Jeromy's avatar
Jeromy committed
64

Jeromy's avatar
Jeromy committed
65
var log = logging.Logger("core")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66

67 68 69 70 71 72 73 74 75
type mode int

const (
	// zero value is not a valid mode, must be explicitly set
	invalidMode mode = iota
	offlineMode
	onlineMode
)

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
76
// IpfsNode is IPFS Core module. It represents an IPFS instance.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77 78
type IpfsNode struct {

79
	// Self
80
	Identity peer.ID // the local node's identity
81

82
	Repo repo.Repo
83 84

	// Local node
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85 86 87
	Pinning    pin.Pinner // the pinning manager
	Mounts     Mounts     // current mount state, if any.
	PrivateKey ic.PrivKey // the local node's private Key
88 89

	// Services
90 91 92 93 94
	Peerstore  peer.Peerstore       // storage for other Peer instances
	Blockstore bstore.Blockstore    // the block store (lower level)
	Blocks     *bserv.BlockService  // the block service, get/add blocks.
	DAG        merkledag.DAGService // the merkle dag service, get/add objects.
	Resolver   *path.Resolver       // the path resolution system
Jeromy's avatar
Jeromy committed
95
	Reporter   metrics.Reporter
96
	Discovery  discovery.Service
97 98

	// Online
99 100 101 102 103 104
	PeerHost     p2phost.Host        // the network host (server+client)
	Bootstrapper io.Closer           // the periodic bootstrapper
	Routing      routing.IpfsRouting // the routing system. recommend ipfs-dht
	Exchange     exchange.Interface  // the block exchange + strategy (bitswap)
	Namesys      namesys.NameSystem  // the name system, resolves paths to hashes
	Diagnostics  *diag.Diagnostics   // the diagnostics service
Jeromy's avatar
Jeromy committed
105 106
	Ping         *ping.PingService
	Reprovider   *rp.Reprovider // the value reprovider system
107

108 109
	IpnsFs *ipnsfs.Filesystem

110 111
	proc goprocess.Process
	ctx  context.Context
112

113
	mode mode
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114 115
}

116 117 118 119 120 121 122 123
// Mounts defines what the node's mount state is. This should
// perhaps be moved to the daemon or mount. It's here because
// it needs to be accessible across daemon requests.
type Mounts struct {
	Ipfs mount.Mount
	Ipns mount.Mount
}

Jeromy's avatar
Jeromy committed
124
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption) error {
125 126

	if n.PeerHost != nil { // already online.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127
		return errors.New("node already online")
128 129 130
	}

	// load private key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131
	if err := n.LoadPrivateKey(); err != nil {
132 133 134
		return err
	}

Jeromy's avatar
Jeromy committed
135 136 137
	// Set reporter
	n.Reporter = metrics.NewBandwidthCounter()

Jeromy's avatar
Jeromy committed
138
	// get undialable addrs from config
139 140 141 142
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
143
	var addrfilter []*net.IPNet
144
	for _, s := range cfg.Swarm.AddrFilters {
Jeromy's avatar
Jeromy committed
145 146 147 148 149 150 151 152
		f, err := mamask.NewMask(s)
		if err != nil {
			return fmt.Errorf("incorrectly formatter address filter in config: %s", s)
		}
		addrfilter = append(addrfilter, f)
	}

	peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter)
153
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154
		return err
155 156
	}

157
	if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption); err != nil {
158
		return err
159 160 161
	}

	// Ok, now we're ready to listen.
162
	if err := startListening(ctx, n.PeerHost, cfg); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
		return err
164
	}
165

166
	n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
167
	go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)
168

169
	// setup local discovery
Jeromy's avatar
Jeromy committed
170 171 172
	if do != nil {
		service, err := do(n.PeerHost)
		if err != nil {
Jeromy's avatar
Jeromy committed
173 174 175 176
			log.Error("mdns error: ", err)
		} else {
			service.RegisterNotifee(n)
			n.Discovery = service
Jeromy's avatar
Jeromy committed
177
		}
178 179
	}

180
	return n.Bootstrap(DefaultBootstrapConfig)
181 182
}

Jeromy's avatar
Jeromy committed
183 184 185 186 187 188 189 190 191 192 193 194
func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
	if d.MDNS.Enabled {
		return func(h p2phost.Host) (discovery.Service, error) {
			if d.MDNS.Interval == 0 {
				d.MDNS.Interval = 5
			}
			return discovery.NewMdnsService(h, time.Duration(d.MDNS.Interval)*time.Second)
		}
	}
	return nil
}

195 196
func (n *IpfsNode) HandlePeerFound(p peer.PeerInfo) {
	log.Warning("trying peer info: ", p)
197
	ctx, cancel := context.WithTimeout(n.Context(), discoveryConnTimeout)
rht's avatar
rht committed
198
	defer cancel()
199
	if err := n.PeerHost.Connect(ctx, p); err != nil {
200 201 202 203
		log.Warning("Failed to connect to peer found by discovery: ", err)
	}
}

204 205
// startOnlineServicesWithHost  is the set of services which need to be
// initialized with the host and _before_ we start listening.
206
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error {
207
	// setup diagnostics service
208
	n.Diagnostics = diag.NewDiagnostics(n.Identity, host)
Jeromy's avatar
Jeromy committed
209
	n.Ping = ping.NewPingService(host)
210 211

	// setup routing service
212
	r, err := routingOption(ctx, host, n.Repo.Datastore())
Jeromy's avatar
Jeromy committed
213
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
		return err
215
	}
Jeromy's avatar
Jeromy committed
216
	n.Routing = r
217

218 219 220
	// Wrap standard peer host with routing system to allow unknown peer lookups
	n.PeerHost = rhost.Wrap(host, n.Routing)

221 222
	// setup exchange service
	const alwaysSendToPeer = true // use YesManStrategy
223
	bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
224 225 226 227
	n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer)

	// setup name system
	n.Namesys = namesys.NewNameSystem(n.Routing)
228

229 230 231
	return nil
}

232 233 234 235 236 237 238 239 240 241 242 243
// Process returns the Process object
func (n *IpfsNode) Process() goprocess.Process {
	return n.proc
}

// Close calls Close() on the Process object
func (n *IpfsNode) Close() error {
	return n.proc.Close()
}

// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
244 245 246
	if n.ctx == nil {
		n.ctx = context.TODO()
	}
247 248 249
	return n.ctx
}

250 251
// teardown closes owned children. If any errors occur, this function returns
// the first error.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
252
func (n *IpfsNode) teardown() error {
253
	log.Debug("core is shutting down...")
254 255
	// owned objects are closed in this teardown to ensure that they're closed
	// regardless of which constructor was used to add them to the node.
Jeromy's avatar
Jeromy committed
256 257 258
	closers := []io.Closer{
		n.Repo,
	}
259

260 261 262 263
	if n.Exchange != nil {
		closers = append(closers, n.Exchange)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264 265 266 267 268 269 270
	if n.Mounts.Ipfs != nil {
		closers = append(closers, mount.Closer(n.Mounts.Ipfs))
	}
	if n.Mounts.Ipns != nil {
		closers = append(closers, mount.Closer(n.Mounts.Ipns))
	}

Jeromy's avatar
Jeromy committed
271 272
	// Filesystem needs to be closed before network, dht, and blockservice
	// so it can use them as its shutting down
273
	if n.IpnsFs != nil {
Jeromy's avatar
Jeromy committed
274 275 276
		closers = append(closers, n.IpnsFs)
	}

Jeromy's avatar
Jeromy committed
277 278 279 280
	if n.Blocks != nil {
		closers = append(closers, n.Blocks)
	}

Jeromy's avatar
Jeromy committed
281 282
	if n.Bootstrapper != nil {
		closers = append(closers, n.Bootstrapper)
283 284
	}

285
	if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
286
		closers = append(closers, dht.Process())
Jeromy's avatar
Jeromy committed
287 288 289 290
	}

	if n.PeerHost != nil {
		closers = append(closers, n.PeerHost)
291
	}
292

293
	var errs []error
294
	for _, closer := range closers {
295 296
		if err := closer.Close(); err != nil {
			errs = append(errs, err)
297 298 299 300
		}
	}
	if len(errs) > 0 {
		return errs[0]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
301 302
	}
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
303 304
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
305
func (n *IpfsNode) OnlineMode() bool {
306 307 308 309 310 311
	switch n.mode {
	case onlineMode:
		return true
	default:
		return false
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
312 313
}

314
func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
315 316

	// TODO what should return value be when in offlineMode?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
317 318 319 320
	if n.Routing == nil {
		return nil
	}

321 322 323 324 325 326 327 328
	if n.Bootstrapper != nil {
		n.Bootstrapper.Close() // stop previous bootstrap process.
	}

	// if the caller did not specify a bootstrap peer function, get the
	// freshest bootstrap peers from config. this responds to live changes.
	if cfg.BootstrapPeers == nil {
		cfg.BootstrapPeers = func() []peer.PeerInfo {
329
			ps, err := n.loadBootstrapPeers()
330
			if err != nil {
331
				log.Warning("failed to parse bootstrap peers from config")
332 333 334 335 336 337 338 339 340
				return nil
			}
			return ps
		}
	}

	var err error
	n.Bootstrapper, err = Bootstrap(n, cfg)
	return err
341 342
}

343 344
func (n *IpfsNode) loadID() error {
	if n.Identity != "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
345
		return errors.New("identity already loaded")
346 347
	}

348 349 350 351 352 353
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	cid := cfg.Identity.PeerID
354
	if cid == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
355
		return errors.New("Identity was not set in config (was ipfs init run?)")
356 357
	}
	if len(cid) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
358
		return errors.New("No peer ID in config! (was ipfs init run?)")
359 360
	}

361 362 363
	n.Identity = peer.ID(b58.Decode(cid))
	return nil
}
364

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
365
func (n *IpfsNode) LoadPrivateKey() error {
366
	if n.Identity == "" || n.Peerstore == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
367
		return errors.New("loaded private key out of order.")
368 369
	}

370
	if n.PrivateKey != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
371
		return errors.New("private key already loaded")
372 373
	}

374 375 376 377 378 379
	cfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	sk, err := loadPrivateKey(&cfg.Identity, n.Identity)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
380
	if err != nil {
381
		return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
382
	}
383

384 385
	n.PrivateKey = sk
	n.Peerstore.AddPrivKey(n.Identity, n.PrivateKey)
Jeromy's avatar
Jeromy committed
386 387 388 389
	n.Peerstore.AddPubKey(n.Identity, sk.GetPublic())
	return nil
}

390
func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) {
391 392 393 394 395 396
	cfg, err := n.Repo.Config()
	if err != nil {
		return nil, err
	}

	parsed, err := cfg.BootstrapPeers()
397 398 399 400 401 402
	if err != nil {
		return nil, err
	}
	return toPeerInfos(parsed), nil
}

Jeromy's avatar
Jeromy committed
403 404 405
// SetupOfflineRouting loads the local nodes private key and
// uses it to instantiate a routing system in offline mode.
// This is primarily used for offline ipns modifications.
Jeromy's avatar
Jeromy committed
406
func (n *IpfsNode) SetupOfflineRouting() error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
407
	err := n.LoadPrivateKey()
Jeromy's avatar
Jeromy committed
408 409 410 411 412
	if err != nil {
		return err
	}

	n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.PrivateKey)
413 414 415

	n.Namesys = namesys.NewNameSystem(n.Routing)

416
	return nil
417 418 419 420
}

func loadPrivateKey(cfg *config.Identity, id peer.ID) (ic.PrivKey, error) {
	sk, err := cfg.DecodePrivateKey("passphrase todo!")
421 422 423
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
424

425 426 427 428
	id2, err := peer.IDFromPrivateKey(sk)
	if err != nil {
		return nil, err
	}
429

430 431
	if id2 != id {
		return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
432 433
	}

434
	return sk, nil
435
}
436

437
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438 439 440
	var listen []ma.Multiaddr
	for _, addr := range cfg.Addresses.Swarm {
		maddr, err := ma.NewMultiaddr(addr)
441
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
442
			return nil, fmt.Errorf("Failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
443
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
444
		listen = append(listen, maddr)
445 446 447 448
	}

	return listen, nil
}
449

Jeromy's avatar
Jeromy committed
450
type HostOption func(ctx context.Context, id peer.ID, ps peer.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error)
Jeromy's avatar
Jeromy committed
451 452 453

var DefaultHostOption HostOption = constructPeerHost

454
// isolates the complex initialization steps
Jeromy's avatar
Jeromy committed
455
func constructPeerHost(ctx context.Context, id peer.ID, ps peer.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) {
456 457

	// no addresses to begin with. we'll start later.
Jeromy's avatar
Jeromy committed
458
	network, err := swarm.NewNetwork(ctx, nil, id, ps, bwr)
459
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
460
		return nil, err
461
	}
462

463 464 465 466
	for _, f := range fs {
		network.Swarm().Filters.AddDialFilter(f)
	}

Jeromy's avatar
Jeromy committed
467 468
	host := p2pbhost.New(network, p2pbhost.NATPortMap, bwr)

469 470 471 472 473 474 475
	return host, nil
}

// startListening on the network addresses
func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) error {
	listenAddrs, err := listenAddresses(cfg)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
476
		return err
477 478
	}

479
	// make sure we error out if our config does not have addresses we can use
480
	log.Debugf("Config.Addresses.Swarm:%s", listenAddrs)
481
	filteredAddrs := addrutil.FilterUsableAddrs(listenAddrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
482
	log.Debugf("Config.Addresses.Swarm:%s (filtered)", filteredAddrs)
483
	if len(filteredAddrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
484
		return fmt.Errorf("addresses in config not usable: %s", listenAddrs)
485 486
	}

487 488 489
	// Actually start listening:
	if err := host.Network().Listen(filteredAddrs...); err != nil {
		return err
490 491
	}

492
	// list out our addresses
493
	addrs, err := host.Network().InterfaceListenAddresses()
494
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
495
		return err
496
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497
	log.Infof("Swarm listening at: %s", addrs)
498
	return nil
499
}
500

501 502
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
	dhtRouting := dht.NewDHT(ctx, host, dstore)
503
	dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator
504 505
	return dhtRouting, nil
}
Jeromy's avatar
Jeromy committed
506

507
type RoutingOption func(context.Context, p2phost.Host, ds.ThreadSafeDatastore) (routing.IpfsRouting, error)
Jeromy's avatar
Jeromy committed
508

Jeromy's avatar
Jeromy committed
509 510
type DiscoveryOption func(p2phost.Host) (discovery.Service, error)

511
var DHTOption RoutingOption = constructDHTRouting
512
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting