core.go 13.6 KB
Newer Older
1 2 3 4
/*
Package core implements the IpfsNode object and related methods.

Packages underneath core/ provide a (relatively) stable, low-level API
to carry out most IPFS-related tasks.  For more details on the other
interfaces and how core/... fits into the bigger IPFS picture, see:

  $ godoc github.com/ipfs/go-ipfs
*/
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10 11
package core

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	"errors"
Juan Batiz-Benet's avatar
go fmt  
Juan Batiz-Benet committed
14
	"fmt"
15
	"io"
Jeromy's avatar
Jeromy committed
16
	"net"
17
	"time"
18

19 20 21
	b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
22
	goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
Jeromy's avatar
Jeromy committed
23
	mamask "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter"
24 25
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	diag "github.com/ipfs/go-ipfs/diagnostics"
Jeromy's avatar
Jeromy committed
26
	metrics "github.com/ipfs/go-ipfs/metrics"
27
	ic "github.com/ipfs/go-ipfs/p2p/crypto"
28
	discovery "github.com/ipfs/go-ipfs/p2p/discovery"
29 30 31 32 33 34
	p2phost "github.com/ipfs/go-ipfs/p2p/host"
	p2pbhost "github.com/ipfs/go-ipfs/p2p/host/basic"
	rhost "github.com/ipfs/go-ipfs/p2p/host/routed"
	swarm "github.com/ipfs/go-ipfs/p2p/net/swarm"
	addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
Jeromy's avatar
Jeromy committed
35
	ping "github.com/ipfs/go-ipfs/p2p/protocol/ping"
Jeromy's avatar
Jeromy committed
36
	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
37

38 39
	routing "github.com/ipfs/go-ipfs/routing"
	dht "github.com/ipfs/go-ipfs/routing/dht"
40
	nilrouting "github.com/ipfs/go-ipfs/routing/none"
41
	offroute "github.com/ipfs/go-ipfs/routing/offline"
42

43 44 45 46 47 48
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	bserv "github.com/ipfs/go-ipfs/blockservice"
	exchange "github.com/ipfs/go-ipfs/exchange"
	bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
	bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
	rp "github.com/ipfs/go-ipfs/exchange/reprovide"
49

50 51 52 53 54 55 56 57
	mount "github.com/ipfs/go-ipfs/fuse/mount"
	ipnsfs "github.com/ipfs/go-ipfs/ipnsfs"
	merkledag "github.com/ipfs/go-ipfs/merkledag"
	namesys "github.com/ipfs/go-ipfs/namesys"
	path "github.com/ipfs/go-ipfs/path"
	pin "github.com/ipfs/go-ipfs/pin"
	repo "github.com/ipfs/go-ipfs/repo"
	config "github.com/ipfs/go-ipfs/repo/config"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59
)

Jeromy's avatar
Jeromy committed
60
// Package-level constants used while wiring up a node.
const (
	// IpnsValidatorTag is the key under which the IPNS record validator is
	// registered in the DHT's validator map (see constructDHTRouting).
	IpnsValidatorTag = "ipns"

	// kSizeBlockstoreWriteCache is the size of the blockstore write cache.
	// NOTE(review): presumably a number of entries; it is not referenced in
	// this part of the file, so confirm the unit at its use site.
	kSizeBlockstoreWriteCache = 100

	// kReprovideFrequency is the interval handed to Reprovider.ProvideEvery
	// when the node goes online.
	kReprovideFrequency = time.Hour * 12

	// discoveryConnTimeout bounds each connection attempt made to a peer
	// reported by the discovery service (see HandlePeerFound).
	discoveryConnTimeout = time.Second * 30
)
Jeromy's avatar
Jeromy committed
64

Brian Tiger Chow's avatar
Brian Tiger Chow committed
65
// log is the package-level event logger, registered under the name "core".
var log = eventlog.Logger("core")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66

67 68 69 70 71 72 73 74 75
// mode records which mode a node runs in. It is stored in IpfsNode.mode and
// inspected by IpfsNode.OnlineMode.
type mode int

