repo.go 9.59 KB
Newer Older
1 2 3
package commands

import (
4
	"context"
keks's avatar
fixes  
keks committed
5
	"errors"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7 8 9
	"io"
	"os"
	"path/filepath"
10
	"runtime"
11
	"strings"
12
	"sync"
13
	"text/tabwriter"
Jeromy's avatar
Jeromy committed
14

15
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
16
	corerepo "github.com/ipfs/go-ipfs/core/corerepo"
Michael Pfister's avatar
Michael Pfister committed
17
	fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
Jeromy's avatar
Jeromy committed
18

Jakub Sztandera's avatar
Jakub Sztandera committed
19 20 21 22 23
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	cmdkit "github.com/ipfs/go-ipfs-cmdkit"
	cmds "github.com/ipfs/go-ipfs-cmds"
	config "github.com/ipfs/go-ipfs-config"
24 25
)

Michael Pfister's avatar
Michael Pfister committed
26 27 28 29
// RepoVersion is the value emitted by the 'ipfs repo version' command.
type RepoVersion struct {
	// Version is the fs-repo version rendered as a string.
	Version string
}

30
var RepoCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
31
	Helptext: cmdkit.HelpText{
32
		Tagline: "Manipulate the IPFS repo.",
33 34 35 36 37 38
		ShortDescription: `
'ipfs repo' is a plumbing command used to manipulate the repo.
`,
	},

	Subcommands: map[string]*cmds.Command{
39
		"stat":    repoStatCmd,
keks's avatar
keks committed
40
		"gc":      repoGcCmd,
41
		"fsck":    repoFsckCmd,
42
		"version": repoVersionCmd,
43
		"verify":  repoVerifyCmd,
44 45 46
	},
}

Kevin Atkinson's avatar
Kevin Atkinson committed
47
// GcResult is the result returned by "repo gc" command.
48
type GcResult struct {
49
	Key   cid.Cid
50 51 52
	Error string `json:",omitempty"`
}

Kejie Zhang's avatar
Kejie Zhang committed
53 54 55 56 57
// Option names for the repo commands in this file: "stream-errors" is
// registered by 'ipfs repo gc', and "quiet" by both 'ipfs repo gc' and
// 'ipfs repo version'.
const (
	repoStreamErrorsOptionName = "stream-errors"
	repoQuietOptionName        = "quiet"
)

