repo.go 8.7 KB
Newer Older
1 2 3
package commands

import (
4
	"context"
keks's avatar
fixes  
keks committed
5
	"errors"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7 8
	"io"
	"os"
9
	"runtime"
10
	"strings"
11
	"sync"
12
	"text/tabwriter"
Jeromy's avatar
Jeromy committed
13

14
	humanize "github.com/dustin/go-humanize"
15
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
16
	corerepo "github.com/ipfs/go-ipfs/core/corerepo"
Michael Pfister's avatar
Michael Pfister committed
17
	fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
Jeromy's avatar
Jeromy committed
18

Jakub Sztandera's avatar
Jakub Sztandera committed
19 20 21
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	cmds "github.com/ipfs/go-ipfs-cmds"
22 23
)

Michael Pfister's avatar
Michael Pfister committed
24 25 26 27
// RepoVersion is the output type of the 'ipfs repo version' command.
type RepoVersion struct {
	// Version is the fs-repo version rendered as a string.
	Version string
}

28
var RepoCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
29
	Helptext: cmds.HelpText{
30
		Tagline: "Manipulate the IPFS repo.",
31 32 33 34 35 36
		ShortDescription: `
'ipfs repo' is a plumbing command used to manipulate the repo.
`,
	},

	Subcommands: map[string]*cmds.Command{
37
		"stat":    repoStatCmd,
keks's avatar
keks committed
38
		"gc":      repoGcCmd,
39
		"fsck":    repoFsckCmd,
40
		"version": repoVersionCmd,
41
		"verify":  repoVerifyCmd,
42 43 44
	},
}

Kevin Atkinson's avatar
Kevin Atkinson committed
45
// GcResult is the result returned by "repo gc" command.
46
type GcResult struct {
47
	Key   cid.Cid
48 49 50
	Error string `json:",omitempty"`
}

Kejie Zhang's avatar
Kejie Zhang committed
51 52 53 54 55
// Option names for 'ipfs repo gc'. repoQuietOptionName is also registered
// by 'ipfs repo version'.
const (
	repoStreamErrorsOptionName = "stream-errors"
	repoQuietOptionName        = "quiet"
)

keks's avatar
keks committed
56
var repoGcCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
57
	Helptext: cmds.HelpText{
rht's avatar
rht committed
58
		Tagline: "Perform a garbage collection sweep on the repo.",
59 60 61 62 63 64
		ShortDescription: `
'ipfs repo gc' is a plumbing command that will sweep the local
set of stored objects and remove ones that are not pinned in
order to reclaim hard disk space.
`,
	},
Steven Allen's avatar
Steven Allen committed
65 66 67
	Options: []cmds.Option{
		cmds.BoolOption(repoStreamErrorsOptionName, "Stream errors."),
		cmds.BoolOption(repoQuietOptionName, "q", "Write minimal output."),
68
	},
keks's avatar
keks committed
69 70
	Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := cmdenv.GetNode(env)
71
		if err != nil {
keks's avatar
keks committed
72
			return err
73 74
		}

Kejie Zhang's avatar
Kejie Zhang committed
75
		streamErrors, _ := req.Options[repoStreamErrorsOptionName].(bool)