const (
	// zero value is not a valid mode, must be explicitly set
	invalidMode mode = iota
	// offlineMode marks a node that is not online; OnlineMode reports false.
	offlineMode
	// onlineMode is the only mode for which OnlineMode reports true.
	onlineMode
)

Juan Batiz-Benet's avatar
go lint  
Juan Batiz-Benet committed
76
// IpfsNode is IPFS Core module. It represents an IPFS instance.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77 78
type IpfsNode struct {

79
	// Self
80
	Identity peer.ID // the local node's identity
81

82
	Repo repo.Repo
83 84

	// Local node
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85 86 87
	Pinning    pin.Pinner // the pinning manager
	Mounts     Mounts     // current mount state, if any.
	PrivateKey ic.PrivKey // the local node's private Key
88 89

	// Services
90 91 92 93 94
	Peerstore  peer.Peerstore       // storage for other Peer instances
	Blockstore bstore.Blockstore    // the block store (lower level)
	Blocks     *bserv.BlockService  // the block service, get/add blocks.
	DAG        merkledag.DAGService // the merkle dag service, get/add objects.
	Resolver   *path.Resolver       // the path resolution system
Jeromy's avatar
Jeromy committed
95
	Reporter   metrics.Reporter
96
	Discovery  discovery.Service
97 98

	// Online
99 100 101 102 103 104
	PeerHost     p2phost.Host        // the network host (server+client)
	Bootstrapper io.Closer           // the periodic bootstrapper
	Routing      routing.IpfsRouting // the routing system. recommend ipfs-dht
	Exchange     exchange.Interface  // the block exchange + strategy (bitswap)
	Namesys      namesys.NameSystem  // the name system, resolves paths to hashes
	Diagnostics  *diag.Diagnostics   // the diagnostics service
Jeromy's avatar
Jeromy committed
105 106
	Ping         *ping.PingService
	Reprovider   *rp.Reprovider // the value reprovider system
107

108 109
	IpnsFs *ipnsfs.Filesystem

110 111
	proc goprocess.Process
	ctx  context.Context
112

113
	mode mode
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114 115
}

116 117 118 119 120 121 122 123
// Mounts defines what the node's mount state is. This should
// perhaps be moved to the daemon or mount. It's here because
// it needs to be accessible across daemon requests.
//
// Either field may be nil; teardown only closes the mounts that are set.
type Mounts struct {
	Ipfs mount.Mount // the ipfs mount, or nil if not mounted
	Ipns mount.Mount // the ipns mount, or nil if not mounted
}

Jeromy's avatar
Jeromy committed
124
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption) error {
125 126

	if n.PeerHost != nil { // already online.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127
		return errors.New("node already online")
128 129 130
	}

	// load private key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131
	if err := n.LoadPrivateKey(); err != nil {
132 133 134
		return err
	}

Jeromy's avatar
Jeromy committed
135 136 137
	// Set reporter
	n.Reporter = metrics.NewBandwidthCounter()

Jeromy's avatar
Jeromy committed
138 139 140
	// get undialable addrs from config
	cfg := n.Repo.Config()
	var addrfilter []*net.IPNet
141
	for _, s := range cfg.Swarm.AddrFilters {
Jeromy's avatar
Jeromy committed
142 143 144 145 146 147 148 149
		f, err := mamask.NewMask(s)
		if err != nil {
			return fmt.Errorf("incorrectly formatter address filter in config: %s", s)
		}
		addrfilter = append(addrfilter, f)
	}

	peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter)
150
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151
		return err
152 153
	}

154
	if err := n.startOnlineServicesWithHost(ctx, peerhost, routingOption); err != nil {
155
		return err
156 157 158 159
	}

	// Ok, now we're ready to listen.
	if err := startListening(ctx, n.PeerHost, n.Repo.Config()); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
		return err
161
	}
162

163
	n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
164
	go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)
165

166
	// setup local discovery
Jeromy's avatar
Jeromy committed
167 168 169
	if do != nil {
		service, err := do(n.PeerHost)
		if err != nil {
Jeromy's avatar
Jeromy committed
170 171 172 173
			log.Error("mdns error: ", err)
		} else {
			service.RegisterNotifee(n)
			n.Discovery = service
Jeromy's avatar
Jeromy committed
174
		}
175 176
	}

