repo.go 10.5 KB
Newer Older
1 2 3 4
package commands

import (
	"bytes"
5
	"context"
keks's avatar
fixes  
keks committed
6
	"errors"
7
	"fmt"
Jeromy's avatar
Jeromy committed
8 9 10
	"io"
	"os"
	"path/filepath"
11
	"runtime"
12
	"strings"
13
	"sync"
14
	"text/tabwriter"
Jeromy's avatar
Jeromy committed
15

Jan Winkelmann's avatar
Jan Winkelmann committed
16
	oldcmds "github.com/ipfs/go-ipfs/commands"
17
	lgc "github.com/ipfs/go-ipfs/commands/legacy"
18
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
Jan Winkelmann's avatar
Jan Winkelmann committed
19
	e "github.com/ipfs/go-ipfs/core/commands/e"
20
	corerepo "github.com/ipfs/go-ipfs/core/corerepo"
Michael Pfister's avatar
Michael Pfister committed
21
	fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
Jeromy's avatar
Jeromy committed
22

Steven Allen's avatar
Steven Allen committed
23
	config "gx/ipfs/QmPEpj17FDRpc7K1aArKZp3RsHtzRMKykeK9GVgn4WQGPR/go-ipfs-config"
24
	cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
25
	cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
Steven Allen's avatar
Steven Allen committed
26
	bstore "gx/ipfs/QmcDDgAXDbpDUpadCJKLr49KYR4HuL7T8Z1dZTHt6ixsoR/go-ipfs-blockstore"
27
	cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
28 29
)

Michael Pfister's avatar
Michael Pfister committed
30 31 32 33
// RepoVersion is the value emitted by the 'ipfs repo version' command.
type RepoVersion struct {
	// Version is the repo version rendered as a string.
	Version string
}

34
var RepoCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
35
	Helptext: cmdkit.HelpText{
36
		Tagline: "Manipulate the IPFS repo.",
37 38 39 40 41 42
		ShortDescription: `
'ipfs repo' is a plumbing command used to manipulate the repo.
`,
	},

	Subcommands: map[string]*cmds.Command{
43
		"stat":    repoStatCmd,
keks's avatar
keks committed
44
		"gc":      repoGcCmd,
45 46
		"fsck":    RepoFsckCmd,
		"version": repoVersionCmd,
47
		"verify":  lgc.NewCommand(repoVerifyCmd),
48 49 50
	},
}

Kevin Atkinson's avatar
Kevin Atkinson committed
51
// GcResult is the result returned by "repo gc" command.
52
type GcResult struct {
53
	Key   cid.Cid
54 55 56
	Error string `json:",omitempty"`
}

Kejie Zhang's avatar
Kejie Zhang committed
57 58 59 60 61
// Option names accepted by 'ipfs repo gc'. repoQuietOptionName is also
// registered by 'ipfs repo version'.
const (
	repoStreamErrorsOptionName = "stream-errors"
	repoQuietOptionName        = "quiet"
)