keks's avatar
keks committed
76 77 78 79 80 81 82

		gcOutChan := corerepo.GarbageCollectAsync(n, req.Context)

		if streamErrors {
			errs := false
			for res := range gcOutChan {
				if res.Error != nil {
83 84 85
					if err := re.Emit(&GcResult{Error: res.Error.Error()}); err != nil {
						return err
					}
keks's avatar
keks committed
86 87
					errs = true
				} else {
88 89 90
					if err := re.Emit(&GcResult{Key: res.KeyRemoved}); err != nil {
						return err
					}
91
				}
Jeromy's avatar
Jeromy committed
92
			}
keks's avatar
keks committed
93 94
			if errs {
				return errors.New("encountered errors during gc run")
95
			}
keks's avatar
keks committed
96 97
		} else {
			err := corerepo.CollectResult(req.Context, gcOutChan, func(k cid.Cid) {
98 99 100 101
				// Nothing to do with this error, really. This
				// most likely means that the client is gone but
				// we still need to let the GC finish.
				_ = re.Emit(&GcResult{Key: k})
keks's avatar
keks committed
102
			})
103
			if err != nil {
keks's avatar
keks committed
104
				return err
105
			}
keks's avatar
keks committed
106 107 108 109 110 111
		}

		return nil
	},
	Type: GcResult{},
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
112
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, gcr *GcResult) error {
Kejie Zhang's avatar
Kejie Zhang committed
113
			quiet, _ := req.Options[repoQuietOptionName].(bool)
114

Overbool's avatar
Overbool committed
115 116
			if gcr.Error != "" {
				_, err := fmt.Fprintf(w, "Error: %s\n", gcr.Error)
keks's avatar
keks committed
117
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
118
			}
119

keks's avatar
keks committed
120 121 122
			prefix := "removed "
			if quiet {
				prefix = ""
123 124
			}

Overbool's avatar
Overbool committed
125
			_, err := fmt.Fprintf(w, "%s%s\n", prefix, gcr.Key)
keks's avatar
keks committed
126 127
			return err
		}),
128 129
	},
}
130

Kejie Zhang's avatar
Kejie Zhang committed
131 132 133 134 135
// Option names for 'ipfs repo stat'.
const (
	repoSizeOnlyOptionName = "size-only"
	repoHumanOptionName    = "human"
)

136
var repoStatCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
137
	Helptext: cmds.HelpText{
David Dias's avatar
David Dias committed
138 139
		Tagline: "Get stats for the currently used repo.",
		ShortDescription: `
140
'ipfs repo stat' provides information about the local set of
141 142 143 144
stored objects. It outputs:

RepoSize        int Size in bytes that the repo is currently taking.
StorageMax      string Maximum datastore size (from configuration)
145 146
NumObjects      int Number of objects in the local repo.
RepoPath        string The path to the repo being currently used.
Michael Pfister's avatar
Michael Pfister committed
147
Version         string The repo version.
David Dias's avatar
David Dias committed
148
`,
149
	},
Steven Allen's avatar
Steven Allen committed
150
	Options: []cmds.Option{
Jorropo's avatar
Jorropo committed
151 152
		cmds.BoolOption(repoSizeOnlyOptionName, "s", "Only report RepoSize and StorageMax."),
		cmds.BoolOption(repoHumanOptionName, "H", "Print sizes in human readable format (e.g., 1K 234M 2G)"),
153
	},
keks's avatar
keks committed
154
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
155
		n, err := cmdenv.GetNode(env)
156
		if err != nil {
keks's avatar
keks committed
157
			return err
158 159
		}

Kejie Zhang's avatar
Kejie Zhang committed
160
		sizeOnly, _ := req.Options[repoSizeOnlyOptionName].(bool)
161
		if sizeOnly {
162 163
			sizeStat, err := corerepo.RepoSize(req.Context, n)
			if err != nil {
keks's avatar
keks committed
164
				return err
165
			}
166
			return cmds.EmitOnce(res, &corerepo.Stat{
167 168
				SizeStat: sizeStat,
			})
169 170
		}

171
		stat, err := corerepo.RepoStat(req.Context, n)
172
		if err != nil {
keks's avatar
keks committed
173
			return err
174 175
		}

keks's avatar
keks committed
176
		return cmds.EmitOnce(res, &stat)
177
	},
178
	Type: &corerepo.Stat{},
Jan Winkelmann's avatar
Jan Winkelmann committed
179
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
180
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, stat *corerepo.Stat) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
181
			wtr := tabwriter.NewWriter(w, 0, 0, 1, ' ', 0)