177
	return n.Bootstrap(DefaultBootstrapConfig)
178 179
}

Jeromy's avatar
Jeromy committed
180 181 182 183 184 185 186 187 188 189 190 191
// setupDiscoveryOption turns the discovery section of the config into a
// DiscoveryOption. It returns nil when MDNS is disabled. Otherwise the
// returned option starts an mDNS service on the given host, treating an
// Interval of 0 as 5 (seconds).
func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
	if !d.MDNS.Enabled {
		return nil
	}

	return func(h p2phost.Host) (discovery.Service, error) {
		// fall back to the default interval when none was configured
		if d.MDNS.Interval == 0 {
			d.MDNS.Interval = 5
		}
		interval := time.Duration(d.MDNS.Interval) * time.Second
		return discovery.NewMdnsService(h, interval)
	}
}

192 193
func (n *IpfsNode) HandlePeerFound(p peer.PeerInfo) {
	log.Warning("trying peer info: ", p)
194
	ctx, cancel := context.WithTimeout(n.Context(), discoveryConnTimeout)
rht's avatar
rht committed
195
	defer cancel()
196
	if err := n.PeerHost.Connect(ctx, p); err != nil {
197 198 199 200
		log.Warning("Failed to connect to peer found by discovery: ", err)
	}
}

201 202
// startOnlineServicesWithHost  is the set of services which need to be
// initialized with the host and _before_ we start listening.
203
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error {
204
	// setup diagnostics service
205
	n.Diagnostics = diag.NewDiagnostics(n.Identity, host)
Jeromy's avatar
Jeromy committed
206
	n.Ping = ping.NewPingService(host)
207 208

	// setup routing service
209
	r, err := routingOption(ctx, host, n.Repo.Datastore())
Jeromy's avatar
Jeromy committed
210
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
211
		return err
212
	}
Jeromy's avatar
Jeromy committed
213
	n.Routing = r
214

215 216 217
	// Wrap standard peer host with routing system to allow unknown peer lookups
	n.PeerHost = rhost.Wrap(host, n.Routing)

218 219
	// setup exchange service
	const alwaysSendToPeer = true // use YesManStrategy
220
	bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
221 222 223 224
	n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer)

	// setup name system
	n.Namesys = namesys.NewNameSystem(n.Routing)
225

226 227 228
	return nil
}

229 230 231 232 233 234 235 236 237 238 239 240
// Process returns the goprocess.Process stored on the node. It is the same
// process that Close closes.
func (n *IpfsNode) Process() goprocess.Process {
	return n.proc
}

// Close calls Close() on the node's Process object and returns its error.
// NOTE(review): presumably closing the process is what triggers teardown;
// confirm where proc is constructed.
func (n *IpfsNode) Close() error {
	return n.proc.Close()
}

// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
241 242 243
	if n.ctx == nil {
		n.ctx = context.TODO()
	}
244 245 246
	return n.ctx
}

