builder.go 9.21 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package core

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5 6
	"crypto/rand"
	"encoding/base64"
Jeromy's avatar
Jeromy committed
7
	"errors"
8 9 10
	"os"
	"syscall"
	"time"
Jeromy's avatar
Jeromy committed
11

12 13 14 15 16
	"go.uber.org/fx"

	"github.com/ipfs/go-ipfs/p2p"
	"github.com/ipfs/go-ipfs/provider"

17
	filestore "github.com/ipfs/go-ipfs/filestore"
18
	namesys "github.com/ipfs/go-ipfs/namesys"
19
	pin "github.com/ipfs/go-ipfs/pin"
20
	repo "github.com/ipfs/go-ipfs/repo"
21
	cidv0v1 "github.com/ipfs/go-ipfs/thirdparty/cidv0v1"
Jakub Sztandera's avatar
Jakub Sztandera committed
22
	"github.com/ipfs/go-ipfs/thirdparty/verifbs"
Steven Allen's avatar
Steven Allen committed
23

Jakub Sztandera's avatar
Jakub Sztandera committed
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
	bserv "github.com/ipfs/go-blockservice"
	ds "github.com/ipfs/go-datastore"
	retry "github.com/ipfs/go-datastore/retrystore"
	dsync "github.com/ipfs/go-datastore/sync"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	cfg "github.com/ipfs/go-ipfs-config"
	offline "github.com/ipfs/go-ipfs-exchange-offline"
	offroute "github.com/ipfs/go-ipfs-routing/offline"
	dag "github.com/ipfs/go-merkledag"
	metrics "github.com/ipfs/go-metrics-interface"
	resolver "github.com/ipfs/go-path/resolver"
	uio "github.com/ipfs/go-unixfs/io"
	libp2p "github.com/libp2p/go-libp2p"
	ci "github.com/libp2p/go-libp2p-crypto"
	p2phost "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
Łukasz Magiera's avatar
Łukasz Magiera committed
41
)
Jeromy's avatar
Jeromy committed
42

43 44 45
type BuildCfg struct {
	// If online is set, the node will have networking enabled
	Online bool
Jeromy's avatar
Jeromy committed
46

Jeromy's avatar
Jeromy committed
47 48 49
	// ExtraOpts is a map of extra options used to configure the ipfs nodes creation
	ExtraOpts map[string]bool

50
	// If permanent then node should run more expensive processes
51
	// that will improve performance in long run
52
	Permanent bool
53

Steven Allen's avatar
Steven Allen committed
54 55 56 57
	// DisableEncryptedConnections disables connection encryption *entirely*.
	// DO NOT SET THIS UNLESS YOU'RE TESTING.
	DisableEncryptedConnections bool

58
	// If NilRepo is set, a Repo backed by a nil datastore will be constructed
59 60 61 62 63
	NilRepo bool

	Routing RoutingOption
	Host    HostOption
	Repo    repo.Repo
Jeromy's avatar
Jeromy committed
64 65
}

Jeromy's avatar
Jeromy committed
66 67 68 69 70 71 72 73
// getOpt reports whether the extra option named key is enabled. It returns
// false when ExtraOpts is nil or when key is not present in it.
func (cfg *BuildCfg) getOpt(key string) bool {
	extras := cfg.ExtraOpts
	return extras != nil && extras[key]
}

74 75
func (cfg *BuildCfg) fillDefaults() error {
	if cfg.Repo != nil && cfg.NilRepo {
76
		return errors.New("cannot set a Repo and specify nilrepo at the same time")
Jeromy's avatar
Jeromy committed
77
	}
78 79 80 81 82

	if cfg.Repo == nil {
		var d ds.Datastore
		if cfg.NilRepo {
			d = ds.NewNullDatastore()
83 84
		} else {
			d = ds.NewMapDatastore()
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
		}
		r, err := defaultRepo(dsync.MutexWrap(d))
		if err != nil {
			return err
		}
		cfg.Repo = r
	}

	if cfg.Routing == nil {
		cfg.Routing = DHTOption
	}

	if cfg.Host == nil {
		cfg.Host = DefaultHostOption
	}

	return nil
Jeromy's avatar
Jeromy committed
102 103
}