182 183
			defer wtr.Flush()

Kejie Zhang's avatar
Kejie Zhang committed
184 185
			human, _ := req.Options[repoHumanOptionName].(bool)
			sizeOnly, _ := req.Options[repoSizeOnlyOptionName].(bool)
Jan Winkelmann's avatar
Jan Winkelmann committed
186

187
			printSize := func(name string, size uint64) {
188 189 190
				sizeStr := fmt.Sprintf("%d", size)
				if human {
					sizeStr = humanize.Bytes(size)
191
				}
192 193

				fmt.Fprintf(wtr, "%s:\t%s\n", name, sizeStr)
194
			}
195

196 197
			if !sizeOnly {
				fmt.Fprintf(wtr, "NumObjects:\t%d\n", stat.NumObjects)
198 199
			}

200 201
			printSize("RepoSize", stat.RepoSize)
			printSize("StorageMax", stat.StorageMax)
Thomas Gardner's avatar
Thomas Gardner committed
202

203 204 205 206
			if !sizeOnly {
				fmt.Fprintf(wtr, "RepoPath:\t%s\n", stat.RepoPath)
				fmt.Fprintf(wtr, "Version:\t%s\n", stat.Version)
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
207

208
			return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
209
		}),
210 211
	},
}
michael's avatar
michael committed
212

213
var repoFsckCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
214
	Helptext: cmds.HelpText{
215
		Tagline: "Remove repo lockfiles.",
michael's avatar
michael committed
216
		ShortDescription: `
217
'ipfs repo fsck' is now a no-op.
michael's avatar
michael committed
218 219
`,
	},
220
	NoRemote: true,
221
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
222
		return cmds.EmitOnce(res, &MessageOutput{"`ipfs repo fsck` is deprecated and does nothing.\n"})
michael's avatar
michael committed
223 224
	},
	Type: MessageOutput{},
225 226 227 228 229
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *MessageOutput) error {
			fmt.Fprintf(w, out.Message)
			return nil
		}),
Michael Pfister's avatar
Michael Pfister committed
230 231 232
	},
}

Jeromy's avatar
Jeromy committed
233
// VerifyProgress is the value streamed by 'ipfs repo verify'.
type VerifyProgress struct {
	// Msg carries a per-block corruption report or the final summary.
	// It is empty for plain progress updates.
	Msg string
	// Progress is the number of blocks processed so far. It is only set
	// on progress updates.
	Progress int
}

238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
// verifyWorkerRun is a single verification worker. It reads keys until the
// keys channel is closed, fetches each block from bs, and sends one string per
// key on results: a "was corrupt" message when the fetch fails, or the empty
// string when it succeeds. The worker stops early if ctx is cancelled while it
// is waiting to deliver a result. wg.Done is called on exit in either case.
func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- string, bs bstore.Blockstore) {
	defer wg.Done()

	for key := range keys {
		// An empty report means the block was read without error.
		var report string
		if _, getErr := bs.Get(key); getErr != nil {
			report = fmt.Sprintf("block %s was corrupt (%s)", key, getErr)
		}

		select {
		case <-ctx.Done():
			return
		case results <- report:
		}
	}
}

// verifyResultChan fans the keys channel out to runtime.NumCPU()*2
// verifyWorkerRun goroutines and returns the unbuffered channel they report
// on. The returned channel is closed once every worker has exited.
func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore) <-chan string {
	out := make(chan string)
	workers := runtime.NumCPU() * 2

	go func() {
		var pending sync.WaitGroup
		pending.Add(workers)
		for w := 0; w < workers; w++ {
			go verifyWorkerRun(ctx, &pending, keys, out, bs)
		}

		// Only close the channel after all senders are done.
		pending.Wait()
		close(out)
	}()

	return out
}

280
var repoVerifyCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
281
	Helptext: cmds.HelpText{
Jeromy's avatar
Jeromy committed
282 283
		Tagline: "Verify all blocks in repo are not corrupted.",
	},