247 248
// teardown closes owned children. If any errors occur, this function returns
// the first error.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
249
func (n *IpfsNode) teardown() error {
250
	log.Debug("core is shutting down...")
251 252
	// owned objects are closed in this teardown to ensure that they're closed
	// regardless of which constructor was used to add them to the node.
Jeromy's avatar
Jeromy committed
253 254 255
	closers := []io.Closer{
		n.Repo,
	}
256

257 258 259 260
	if n.Exchange != nil {
		closers = append(closers, n.Exchange)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261 262 263 264 265 266 267
	if n.Mounts.Ipfs != nil {
		closers = append(closers, mount.Closer(n.Mounts.Ipfs))
	}
	if n.Mounts.Ipns != nil {
		closers = append(closers, mount.Closer(n.Mounts.Ipns))
	}

Jeromy's avatar
Jeromy committed
268 269
	// Filesystem needs to be closed before network, dht, and blockservice
	// so it can use them as its shutting down
270
	if n.IpnsFs != nil {
Jeromy's avatar
Jeromy committed
271 272 273
		closers = append(closers, n.IpnsFs)
	}

Jeromy's avatar
Jeromy committed
274 275 276 277
	if n.Blocks != nil {
		closers = append(closers, n.Blocks)
	}

Jeromy's avatar
Jeromy committed
278 279
	if n.Bootstrapper != nil {
		closers = append(closers, n.Bootstrapper)
280 281
	}

282
	if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
283
		closers = append(closers, dht.Process())
Jeromy's avatar
Jeromy committed
284 285 286 287
	}

	if n.PeerHost != nil {
		closers = append(closers, n.PeerHost)
288
	}
289

290
	var errs []error
291
	for _, closer := range closers {
292 293
		if err := closer.Close(); err != nil {
			errs = append(errs, err)
294 295 296 297
		}
	}
	if len(errs) > 0 {
		return errs[0]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
298 299
	}
	return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
300 301
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
302
func (n *IpfsNode) OnlineMode() bool {
303 304 305 306 307 308
	switch n.mode {
	case onlineMode:
		return true
	default:
		return false
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
309 310
}

311
func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
312 313

	// TODO what should return value be when in offlineMode?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
314 315 316 317
	if n.Routing == nil {
		return nil
	}

318 319 320 321 322 323 324 325
	if n.Bootstrapper != nil {
		n.Bootstrapper.Close() // stop previous bootstrap process.
	}

	// if the caller did not specify a bootstrap peer function, get the
	// freshest bootstrap peers from config. this responds to live changes.
	if cfg.BootstrapPeers == nil {
		cfg.BootstrapPeers = func() []peer.PeerInfo {
326
			ps, err := n.loadBootstrapPeers()
327
			if err != nil {
328
				log.Warningf("failed to parse bootstrap peers from config: %s", n.Repo.Config().Bootstrap)
329 330 331 332 333 334 335 336 337
				return nil
			}
			return ps
		}
	}

	var err error
	n.Bootstrapper, err = Bootstrap(n, cfg)
	return err
338 339
}

340 341
func (n *IpfsNode) loadID() error {
	if n.Identity != "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
342
		return errors.New("identity already loaded")
343 344
	}

345
	cid := n.Repo.Config().Identity.PeerID
346
	if cid == "" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
347
		return errors.New("Identity was not set in config (was ipfs init run?)")
348 349
	}
	if len(cid) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
350
		return errors.New("No peer ID in config! (was ipfs init run?)")
351 352
	}

353 354 355
	n.Identity = peer.ID(b58.Decode(cid))
	return nil
}
356

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357
func (n *IpfsNode) LoadPrivateKey() error {
358
	if n.Identity == "" || n.Peerstore == nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
359
		return errors.New("loaded private key out of order.")
360 361
	}

362
	if n.PrivateKey != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
363
		return errors.New("private key already loaded")
364 365
	}

366
	sk, err := loadPrivateKey(&n.Repo.Config().Identity, n.Identity)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
367
	if err != nil {
368
		return err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
369
	}
370

371 372
	n.PrivateKey = sk
	n.Peerstore.AddPrivKey(n.Identity, n.PrivateKey)
Jeromy's avatar
Jeromy committed
373 374 375 376
	n.Peerstore.AddPubKey(n.Identity, sk.GetPublic())
	return nil
}

377 378 379 380 381 382 383 384
// loadBootstrapPeers parses the bootstrap peers from the repo config and
// converts them to PeerInfo values. A parse error is returned unchanged.
func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) {
	cfg := n.Repo.Config()

	bootstrapPeers, err := cfg.BootstrapPeers()
	if err != nil {
		return nil, err
	}

	infos := toPeerInfos(bootstrapPeers)
	return infos, nil
}

Jeromy's avatar
Jeromy committed
385 386 387
// SetupOfflineRouting loads the local nodes private key and
// uses it to instantiate a routing system in offline mode.
// This is primarily used for offline ipns modifications.
Jeromy's avatar
Jeromy committed
388
func (n *IpfsNode) SetupOfflineRouting() error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
389
	err := n.LoadPrivateKey()
Jeromy's avatar
Jeromy committed
390 391 392 393 394
	if err != nil {
		return err
	}

	n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.PrivateKey)
395 396 397

	n.Namesys = namesys.NewNameSystem(n.Routing)