keks's avatar
keks committed
62
var repoGcCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
63
	Helptext: cmdkit.HelpText{
rht's avatar
rht committed
64
		Tagline: "Perform a garbage collection sweep on the repo.",
65 66 67 68 69 70
		ShortDescription: `
'ipfs repo gc' is a plumbing command that will sweep the local
set of stored objects and remove ones that are not pinned in
order to reclaim hard disk space.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
71
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
72 73
		cmdkit.BoolOption(repoStreamErrorsOptionName, "Stream errors."),
		cmdkit.BoolOption(repoQuietOptionName, "q", "Write minimal output."),
74
	},
keks's avatar
keks committed
75 76
	Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := cmdenv.GetNode(env)
77
		if err != nil {
keks's avatar
keks committed
78
			return err
79 80
		}

Kejie Zhang's avatar
Kejie Zhang committed
81
		streamErrors, _ := req.Options[repoStreamErrorsOptionName].(bool)
keks's avatar
keks committed
82 83 84 85 86 87 88

		gcOutChan := corerepo.GarbageCollectAsync(n, req.Context)

		if streamErrors {
			errs := false
			for res := range gcOutChan {
				if res.Error != nil {
89 90 91
					if err := re.Emit(&GcResult{Error: res.Error.Error()}); err != nil {
						return err
					}
keks's avatar
keks committed
92 93
					errs = true
				} else {
94 95 96
					if err := re.Emit(&GcResult{Key: res.KeyRemoved}); err != nil {
						return err
					}
97
				}
Jeromy's avatar
Jeromy committed
98
			}
keks's avatar
keks committed
99 100
			if errs {
				return errors.New("encountered errors during gc run")
101
			}
keks's avatar
keks committed
102 103 104 105
		} else {
			err := corerepo.CollectResult(req.Context, gcOutChan, func(k cid.Cid) {
				re.Emit(&GcResult{Key: k})
			})
106
			if err != nil {
keks's avatar
keks committed
107
				return err
108
			}
keks's avatar
keks committed
109 110 111 112 113 114 115
		}

		return nil
	},
	Type: GcResult{},
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Kejie Zhang's avatar
Kejie Zhang committed
116
			quiet, _ := req.Options[repoQuietOptionName].(bool)
117

Jan Winkelmann's avatar
Jan Winkelmann committed
118 119
			obj, ok := v.(*GcResult)
			if !ok {
keks's avatar
keks committed
120
				return e.TypeErr(obj, v)
Jan Winkelmann's avatar
Jan Winkelmann committed
121
			}
122

Jan Winkelmann's avatar
Jan Winkelmann committed
123
			if obj.Error != "" {
keks's avatar
keks committed
124 125
				_, err := fmt.Fprintf(w, "Error: %s\n", obj.Error)
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
126
			}
127

keks's avatar
keks committed
128 129 130
			prefix := "removed "
			if quiet {
				prefix = ""
131 132
			}

keks's avatar
keks committed
133 134 135
			_, err := fmt.Fprintf(w, "%s%s\n", prefix, obj.Key)
			return err
		}),
136 137
	},
}
138

Kejie Zhang's avatar
Kejie Zhang committed
139 140 141 142 143
// Option names accepted by 'ipfs repo stat'.
const (
	repoSizeOnlyOptionName = "size-only"
	repoHumanOptionName    = "human"
)

144
var repoStatCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
145
	Helptext: cmdkit.HelpText{
David Dias's avatar
David Dias committed
146 147
		Tagline: "Get stats for the currently used repo.",
		ShortDescription: `
148
'ipfs repo stat' provides information about the local set of
149 150 151 152
stored objects. It outputs:

RepoSize        int Size in bytes that the repo is currently taking.
StorageMax      string Maximum datastore size (from configuration)
153 154
NumObjects      int Number of objects in the local repo.
RepoPath        string The path to the repo being currently used.
Michael Pfister's avatar
Michael Pfister committed
155
Version         string The repo version.
David Dias's avatar
David Dias committed
156
`,
157
	},
158
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
159 160
		cmdkit.BoolOption(repoSizeOnlyOptionName, "Only report RepoSize and StorageMax."),
		cmdkit.BoolOption(repoHumanOptionName, "Output sizes in MiB."),
161
	},
keks's avatar
keks committed
162
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
163
		n, err := cmdenv.GetNode(env)
164
		if err != nil {
keks's avatar
keks committed
165
			return err
166 167
		}

Kejie Zhang's avatar
Kejie Zhang committed
168
		sizeOnly, _ := req.Options[repoSizeOnlyOptionName].(bool)
169
		if sizeOnly {
170 171
			sizeStat, err := corerepo.RepoSize(req.Context, n)
			if err != nil {
keks's avatar
keks committed
172
				return err
173 174 175 176
			}
			cmds.EmitOnce(res, &corerepo.Stat{
				SizeStat: sizeStat,
			})
keks's avatar
keks committed
177
			return nil
178 179
		}

180
		stat, err := corerepo.RepoStat(req.Context, n)
181
		if err != nil {
keks's avatar
keks committed
182
			return err
183 184
		}

keks's avatar
keks committed
185
		return cmds.EmitOnce(res, &stat)
186
	},
187
	Type: &corerepo.Stat{},
Jan Winkelmann's avatar
Jan Winkelmann committed
188
	Encoders: cmds.EncoderMap{
189
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
190
			stat, ok := v.(*corerepo.Stat)
191
			if !ok {
Jan Winkelmann's avatar
Jan Winkelmann committed
192
				return e.TypeErr(stat, v)
193 194
			}

Jan Winkelmann's avatar
Jan Winkelmann committed
195
			wtr := tabwriter.NewWriter(w, 0, 0, 1, ' ', 0)
196 197
			defer wtr.Flush()

Kejie Zhang's avatar
Kejie Zhang committed
198 199
			human, _ := req.Options[repoHumanOptionName].(bool)
			sizeOnly, _ := req.Options[repoSizeOnlyOptionName].(bool)
Jan Winkelmann's avatar
Jan Winkelmann committed
200

201 202 203 204
			printSize := func(name string, size uint64) {
				sizeInMiB := size / (1024 * 1024)
				if human && sizeInMiB > 0 {
					fmt.Fprintf(wtr, "%s (MiB):\t%d\n", name, sizeInMiB)
205
				} else {
206
					fmt.Fprintf(wtr, "%s:\t%d\n", name, size)
207
				}
208
			}
209

210 211
			if !sizeOnly {
				fmt.Fprintf(wtr, "NumObjects:\t%d\n", stat.NumObjects)
212 213
			}

214 215
			printSize("RepoSize", stat.RepoSize)
			printSize("StorageMax", stat.StorageMax)
Thomas Gardner's avatar
Thomas Gardner committed
216

217 218 219 220
			if !sizeOnly {
				fmt.Fprintf(wtr, "RepoPath:\t%s\n", stat.RepoPath)
				fmt.Fprintf(wtr, "Version:\t%s\n", stat.Version)
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
221

222
			return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
223
		}),
224 225
	},
}
michael's avatar
michael committed
226

227
var RepoFsckCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
228
	Helptext: cmdkit.HelpText{
229
		Tagline: "Remove repo lockfiles.",
michael's avatar
michael committed
230 231 232 233 234 235
		ShortDescription: `
'ipfs repo fsck' is a plumbing command that will remove repo and level db
lockfiles, as well as the api file. This command can only run when no ipfs
daemons are running.
`,
	},
236 237 238 239 240
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		configRoot, err := cmdenv.GetConfigRoot(env)
		if err != nil {
			return err
		}
michael's avatar
michael committed
241 242 243

		dsPath, err := config.DataStorePath(configRoot)
		if err != nil {
244
			return err
michael's avatar
michael committed
245 246 247
		}

		dsLockFile := filepath.Join(dsPath, "LOCK") // TODO: get this lockfile programmatically
248
		repoLockFile := filepath.Join(configRoot, fsrepo.LockFile)
michael's avatar
michael committed
249 250 251 252 253 254 255 256
		apiFile := filepath.Join(configRoot, "api") // TODO: get this programmatically

		log.Infof("Removing repo lockfile: %s", repoLockFile)
		log.Infof("Removing datastore lockfile: %s", dsLockFile)
		log.Infof("Removing api file: %s", apiFile)

		err = os.Remove(repoLockFile)
		if err != nil && !os.IsNotExist(err) {
257
			return err
michael's avatar
michael committed
258 259 260
		}
		err = os.Remove(dsLockFile)
		if err != nil && !os.IsNotExist(err) {
261
			return err
michael's avatar
michael committed
262 263 264
		}
		err = os.Remove(apiFile)
		if err != nil && !os.IsNotExist(err) {
265
			return err
michael's avatar
michael committed
266 267
		}

268
		return cmds.EmitOnce(res, &MessageOutput{"Lockfiles have been removed.\n"})
michael's avatar
michael committed
269 270
	},
	Type: MessageOutput{},
271 272 273 274 275
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *MessageOutput) error {
			fmt.Fprintf(w, out.Message)
			return nil
		}),
Michael Pfister's avatar
Michael Pfister committed
276 277 278
	},
}

Jeromy's avatar
Jeromy committed
279
// VerifyProgress is the value streamed by 'ipfs repo verify'. Each value
// carries either a message (a corrupt-block report or the final summary) or a
// progress counter.
type VerifyProgress struct {
	// Msg is a human-readable message; empty for pure progress updates.
	Msg string
	// Progress is the number of blocks processed so far.
	Progress int
}

284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
// verifyWorkerRun is one verification worker. It reads CIDs from keys until
// the channel is closed, fetches each block from bs, and sends exactly one
// string per key on results: a "was corrupt" description when the fetch
// failed, or the empty string when it succeeded. The worker stops early if ctx
// is cancelled while it is waiting to deliver a result, and always signals wg
// on exit.
func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- string, bs bstore.Blockstore) {
	defer wg.Done()

	for key := range keys {
		// An empty report marks a block that was read back without error.
		var report string
		if _, getErr := bs.Get(key); getErr != nil {
			report = fmt.Sprintf("block %s was corrupt (%s)", key, getErr)
		}

		select {
		case <-ctx.Done():
			return
		case results <- report:
		}
	}
}

// verifyResultChan starts runtime.NumCPU()*2 verifyWorkerRun goroutines that
// all consume keys and report on a single unbuffered channel, which is
// returned to the caller. The channel is closed once every worker has
// returned.
func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore) <-chan string {
	out := make(chan string)
	workers := runtime.NumCPU() * 2

	go func() {
		var pending sync.WaitGroup
		pending.Add(workers)
		for n := workers; n > 0; n-- {
			go verifyWorkerRun(ctx, &pending, keys, out, bs)
		}

		// Only close after the last worker is done so no send hits a closed
		// channel.
		pending.Wait()
		close(out)
	}()

	return out
}

Jan Winkelmann's avatar
Jan Winkelmann committed
326 327
var repoVerifyCmd = &oldcmds.Command{
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
328 329
		Tagline: "Verify all blocks in repo are not corrupted.",
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
330
	Run: func(req oldcmds.Request, res oldcmds.Response) {
Jeromy's avatar
Jeromy committed
331 332
		nd, err := req.InvocContext().GetNode()
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
333
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
334 335 336 337
			return
		}

		out := make(chan interface{})
Jan Winkelmann's avatar
Jan Winkelmann committed
338 339
		res.SetOutput((<-chan interface{})(out))
		defer close(out)
Jeromy's avatar
Jeromy committed
340

Jan Winkelmann's avatar
Jan Winkelmann committed
341 342
		bs := bstore.NewBlockstore(nd.Repo.Datastore())
		bs.HashOnRead(true)
Jeromy's avatar
Jeromy committed
343

Jan Winkelmann's avatar
Jan Winkelmann committed
344 345 346 347 348 349
		keys, err := bs.AllKeysChan(req.Context())
		if err != nil {
			log.Error(err)
			return
		}

350 351
		results := verifyResultChan(req.Context(), keys, bs)

Jan Winkelmann's avatar
Jan Winkelmann committed
352 353
		var fails int
		var i int
354 355
		for msg := range results {
			if msg != "" {
356
				select {
357
				case out <- &VerifyProgress{Msg: msg}:
358 359
				case <-req.Context().Done():
					return
Jan Winkelmann's avatar
Jan Winkelmann committed
360 361
				}
				fails++
Jeromy's avatar
Jeromy committed
362
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
363
			i++
364 365 366 367 368
			select {
			case out <- &VerifyProgress{Progress: i}:
			case <-req.Context().Done():
				return
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
369
		}
Jeromy's avatar
Jeromy committed
370

Jan Winkelmann's avatar
Jan Winkelmann committed
371
		if fails == 0 {
372 373 374 375 376
			select {
			case out <- &VerifyProgress{Msg: "verify complete, all blocks validated."}:
			case <-req.Context().Done():
				return
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
377 378 379 380 381 382 383 384 385 386
		} else {
			res.SetError(fmt.Errorf("verify complete, some blocks were corrupt"), cmdkit.ErrNormal)
		}
	},
	Type: &VerifyProgress{},
	Marshalers: oldcmds.MarshalerMap{
		oldcmds.Text: func(res oldcmds.Response) (io.Reader, error) {
			v, err := unwrapOutput(res.Output())
			if err != nil {
				return nil, err
Jeromy's avatar
Jeromy committed
387
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
388 389 390 391

			obj, ok := v.(*VerifyProgress)
			if !ok {
				return nil, e.TypeErr(obj, v)
Jeromy's avatar
Jeromy committed
392 393
			}

Jan Winkelmann's avatar
Jan Winkelmann committed
394 395 396 397 398
			buf := new(bytes.Buffer)
			if strings.Contains(obj.Msg, "was corrupt") {
				fmt.Fprintln(os.Stdout, obj.Msg)
				return buf, nil
			}
Jeromy's avatar
Jeromy committed
399

Jan Winkelmann's avatar
Jan Winkelmann committed
400 401 402
			if obj.Msg != "" {
				if len(obj.Msg) < 20 {
					obj.Msg += "             "
Jeromy's avatar
Jeromy committed
403
				}
Jan Winkelmann's avatar
Jan Winkelmann committed
404
				fmt.Fprintln(buf, obj.Msg)
Jeromy's avatar
Jeromy committed
405 406 407
				return buf, nil
			}

Jan Winkelmann's avatar
Jan Winkelmann committed
408 409
			fmt.Fprintf(buf, "%d blocks processed.\r", obj.Progress)
			return buf, nil
Jeromy's avatar
Jeromy committed
410 411 412 413
		},
	},
}

414
var repoVersionCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
415
	Helptext: cmdkit.HelpText{
Michael Pfister's avatar
Michael Pfister committed
416 417 418 419 420 421
		Tagline: "Show the repo version.",
		ShortDescription: `
'ipfs repo version' returns the current repo version.
`,
	},

Jan Winkelmann's avatar
Jan Winkelmann committed
422
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
423
		cmdkit.BoolOption(repoQuietOptionName, "q", "Write minimal output."),
Michael Pfister's avatar
Michael Pfister committed
424
	},
425 426
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		return cmds.EmitOnce(res, &RepoVersion{
427
			Version: fmt.Sprint(fsrepo.RepoVersion),
Michael Pfister's avatar
Michael Pfister committed
428 429 430
		})
	},
	Type: RepoVersion{},
431 432 433
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RepoVersion) error {
			quiet, _ := req.Options[repoQuietOptionName].(bool)
Michael Pfister's avatar
Michael Pfister committed
434 435

			if quiet {
436
				fmt.Fprintf(w, fmt.Sprintf("fs-repo@%s\n", out.Version))
Michael Pfister's avatar
Michael Pfister committed
437
			} else {
438
				fmt.Fprintf(w, fmt.Sprintf("ipfs repo version fs-repo@%s\n", out.Version))
Michael Pfister's avatar
Michael Pfister committed
439
			}
440 441
			return nil
		}),
michael's avatar
michael committed
442 443
	},
}