keks's avatar
keks committed
58
var repoGcCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
59
	Helptext: cmdkit.HelpText{
rht's avatar
rht committed
60
		Tagline: "Perform a garbage collection sweep on the repo.",
61 62 63 64 65 66
		ShortDescription: `
'ipfs repo gc' is a plumbing command that will sweep the local
set of stored objects and remove ones that are not pinned in
order to reclaim hard disk space.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
67
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
68 69
		cmdkit.BoolOption(repoStreamErrorsOptionName, "Stream errors."),
		cmdkit.BoolOption(repoQuietOptionName, "q", "Write minimal output."),
70
	},
keks's avatar
keks committed
71 72
	Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := cmdenv.GetNode(env)
73
		if err != nil {
keks's avatar
keks committed
74
			return err
75 76
		}

Kejie Zhang's avatar
Kejie Zhang committed
77
		streamErrors, _ := req.Options[repoStreamErrorsOptionName].(bool)
keks's avatar
keks committed
78 79 80 81 82 83 84

		gcOutChan := corerepo.GarbageCollectAsync(n, req.Context)

		if streamErrors {
			errs := false
			for res := range gcOutChan {
				if res.Error != nil {
85 86 87
					if err := re.Emit(&GcResult{Error: res.Error.Error()}); err != nil {
						return err
					}
keks's avatar
keks committed
88 89
					errs = true
				} else {
90 91 92
					if err := re.Emit(&GcResult{Key: res.KeyRemoved}); err != nil {
						return err
					}
93
				}
Jeromy's avatar
Jeromy committed
94
			}
keks's avatar
keks committed
95 96
			if errs {
				return errors.New("encountered errors during gc run")
97
			}
keks's avatar
keks committed
98 99 100 101
		} else {
			err := corerepo.CollectResult(req.Context, gcOutChan, func(k cid.Cid) {
				re.Emit(&GcResult{Key: k})
			})
102
			if err != nil {
keks's avatar
keks committed
103
				return err
104
			}
keks's avatar
keks committed
105 106 107 108 109 110
		}

		return nil
	},
	Type: GcResult{},
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
111
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, gcr *GcResult) error {
Kejie Zhang's avatar
Kejie Zhang committed
112
			quiet, _ := req.Options[repoQuietOptionName].(bool)
113

Overbool's avatar
Overbool committed
114 115
			if gcr.Error != "" {
				_, err := fmt.Fprintf(w, "Error: %s\n", gcr.Error)
keks's avatar
keks committed
116
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
117
			}
118

keks's avatar
keks committed
119 120 121
			prefix := "removed "
			if quiet {
				prefix = ""
122 123
			}

Overbool's avatar
Overbool committed
124
			_, err := fmt.Fprintf(w, "%s%s\n", prefix, gcr.Key)
keks's avatar
keks committed
125 126
			return err
		}),
127 128
	},
}
129

Kejie Zhang's avatar
Kejie Zhang committed
130 131 132 133 134
// Option names registered by the 'ipfs repo stat' command.
const (
	repoSizeOnlyOptionName = "size-only"
	repoHumanOptionName    = "human"
)

135
var repoStatCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
136
	Helptext: cmdkit.HelpText{
David Dias's avatar
David Dias committed
137 138
		Tagline: "Get stats for the currently used repo.",
		ShortDescription: `
139
'ipfs repo stat' provides information about the local set of
140 141 142 143
stored objects. It outputs:

RepoSize        int Size in bytes that the repo is currently taking.
StorageMax      string Maximum datastore size (from configuration)
144 145
NumObjects      int Number of objects in the local repo.
RepoPath        string The path to the repo being currently used.
Michael Pfister's avatar
Michael Pfister committed
146
Version         string The repo version.
David Dias's avatar
David Dias committed
147
`,
148
	},
149
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
150 151
		cmdkit.BoolOption(repoSizeOnlyOptionName, "Only report RepoSize and StorageMax."),
		cmdkit.BoolOption(repoHumanOptionName, "Output sizes in MiB."),
152
	},
keks's avatar
keks committed
153
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
154
		n, err := cmdenv.GetNode(env)
155
		if err != nil {
keks's avatar
keks committed
156
			return err
157 158
		}

Kejie Zhang's avatar
Kejie Zhang committed
159
		sizeOnly, _ := req.Options[repoSizeOnlyOptionName].(bool)
160
		if sizeOnly {
161 162
			sizeStat, err := corerepo.RepoSize(req.Context, n)
			if err != nil {
keks's avatar
keks committed
163
				return err
164 165 166 167
			}
			cmds.EmitOnce(res, &corerepo.Stat{
				SizeStat: sizeStat,
			})
keks's avatar
keks committed
168
			return nil
169 170
		}

171
		stat, err := corerepo.RepoStat(req.Context, n)
172
		if err != nil {
keks's avatar
keks committed
173
			return err
174 175
		}

keks's avatar
keks committed
176
		return cmds.EmitOnce(res, &stat)
177
	},
178
	Type: &corerepo.Stat{},
Jan Winkelmann's avatar
Jan Winkelmann committed
179
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
180
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, stat *corerepo.Stat) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
181
			wtr := tabwriter.NewWriter(w, 0, 0, 1, ' ', 0)
182 183
			defer wtr.Flush()

Kejie Zhang's avatar
Kejie Zhang committed
184 185
			human, _ := req.Options[repoHumanOptionName].(bool)
			sizeOnly, _ := req.Options[repoSizeOnlyOptionName].(bool)
Jan Winkelmann's avatar
Jan Winkelmann committed
186

187 188 189 190
			printSize := func(name string, size uint64) {
				sizeInMiB := size / (1024 * 1024)
				if human && sizeInMiB > 0 {
					fmt.Fprintf(wtr, "%s (MiB):\t%d\n", name, sizeInMiB)
191
				} else {
192
					fmt.Fprintf(wtr, "%s:\t%d\n", name, size)
193
				}
194
			}
195

196 197
			if !sizeOnly {
				fmt.Fprintf(wtr, "NumObjects:\t%d\n", stat.NumObjects)
198 199
			}

200 201
			printSize("RepoSize", stat.RepoSize)
			printSize("StorageMax", stat.StorageMax)
Thomas Gardner's avatar
Thomas Gardner committed
202

203 204 205 206
			if !sizeOnly {
				fmt.Fprintf(wtr, "RepoPath:\t%s\n", stat.RepoPath)
				fmt.Fprintf(wtr, "Version:\t%s\n", stat.Version)
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
207

208
			return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
209
		}),
210 211
	},
}
michael's avatar
michael committed
212

213
var repoFsckCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
214
	Helptext: cmdkit.HelpText{
215
		Tagline: "Remove repo lockfiles.",
michael's avatar
michael committed
216 217 218 219 220 221
		ShortDescription: `
'ipfs repo fsck' is a plumbing command that will remove repo and level db
lockfiles, as well as the api file. This command can only run when no ipfs
daemons are running.
`,
	},
222 223 224 225 226
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		configRoot, err := cmdenv.GetConfigRoot(env)
		if err != nil {
			return err
		}
michael's avatar
michael committed
227 228 229

		dsPath, err := config.DataStorePath(configRoot)
		if err != nil {
230
			return err
michael's avatar
michael committed
231 232 233
		}

		dsLockFile := filepath.Join(dsPath, "LOCK") // TODO: get this lockfile programmatically
234
		repoLockFile := filepath.Join(configRoot, fsrepo.LockFile)
michael's avatar
michael committed
235 236 237 238 239 240 241 242
		apiFile := filepath.Join(configRoot, "api") // TODO: get this programmatically

		log.Infof("Removing repo lockfile: %s", repoLockFile)
		log.Infof("Removing datastore lockfile: %s", dsLockFile)
		log.Infof("Removing api file: %s", apiFile)

		err = os.Remove(repoLockFile)
		if err != nil && !os.IsNotExist(err) {
243
			return err
michael's avatar
michael committed
244 245 246
		}
		err = os.Remove(dsLockFile)
		if err != nil && !os.IsNotExist(err) {
247
			return err
michael's avatar
michael committed
248 249 250
		}
		err = os.Remove(apiFile)
		if err != nil && !os.IsNotExist(err) {
251
			return err
michael's avatar
michael committed
252 253
		}

254
		return cmds.EmitOnce(res, &MessageOutput{"Lockfiles have been removed.\n"})
michael's avatar
michael committed
255 256
	},
	Type: MessageOutput{},
257 258 259 260 261
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *MessageOutput) error {
			fmt.Fprintf(w, out.Message)
			return nil
		}),
Michael Pfister's avatar
Michael Pfister committed
262 263 264
	},
}

Jeromy's avatar
Jeromy committed
265
// VerifyProgress is the value emitted by the 'ipfs repo verify' command.
// An item carries either a message or a progress count.
type VerifyProgress struct {
	// Msg is a corruption report or the final summary; empty for pure
	// progress updates.
	Msg string
	// Progress is the number of blocks processed so far.
	Progress int
}

270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
// verifyWorkerRun is a single verification worker. For every key received
// from keys it calls bs.Get and sends one string on results: a
// "block ... was corrupt" report when Get fails, or the empty string when it
// succeeds. The worker returns when keys is closed and drained, or as soon
// as ctx is done while it is waiting to send. wg.Done is called on exit.
func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- string, bs bstore.Blockstore) {
	defer wg.Done()

	for k := range keys {
		// An empty report marks a block that was read back without error.
		var report string
		if _, err := bs.Get(k); err != nil {
			report = fmt.Sprintf("block %s was corrupt (%s)", k, err)
		}

		select {
		case results <- report:
		case <-ctx.Done():
			return
		}
	}
}

// verifyResultChan starts runtime.NumCPU()*2 verifyWorkerRun goroutines that
// share the keys channel and returns the channel they report on. The
// returned channel is closed once every worker has exited.
func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore) <-chan string {
	out := make(chan string)

	go func() {
		defer close(out)

		workerCount := runtime.NumCPU() * 2

		var workers sync.WaitGroup
		workers.Add(workerCount)
		for i := 0; i < workerCount; i++ {
			go verifyWorkerRun(ctx, &workers, keys, out, bs)
		}

		workers.Wait()
	}()

	return out
}

312
var repoVerifyCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
313
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
314 315
		Tagline: "Verify all blocks in repo are not corrupted.",
	},
316 317
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		nd, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
318
		if err != nil {
319
			return err
Jeromy's avatar
Jeromy committed
320 321
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
322 323
		bs := bstore.NewBlockstore(nd.Repo.Datastore())
		bs.HashOnRead(true)
Jeromy's avatar
Jeromy committed
324

325
		keys, err := bs.AllKeysChan(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
326 327
		if err != nil {
			log.Error(err)
328
			return err
Jan Winkelmann's avatar
Jan Winkelmann committed
329 330
		}

331
		results := verifyResultChan(req.Context, keys, bs)
332

Jan Winkelmann's avatar
Jan Winkelmann committed
333 334
		var fails int
		var i int
335 336
		for msg := range results {
			if msg != "" {
337 338
				if err := res.Emit(&VerifyProgress{Msg: msg}); err != nil {
					return err
Jan Winkelmann's avatar
Jan Winkelmann committed
339 340
				}
				fails++
Jeromy's avatar
Jeromy committed
341
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
342
			i++
343 344
			if err := res.Emit(&VerifyProgress{Progress: i}); err != nil {
				return err
345
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
346
		}
Jeromy's avatar
Jeromy committed
347

348 349
		if fails != 0 {
			return errors.New("verify complete, some blocks were corrupt")
Jan Winkelmann's avatar
Jan Winkelmann committed
350
		}
351 352

		return res.Emit(&VerifyProgress{Msg: "verify complete, all blocks validated."})
Jan Winkelmann's avatar
Jan Winkelmann committed
353 354
	},
	Type: &VerifyProgress{},
355 356
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, obj *VerifyProgress) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
357 358
			if strings.Contains(obj.Msg, "was corrupt") {
				fmt.Fprintln(os.Stdout, obj.Msg)
359
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
360
			}
Jeromy's avatar
Jeromy committed
361

Jan Winkelmann's avatar
Jan Winkelmann committed
362 363 364
			if obj.Msg != "" {
				if len(obj.Msg) < 20 {
					obj.Msg += "             "
Jeromy's avatar
Jeromy committed
365
				}
366 367
				fmt.Fprintln(w, obj.Msg)
				return nil
Jeromy's avatar
Jeromy committed
368 369
			}

370 371 372
			fmt.Fprintf(w, "%d blocks processed.\r", obj.Progress)
			return nil
		}),
Jeromy's avatar
Jeromy committed
373 374 375
	},
}

376
var repoVersionCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
377
	Helptext: cmdkit.HelpText{
Michael Pfister's avatar
Michael Pfister committed
378 379 380 381 382 383
		Tagline: "Show the repo version.",
		ShortDescription: `
'ipfs repo version' returns the current repo version.
`,
	},

Jan Winkelmann's avatar
Jan Winkelmann committed
384
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
385
		cmdkit.BoolOption(repoQuietOptionName, "q", "Write minimal output."),
Michael Pfister's avatar
Michael Pfister committed
386
	},
387 388
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		return cmds.EmitOnce(res, &RepoVersion{
389
			Version: fmt.Sprint(fsrepo.RepoVersion),
Michael Pfister's avatar
Michael Pfister committed
390 391 392
		})
	},
	Type: RepoVersion{},
393 394 395
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RepoVersion) error {
			quiet, _ := req.Options[repoQuietOptionName].(bool)
Michael Pfister's avatar
Michael Pfister committed
396 397

			if quiet {
398
				fmt.Fprintf(w, fmt.Sprintf("fs-repo@%s\n", out.Version))
Michael Pfister's avatar
Michael Pfister committed
399
			} else {
400
				fmt.Fprintf(w, fmt.Sprintf("ipfs repo version fs-repo@%s\n", out.Version))
Michael Pfister's avatar
Michael Pfister committed
401
			}
402 403
			return nil
		}),
michael's avatar
michael committed
404 405
	},
}