398
	return nil
399 400 401 402
}

func loadPrivateKey(cfg *config.Identity, id peer.ID) (ic.PrivKey, error) {
	sk, err := cfg.DecodePrivateKey("passphrase todo!")
403 404 405
	if err != nil {
		return nil, err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
406

407 408 409 410
	id2, err := peer.IDFromPrivateKey(sk)
	if err != nil {
		return nil, err
	}
411

412 413
	if id2 != id {
		return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2)
414 415
	}

416
	return sk, nil
417
}
418

419
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
420 421 422
	var listen []ma.Multiaddr
	for _, addr := range cfg.Addresses.Swarm {
		maddr, err := ma.NewMultiaddr(addr)
423
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
424
			return nil, fmt.Errorf("Failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm)
425
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
426
		listen = append(listen, maddr)
427 428 429 430
	}

	return listen, nil
}
431

Jeromy's avatar
Jeromy committed
432
// HostOption constructs the network host for a node from its identity, its
// peerstore, a bandwidth reporter and a list of IP networks used as address
// filters (startOnlineServices builds that list from Swarm.AddrFilters).
type HostOption func(ctx context.Context, id peer.ID, ps peer.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error)
Jeromy's avatar
Jeromy committed
433 434 435

// DefaultHostOption is the HostOption implemented by constructPeerHost.
var DefaultHostOption HostOption = constructPeerHost

436
// isolates the complex initialization steps
Jeromy's avatar
Jeromy committed
437
func constructPeerHost(ctx context.Context, id peer.ID, ps peer.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) {
438 439

	// no addresses to begin with. we'll start later.
Jeromy's avatar
Jeromy committed
440
	network, err := swarm.NewNetwork(ctx, nil, id, ps, bwr)
441
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
442
		return nil, err
443
	}
444

445 446 447 448
	for _, f := range fs {
		network.Swarm().Filters.AddDialFilter(f)
	}

Jeromy's avatar
Jeromy committed
449 450
	host := p2pbhost.New(network, p2pbhost.NATPortMap, bwr)

451 452 453 454 455 456 457
	return host, nil
}

// startListening on the network addresses
func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) error {
	listenAddrs, err := listenAddresses(cfg)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
458
		return err
459 460
	}

461
	// make sure we error out if our config does not have addresses we can use
462
	log.Debugf("Config.Addresses.Swarm:%s", listenAddrs)
463
	filteredAddrs := addrutil.FilterUsableAddrs(listenAddrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
464
	log.Debugf("Config.Addresses.Swarm:%s (filtered)", filteredAddrs)
465
	if len(filteredAddrs) < 1 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
466
		return fmt.Errorf("addresses in config not usable: %s", listenAddrs)
467 468
	}

469 470 471
	// Actually start listening:
	if err := host.Network().Listen(filteredAddrs...); err != nil {
		return err
472 473
	}

474
	// list out our addresses
475
	addrs, err := host.Network().InterfaceListenAddresses()
476
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
477
		return err
478
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
479
	log.Infof("Swarm listening at: %s", addrs)
480
	return nil
481
}
482

483 484
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
	dhtRouting := dht.NewDHT(ctx, host, dstore)
485
	dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator
486 487
	return dhtRouting, nil
}
Jeromy's avatar
Jeromy committed
488

489
// RoutingOption builds the node's routing system from a context, the network
// host and a thread-safe datastore. startOnlineServicesWithHost calls it with
// the repo's datastore.
type RoutingOption func(context.Context, p2phost.Host, ds.ThreadSafeDatastore) (routing.IpfsRouting, error)
Jeromy's avatar
Jeromy committed
490

Jeromy's avatar
Jeromy committed
491 492
// DiscoveryOption builds a discovery service on top of the given host. A nil
// DiscoveryOption makes startOnlineServices skip local discovery.
type DiscoveryOption func(p2phost.Host) (discovery.Service, error)

493
// DHTOption is the RoutingOption that builds a DHT router (see
// constructDHTRouting).
var DHTOption RoutingOption = constructDHTRouting
494
// NilRouterOption is the RoutingOption backed by
// nilrouting.ConstructNilRouting.
var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting