core.go 4.15 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1 2 3 4 5
package node

import (
	"context"
	"fmt"
Adin Schmahmann's avatar
Adin Schmahmann committed
6
	"time"
Łukasz Magiera's avatar
Łukasz Magiera committed
7

tavit ohanian's avatar
tavit ohanian committed
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
	"gitlab.dms3.io/dms3/go-bitswap"
	"gitlab.dms3.io/dms3/go-bitswap/network"
	"gitlab.dms3.io/dms3/go-blockservice"
	"gitlab.dms3.io/dms3/go-cid"
	"gitlab.dms3.io/dms3/go-datastore"
	blockstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
	exchange "gitlab.dms3.io/dms3/go-dms3-exchange-interface"
	pin "gitlab.dms3.io/dms3/go-dms3-pinner"
	"gitlab.dms3.io/dms3/go-dms3-pinner/dspinner"
	"gitlab.dms3.io/dms3/go-filestore"
	format "gitlab.dms3.io/dms3/go-ld-format"
	"gitlab.dms3.io/dms3/go-merkledag"
	"gitlab.dms3.io/dms3/go-mfs"
	"gitlab.dms3.io/dms3/go-unixfs"
	"gitlab.dms3.io/p2p/go-p2p-core/host"
	"gitlab.dms3.io/p2p/go-p2p-core/routing"
Łukasz Magiera's avatar
Łukasz Magiera committed
24
	"go.uber.org/fx"
25

tavit ohanian's avatar
tavit ohanian committed
26 27
	"gitlab.dms3.io/dms3/go-dms3/core/node/helpers"
	"gitlab.dms3.io/dms3/go-dms3/repo"
Łukasz Magiera's avatar
Łukasz Magiera committed
28 29
)

30 31
// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
Łukasz Magiera's avatar
Łukasz Magiera committed
32 33 34 35 36 37 38 39 40 41 42
	bsvc := blockservice.New(bs, rem)

	lc.Append(fx.Hook{
		OnStop: func(ctx context.Context) error {
			return bsvc.Close()
		},
	})

	return bsvc
}

43
// Pinning creates new pinner which tells GC which blocks should be kept
Łukasz Magiera's avatar
Łukasz Magiera committed
44
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
Adin Schmahmann's avatar
Adin Schmahmann committed
45 46 47 48 49 50 51 52 53 54
	rootDS := repo.Datastore()

	syncFn := func() error {
		if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
			return err
		}
		return rootDS.Sync(filestore.FilestorePrefix)
	}
	syncDs := &syncDagService{ds, syncFn}

Adin Schmahmann's avatar
Adin Schmahmann committed
55 56 57 58
	ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Minute)
	defer cancel()

	pinning, err := dspinner.New(ctx, rootDS, syncDs)
Łukasz Magiera's avatar
Łukasz Magiera committed
59
	if err != nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
60
		return nil, err
Łukasz Magiera's avatar
Łukasz Magiera committed
61 62 63 64 65
	}

	return pinning, nil
}

66 67 68 69 70
// Compile-time checks that *syncDagService satisfies the interfaces it is
// passed around as.
var (
	_ merkledag.SessionMaker = (*syncDagService)(nil)
	_ format.DAGService      = (*syncDagService)(nil)
)

Adin Schmahmann's avatar
Adin Schmahmann committed
71 72 73 74 75 76 77 78 79 80
// syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore.
// It embeds a format.DAGService and pairs it with a caller-supplied sync callback.
type syncDagService struct {
	// DAGService is the wrapped DAG service; its methods are promoted through embedding.
	format.DAGService
	// syncFn is the callback run by Sync. It must be non-nil, since Sync calls it unconditionally.
	syncFn func() error
}

// Sync runs the configured syncFn callback and returns its error unchanged.
func (s *syncDagService) Sync() error {
	return s.syncFn()
}

81 82 83 84
// Session returns the node getter produced by merkledag.NewSession for ctx
// over the embedded DAGService.
func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
	return merkledag.NewSession(ctx, s.DAGService)
}

85 86
// Dag creates new DAGService
func Dag(bs blockservice.BlockService) format.DAGService {
Łukasz Magiera's avatar
Łukasz Magiera committed
87 88 89
	return merkledag.NewDAGService(bs)
}

tavit ohanian's avatar
tavit ohanian committed
90
// OnlineExchange creates new P2p backed block exchange (BitSwap)
Michael Avila's avatar
Michael Avila committed
91
func OnlineExchange(provide bool) interface{} {
Raúl Kripalani's avatar
Raúl Kripalani committed
92
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface {
tavit ohanian's avatar
tavit ohanian committed
93
		bitswapNetwork := network.NewFromDms3Host(host, rt)
94
		exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs, bitswap.ProvideEnabled(provide))
Michael Avila's avatar
Michael Avila committed
95 96 97 98 99 100 101 102
		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return exch.Close()
			},
		})
		return exch

	}
Łukasz Magiera's avatar
Łukasz Magiera committed
103 104
}

105
// Files loads persisted MFS root
106
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
107 108
	dsk := datastore.NewKey("/local/filesroot")
	pf := func(ctx context.Context, c cid.Cid) error {
Adin Schmahmann's avatar
Adin Schmahmann committed
109 110 111 112 113 114 115 116 117 118 119 120
		rootDS := repo.Datastore()
		if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
			return err
		}
		if err := rootDS.Sync(filestore.FilestorePrefix); err != nil {
			return err
		}

		if err := rootDS.Put(dsk, c.Bytes()); err != nil {
			return err
		}
		return rootDS.Sync(dsk)
Łukasz Magiera's avatar
Łukasz Magiera committed
121 122 123 124
	}

	var nd *merkledag.ProtoNode
	val, err := repo.Datastore().Get(dsk)
125
	ctx := helpers.LifecycleCtx(mctx, lc)
Łukasz Magiera's avatar
Łukasz Magiera committed
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164

	switch {
	case err == datastore.ErrNotFound || val == nil:
		nd = unixfs.EmptyDirNode()
		err := dag.Add(ctx, nd)
		if err != nil {
			return nil, fmt.Errorf("failure writing to dagstore: %s", err)
		}
	case err == nil:
		c, err := cid.Cast(val)
		if err != nil {
			return nil, err
		}

		rnd, err := dag.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("error loading filesroot from DAG: %s", err)
		}

		pbnd, ok := rnd.(*merkledag.ProtoNode)
		if !ok {
			return nil, merkledag.ErrNotProtobuf
		}

		nd = pbnd
	default:
		return nil, err
	}

	root, err := mfs.NewRoot(ctx, dag, nd, pf)

	lc.Append(fx.Hook{
		OnStop: func(ctx context.Context) error {
			return root.Close()
		},
	})

	return root, err
}