284 285
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		nd, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
286
		if err != nil {
287
			return err
Jeromy's avatar
Jeromy committed
288 289
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
290 291
		bs := bstore.NewBlockstore(nd.Repo.Datastore())
		bs.HashOnRead(true)
Jeromy's avatar
Jeromy committed
292

293
		keys, err := bs.AllKeysChan(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
294 295
		if err != nil {
			log.Error(err)
296
			return err
Jan Winkelmann's avatar
Jan Winkelmann committed
297 298
		}

299
		results := verifyResultChan(req.Context, keys, bs)
300

Jan Winkelmann's avatar
Jan Winkelmann committed
301 302
		var fails int
		var i int
303 304
		for msg := range results {
			if msg != "" {
305 306
				if err := res.Emit(&VerifyProgress{Msg: msg}); err != nil {
					return err
Jan Winkelmann's avatar
Jan Winkelmann committed
307 308
				}
				fails++
Jeromy's avatar
Jeromy committed
309
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
310
			i++
311 312
			if err := res.Emit(&VerifyProgress{Progress: i}); err != nil {
				return err
313
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
314
		}
Jeromy's avatar
Jeromy committed
315

316 317
		if fails != 0 {
			return errors.New("verify complete, some blocks were corrupt")
Jan Winkelmann's avatar
Jan Winkelmann committed
318
		}
319 320

		return res.Emit(&VerifyProgress{Msg: "verify complete, all blocks validated."})
Jan Winkelmann's avatar
Jan Winkelmann committed
321 322
	},
	Type: &VerifyProgress{},
323 324
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, obj *VerifyProgress) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
325 326
			if strings.Contains(obj.Msg, "was corrupt") {
				fmt.Fprintln(os.Stdout, obj.Msg)
327
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
328
			}
Jeromy's avatar
Jeromy committed
329

Jan Winkelmann's avatar
Jan Winkelmann committed
330 331 332
			if obj.Msg != "" {
				if len(obj.Msg) < 20 {
					obj.Msg += "             "
Jeromy's avatar
Jeromy committed
333
				}
334 335
				fmt.Fprintln(w, obj.Msg)
				return nil
Jeromy's avatar
Jeromy committed
336 337
			}

338 339 340
			fmt.Fprintf(w, "%d blocks processed.\r", obj.Progress)
			return nil
		}),
Jeromy's avatar
Jeromy committed
341 342 343
	},
}

344
var repoVersionCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
345
	Helptext: cmds.HelpText{
Michael Pfister's avatar
Michael Pfister committed
346 347 348 349 350 351
		Tagline: "Show the repo version.",
		ShortDescription: `
'ipfs repo version' returns the current repo version.
`,
	},

Steven Allen's avatar
Steven Allen committed
352 353
	Options: []cmds.Option{
		cmds.BoolOption(repoQuietOptionName, "q", "Write minimal output."),
Michael Pfister's avatar
Michael Pfister committed
354
	},
355 356
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		return cmds.EmitOnce(res, &RepoVersion{
357
			Version: fmt.Sprint(fsrepo.RepoVersion),
Michael Pfister's avatar
Michael Pfister committed
358 359 360
		})
	},
	Type: RepoVersion{},
361 362 363
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RepoVersion) error {
			quiet, _ := req.Options[repoQuietOptionName].(bool)
Michael Pfister's avatar
Michael Pfister committed
364 365

			if quiet {
366
				fmt.Fprintf(w, "fs-repo@%s\n", out.Version)
Michael Pfister's avatar
Michael Pfister committed
367
			} else {
368
				fmt.Fprintf(w, "ipfs repo version fs-repo@%s\n", out.Version)
Michael Pfister's avatar
Michael Pfister committed
369
			}
370 371
			return nil
		}),
michael's avatar
michael committed
372 373
	},
}