104
func defaultRepo(dstore repo.Datastore) (repo.Repo, error) {
105 106 107 108 109 110
	c := cfg.Config{}
	priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, rand.Reader)
	if err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
111
	pid, err := peer.IDFromPublicKey(pub)
112 113 114 115 116 117 118 119 120 121 122
	if err != nil {
		return nil, err
	}

	privkeyb, err := priv.Bytes()
	if err != nil {
		return nil, err
	}

	c.Bootstrap = cfg.DefaultBootstrapAddresses
	c.Addresses.Swarm = []string{"/ip4/0.0.0.0/tcp/4001"}
Jeromy's avatar
Jeromy committed
123
	c.Identity.PeerID = pid.Pretty()
124 125
	c.Identity.PrivKey = base64.StdEncoding.EncodeToString(privkeyb)

Jeromy's avatar
Jeromy committed
126
	return &repo.Mock{
Jeromy's avatar
Jeromy committed
127
		D: dstore,
128 129
		C: c,
	}, nil
Jeromy's avatar
Jeromy committed
130 131
}

132 133
// MetricsCtx is a distinct context.Context type. NewNode provides the
// metrics-scoped node context to the fx application under this type.
type MetricsCtx context.Context

134
// NewNode constructs and returns an IpfsNode using the given cfg.
135 136 137 138
func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
	if cfg == nil {
		cfg = new(BuildCfg)
	}
Jeromy's avatar
Jeromy committed
139

140 141 142 143
	err := cfg.fillDefaults()
	if err != nil {
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
144

Jakub Sztandera's avatar
Jakub Sztandera committed
145
	ctx = metrics.CtxScope(ctx, "ipfs")
Jeromy's avatar
Jeromy committed
146

147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
	repoOption := fx.Provide(func(lc fx.Lifecycle) repo.Repo {
		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return cfg.Repo.Close()
			},
		})

		return cfg.Repo
	})

	// TODO: Remove this, use only for passing node config
	cfgOption := fx.Provide(func() *BuildCfg {
		return cfg
	})

162 163 164 165
	metricsCtx := fx.Provide(func() MetricsCtx {
		return MetricsCtx(ctx)
	})

Łukasz Magiera's avatar
Łukasz Magiera committed
166
	params := fx.Options(
167 168
		repoOption,
		cfgOption,
169
		metricsCtx,
Łukasz Magiera's avatar
Łukasz Magiera committed
170
	)
171

Łukasz Magiera's avatar
Łukasz Magiera committed
172
	storage := fx.Options(
173
		fx.Provide(repoConfig),
Łukasz Magiera's avatar
Łukasz Magiera committed
174
		fx.Provide(datastoreCtor),
175 176
		fx.Provide(baseBlockstoreCtor),
		fx.Provide(gcBlockstoreCtor),
Łukasz Magiera's avatar
Łukasz Magiera committed
177
	)
178

Łukasz Magiera's avatar
Łukasz Magiera committed
179 180 181
	ident := fx.Options(
		fx.Provide(identity),
		fx.Provide(privateKey),
Łukasz Magiera's avatar
Łukasz Magiera committed
182
		fx.Provide(peerstore),
Łukasz Magiera's avatar
Łukasz Magiera committed
183 184 185
	)

	ipns := fx.Options(
186
		fx.Provide(recordValidator),
Łukasz Magiera's avatar
Łukasz Magiera committed
187
	)
188

Łukasz Magiera's avatar
Łukasz Magiera committed
189 190 191 192 193 194 195 196 197
	providers := fx.Options(
		fx.Provide(providerQueue),
		fx.Provide(providerCtor),
		fx.Provide(reproviderCtor),

		fx.Invoke(reprovider),
		fx.Invoke(provider.Provider.Run),
	)

Łukasz Magiera's avatar
Łukasz Magiera committed
198 199 200
	online := fx.Options(
		fx.Provide(onlineExchangeCtor),
		fx.Provide(onlineNamesysCtor),
201

Łukasz Magiera's avatar
Łukasz Magiera committed
202
		fx.Invoke(ipnsRepublisher),
Łukasz Magiera's avatar
Łukasz Magiera committed
203 204 205 206 207

		fx.Provide(p2p.NewP2P),

		ipfsp2p,
		providers,
Łukasz Magiera's avatar
Łukasz Magiera committed
208 209 210 211 212
	)
	if !cfg.Online {
		online = fx.Options(
			fx.Provide(offline.Exchange),
			fx.Provide(offlineNamesysCtor),
Łukasz Magiera's avatar
Łukasz Magiera committed
213 214
			fx.Provide(offroute.NewOfflineRouter),
			fx.Provide(provider.NewOfflineProvider),
Łukasz Magiera's avatar
Łukasz Magiera committed
215 216
		)
	}
217

Łukasz Magiera's avatar
Łukasz Magiera committed
218
	core := fx.Options(
219
		fx.Provide(blockServiceCtor),
Łukasz Magiera's avatar
Łukasz Magiera committed
220
		fx.Provide(dagCtor),
221 222 223
		fx.Provide(resolver.NewBasicResolver),
		fx.Provide(pinning),
		fx.Provide(files),
Łukasz Magiera's avatar
Łukasz Magiera committed
224
	)
225

Łukasz Magiera's avatar
Łukasz Magiera committed
226 227 228
	n := &IpfsNode{
		ctx: ctx,
	}
229

Łukasz Magiera's avatar
Łukasz Magiera committed
230
	app := fx.New(
231 232
		fx.Provide(baseProcess),

Łukasz Magiera's avatar
Łukasz Magiera committed
233 234 235 236 237 238 239
		params,
		storage,
		ident,
		ipns,
		online,

		fx.Invoke(setupSharding),
240
		fx.NopLogger,
Łukasz Magiera's avatar
Łukasz Magiera committed
241 242

		core,
243 244 245 246

		fx.Extract(n),
	)

247 248 249 250 251
	go func() {
		<-ctx.Done()
		app.Stop(context.Background())
	}()

252
	n.IsOnline = cfg.Online
Łukasz Magiera's avatar
Łukasz Magiera committed
253
	n.app = app
254

Łukasz Magiera's avatar
Łukasz Magiera committed
255 256 257 258 259 260
	/*	n := &IpfsNode{
			IsOnline:  cfg.Online,
			Repo:      cfg.Repo,
			ctx:       ctx,
			Peerstore: pstoremem.NewPeerstore(),
		}
261

Łukasz Magiera's avatar
Łukasz Magiera committed
262 263 264 265 266
		n.RecordValidator = record.NamespacedValidator{
			"pk":   record.PublicKeyValidator{},
			"ipns": ipns.Validator{KeyBook: n.Peerstore},
		}
	*/
267 268
	// TODO: port to lifetimes
	// n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown)
269

270
	/*if err := setupNode(ctx, n, cfg); err != nil {
271 272
		n.Close()
		return nil, err
273
	}*/
Łukasz Magiera's avatar
Łukasz Magiera committed
274 275 276 277 278 279 280 281 282 283 284 285
	if app.Err() != nil {
		return nil, app.Err()
	}

	if err := app.Start(ctx); err != nil {
		return nil, err
	}

	// TODO: DI-ify bootstrap
	if !cfg.Online {
		return n, nil
	}
Jeromy's avatar
Jeromy committed
286

Łukasz Magiera's avatar
Łukasz Magiera committed
287
	return n, n.Bootstrap(DefaultBootstrapConfig)
Jeromy's avatar
Jeromy committed
288 289
}

290 291 292 293 294 295 296 297 298
// isTooManyFDError reports whether err is an *os.PathError whose underlying
// error is syscall.EMFILE. Any other error, including nil, yields false.
func isTooManyFDError(err error) bool {
	pathErr, isPathErr := err.(*os.PathError)
	return isPathErr && pathErr.Err == syscall.EMFILE
}

299
func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
300
	// setup local identity
301 302
	if err := n.loadID(); err != nil {
		return err
Jeromy's avatar
Jeromy committed
303
	}
304

305 306 307 308 309
	// load the private key (if present)
	if err := n.loadPrivateKey(); err != nil {
		return err
	}

310 311 312 313 314 315 316
	rds := &retry.Datastore{
		Batching:    n.Repo.Datastore(),
		Delay:       time.Millisecond * 200,
		Retries:     6,
		TempErrFunc: isTooManyFDError,
	}

Jakub Sztandera's avatar
Jakub Sztandera committed
317
	// hash security
318
	bs := bstore.NewBlockstore(rds)
Łukasz Magiera's avatar
Łukasz Magiera committed
319
	bs = &verifbs.VerifBS{Blockstore: bs}
320

321
	opts := bstore.DefaultCacheOpts()
322 323 324 325 326
	conf, err := n.Repo.Config()
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
327 328 329
	// TEMP: setting global sharding switch here
	uio.UseHAMTSharding = conf.Experimental.ShardingEnabled

330
	opts.HasBloomFilterSize = conf.Datastore.BloomFilterSize
331
	if !cfg.Permanent {
332 333 334
		opts.HasBloomFilterSize = 0
	}

335 336 337 338 339
	if !cfg.NilRepo {
		bs, err = bstore.CachedBlockstore(ctx, bs, opts)
		if err != nil {
			return err
		}
340 341
	}

342
	bs = bstore.NewIdStore(bs)
343

344
	bs = cidv0v1.NewBlockstore(bs)
345

346
	n.BaseBlocks = bs
347
	n.GCLocker = bstore.NewGCLocker()
348
	n.Blockstore = bstore.NewGCBlockstore(bs, n.GCLocker)
349

350
	if conf.Experimental.FilestoreEnabled || conf.Experimental.UrlstoreEnabled {
351
		// hash security
352
		n.Filestore = filestore.NewFilestore(bs, n.Repo.FileManager())
353
		n.Blockstore = bstore.NewGCBlockstore(n.Filestore, n.GCLocker)
Łukasz Magiera's avatar
Łukasz Magiera committed
354
		n.Blockstore = &verifbs.VerifBSGC{GCBlockstore: n.Blockstore}
355
	}
356

357 358 359 360 361 362
	rcfg, err := n.Repo.Config()
	if err != nil {
		return err
	}

	if rcfg.Datastore.HashOnRead {
363
		bs.HashOnRead(true)
364 365
	}

Steven Allen's avatar
Steven Allen committed
366 367 368 369 370 371
	hostOption := cfg.Host
	if cfg.DisableEncryptedConnections {
		innerHostOption := hostOption
		hostOption = func(ctx context.Context, id peer.ID, ps pstore.Peerstore, options ...libp2p.Option) (p2phost.Host, error) {
			return innerHostOption(ctx, id, ps, append(options, libp2p.NoSecurity)...)
		}
372
		// TODO: shouldn't this be Errorf to guarantee visibility?
Steven Allen's avatar
Steven Allen committed
373 374 375 376
		log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS.
		You will not be able to connect to any nodes configured to use encrypted connections`)
	}

377
	if cfg.Online {
378
		do := setupDiscoveryOption(rcfg.Discovery)
Steven Allen's avatar
Steven Allen committed
379
		if err := n.startOnlineServices(ctx, cfg.Routing, hostOption, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
380
			return err
381
		}
382 383
	} else {
		n.Exchange = offline.Exchange(n.Blockstore)
384 385
		n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.RecordValidator)
		n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), 0)
Jeromy's avatar
Jeromy committed
386
	}
387 388 389

	n.Blocks = bserv.New(n.Blockstore, n.Exchange)
	n.DAG = dag.NewDAGService(n.Blocks)
390 391 392

	internalDag := dag.NewDAGService(bserv.New(n.Blockstore, offline.Exchange(n.Blockstore)))
	n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG, internalDag)
393
	if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
394
		// TODO: we should move towards only running 'NewPinner' explicitly on
395 396 397
		// node init instead of implicitly here as a result of the pinner keys
		// not being found in the datastore.
		// this is kinda sketchy and could cause data loss
398
		n.Pinning = pin.NewPinner(n.Repo.Datastore(), n.DAG, internalDag)
399
	}
400
	n.Resolver = resolver.NewBasicResolver(n.DAG)
401

402
	// Provider
403
	queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore())
404 405 406 407 408
	if err != nil {
		return err
	}
	n.Provider = provider.NewProvider(ctx, queue, n.Routing)

Łukasz Magiera's avatar
Łukasz Magiera committed
409 410 411 412 413 414
	if cfg.Online {
		if err := n.startLateOnlineServices(ctx); err != nil {
			return err
		}
	}

415
	return n.loadFilesRoot()
Jeromy's avatar
Jeromy committed
416
}