package node

import (
	"context"
	"fmt"
	"time"

	"gitlab.dms3.io/dms3/go-bitswap"
	"gitlab.dms3.io/dms3/go-bitswap/network"
	"gitlab.dms3.io/dms3/go-blockservice"
	"gitlab.dms3.io/dms3/go-cid"
	"gitlab.dms3.io/dms3/go-datastore"
	blockstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
	exchange "gitlab.dms3.io/dms3/go-dms3-exchange-interface"
	pin "gitlab.dms3.io/dms3/go-dms3-pinner"
	"gitlab.dms3.io/dms3/go-dms3-pinner/dspinner"
	"gitlab.dms3.io/dms3/go-filestore"
	format "gitlab.dms3.io/dms3/go-ld-format"
	"gitlab.dms3.io/dms3/go-merkledag"
	"gitlab.dms3.io/dms3/go-mfs"
	"gitlab.dms3.io/dms3/go-unixfs"
	"gitlab.dms3.io/p2p/go-p2p-core/host"
	"gitlab.dms3.io/p2p/go-p2p-core/routing"
	"go.uber.org/fx"

	"gitlab.dms3.io/dms3/go-dms3/core/node/helpers"
	"gitlab.dms3.io/dms3/go-dms3/repo"
)

// BlockService creates a new blockservice, which provides an interface to
// fetch content-addressable blocks. The service is built from the local
// blockstore bs and the exchange rem, and is closed when the fx lifecycle
// stops.
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
	bsvc := blockservice.New(bs, rem)

	lc.Append(fx.Hook{
		OnStop: func(ctx context.Context) error {
			return bsvc.Close()
		},
	})

	return bsvc
}

// Pinning creates a new pinner, which tells GC which blocks should be kept.
//
// The pinner is backed by the repo's root datastore and by a DAG service
// wrapper whose Sync flushes both the blockstore and the filestore prefixes.
// Construction is bounded by a two-minute timeout.
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
	rootDS := repo.Datastore()

	// syncFn flushes the block prefix first, then the filestore prefix,
	// stopping at the first failure.
	syncFn := func() error {
		if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
			return err
		}
		return rootDS.Sync(filestore.FilestorePrefix)
	}
	syncDs := &syncDagService{ds, syncFn}

	ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Minute)
	defer cancel()

	pinning, err := dspinner.New(ctx, rootDS, syncDs)
	if err != nil {
		// Return an explicit nil so the caller never receives a non-nil
		// interface wrapping a nil concrete pinner.
		return nil, err
	}

	return pinning, nil
}

// Compile-time checks that syncDagService satisfies the interfaces it is
// used as.
var (
	_ merkledag.SessionMaker = (*syncDagService)(nil)
	_ format.DAGService      = (*syncDagService)(nil)
)

// syncDagService is used by the Pinner to ensure data gets persisted to the
// underlying datastore.
type syncDagService struct {
	format.DAGService
	syncFn func() error
}

// Sync runs the configured sync function and returns its result.
func (s *syncDagService) Sync() error {
	return s.syncFn()
}

// Session returns a merkledag session created over the embedded DAGService.
func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
	return merkledag.NewSession(ctx, s.DAGService)
}

// Dag creates a new DAGService on top of the given block service.
func Dag(bs blockservice.BlockService) format.DAGService {
	return merkledag.NewDAGService(bs)
}

// OnlineExchange creates a new P2P-backed block exchange (BitSwap).
//
// It returns a constructor function (typed as interface{}) that builds the
// exchange from the host, routing and blockstore it is given. The provide
// flag is forwarded to bitswap.ProvideEnabled. The exchange is closed when
// the fx lifecycle stops.
func OnlineExchange(provide bool) interface{} {
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface {
		bitswapNetwork := network.NewFromDms3Host(host, rt)
		exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs, bitswap.ProvideEnabled(provide))

		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return exch.Close()
			},
		})

		return exch
	}
}

// Files loads the persisted MFS root.
//
// The root CID is read from the repo datastore under "/local/filesroot".
// When no root is stored, an empty directory node is added to the DAG and
// used as the root. Any other datastore read error is returned to the
// caller. On success the returned root is closed when the fx lifecycle
// stops; on failure no hook is registered and a nil root is returned.
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
	dsk := datastore.NewKey("/local/filesroot")

	// pf is handed to mfs.NewRoot. It syncs the block and filestore
	// prefixes before writing the CID under dsk and syncing that key
	// (presumably so the stored root never points at unsynced blocks).
	pf := func(ctx context.Context, c cid.Cid) error {
		rootDS := repo.Datastore()
		if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
			return err
		}
		if err := rootDS.Sync(filestore.FilestorePrefix); err != nil {
			return err
		}

		if err := rootDS.Put(dsk, c.Bytes()); err != nil {
			return err
		}
		return rootDS.Sync(dsk)
	}

	var nd *merkledag.ProtoNode
	val, err := repo.Datastore().Get(dsk)
	ctx := helpers.LifecycleCtx(mctx, lc)

	switch {
	// Only a genuine "not found" (or a successful read with no value)
	// means there is no stored root. A failed read that returns a nil
	// value must not be mistaken for an empty repo, so it falls through
	// to the default branch.
	case err == datastore.ErrNotFound || (err == nil && val == nil):
		nd = unixfs.EmptyDirNode()
		if err := dag.Add(ctx, nd); err != nil {
			return nil, fmt.Errorf("failure writing to dagstore: %w", err)
		}
	case err == nil:
		c, err := cid.Cast(val)
		if err != nil {
			return nil, err
		}

		rnd, err := dag.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("error loading filesroot from DAG: %w", err)
		}

		pbnd, ok := rnd.(*merkledag.ProtoNode)
		if !ok {
			return nil, merkledag.ErrNotProtobuf
		}

		nd = pbnd
	default:
		return nil, err
	}

	root, err := mfs.NewRoot(ctx, dag, nd, pf)
	if err != nil {
		// Do not register a Close hook for a root that was never created.
		return nil, err
	}

	lc.Append(fx.Hook{
		OnStop: func(ctx context.Context) error {
			return root.Close()
		},
	})

	return root, nil
}