// p2p.go: the 'ipfs p2p' command tree (libp2p stream mounting).

package commands

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
	"text/tabwriter"
	"time"

	core "github.com/ipfs/go-ipfs/core"
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
	p2p "github.com/ipfs/go-ipfs/p2p"

	cmds "github.com/ipfs/go-ipfs-cmds"
	peer "github.com/libp2p/go-libp2p-core/peer"
	pstore "github.com/libp2p/go-libp2p-core/peerstore"
	protocol "github.com/libp2p/go-libp2p-core/protocol"
	ma "github.com/multiformats/go-multiaddr"
	madns "github.com/multiformats/go-multiaddr-dns"
)

// P2PProtoPrefix is the namespace prefix a protocol name must start with
// unless the caller passes the allow-custom-protocol option.
const P2PProtoPrefix = "/x/"

// P2PListenerInfoOutput describes a single listener in the output of the
// ls command.
type P2PListenerInfoOutput struct {
	// Protocol is the libp2p protocol name of the listener.
	Protocol string
	// ListenAddress is the string form of the listener's listen address.
	ListenAddress string
	// TargetAddress is the string form of the listener's target address.
	TargetAddress string
}

// P2PStreamInfoOutput describes a single stream in the output of the
// streams command.
type P2PStreamInfoOutput struct {
	// HandlerID is the stream's numeric identifier rendered in base 10.
	HandlerID string
	// Protocol is the libp2p protocol name of the stream.
	Protocol string
	// OriginAddress is the string form of the stream's origin address.
	OriginAddress string
	// TargetAddress is the string form of the stream's target address.
	TargetAddress string
}

Łukasz Magiera's avatar
Łukasz Magiera committed
43 44 45
// P2PLsOutput is output type of ls command
type P2PLsOutput struct {
	Listeners []P2PListenerInfoOutput
46 47
}

Łukasz Magiera's avatar
Łukasz Magiera committed
48 49 50
// P2PStreamsOutput is output type of streams command
type P2PStreamsOutput struct {
	Streams []P2PStreamInfoOutput
51 52
}

// Option names shared by the forward and listen commands.
const (
	// allowCustomProtocolOptionName lifts the P2PProtoPrefix requirement.
	allowCustomProtocolOptionName = "allow-custom-protocol"
	// reportPeerIDOptionName is the listen option whose value is passed to
	// ForwardRemote as the report-peer-id flag.
	reportPeerIDOptionName = "report-peer-id"
)

// resolveTimeout bounds the multiaddr DNS resolution done by parseIpfsAddr.
var resolveTimeout = 10 * time.Second

Łukasz Magiera's avatar
Łukasz Magiera committed
60
// P2PCmd is the 'ipfs p2p' command
Łukasz Magiera's avatar
Łukasz Magiera committed
61
var P2PCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
62
	Helptext: cmds.HelpText{
63 64
		Tagline: "Libp2p stream mounting.",
		ShortDescription: `
65
Create and use tunnels to remote peers over libp2p
66

67 68
Note: this command is experimental and subject to change as usecases and APIs
are refined`,
69 70 71
	},

	Subcommands: map[string]*cmds.Command{
Overbool's avatar
Overbool committed
72
		"stream":  p2pStreamCmd,
Łukasz Magiera's avatar
Łukasz Magiera committed
73
		"forward": p2pForwardCmd,
74
		"listen":  p2pListenCmd,
75
		"close":   p2pCloseCmd,
Łukasz Magiera's avatar
Łukasz Magiera committed
76
		"ls":      p2pLsCmd,
77 78 79
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
80
var p2pForwardCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
81
	Helptext: cmds.HelpText{
82
		Tagline: "Forward connections to libp2p service",
Łukasz Magiera's avatar
Łukasz Magiera committed
83
		ShortDescription: `
Łukasz Magiera's avatar
Łukasz Magiera committed
84
Forward connections made to <listen-address> to <target-address>.
Łukasz Magiera's avatar
Łukasz Magiera committed
85

Łukasz Magiera's avatar
Łukasz Magiera committed
86
<protocol> specifies the libp2p protocol name to use for libp2p
87
connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'.
Łukasz Magiera's avatar
Łukasz Magiera committed
88

89
Example:
90 91
  ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /p2p/QmPeer
    - Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /p2p/QmPeer
Łukasz Magiera's avatar
Łukasz Magiera committed
92 93 94

`,
	},
Steven Allen's avatar
Steven Allen committed
95 96 97 98
	Arguments: []cmds.Argument{
		cmds.StringArg("protocol", true, false, "Protocol name."),
		cmds.StringArg("listen-address", true, false, "Listening endpoint."),
		cmds.StringArg("target-address", true, false, "Target endpoint."),
Łukasz Magiera's avatar
Łukasz Magiera committed
99
	},
Steven Allen's avatar
Steven Allen committed
100 101
	Options: []cmds.Option{
		cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
102
	},
Overbool's avatar
Overbool committed
103 104
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := p2pGetNode(env)
Łukasz Magiera's avatar
Łukasz Magiera committed
105
		if err != nil {
Overbool's avatar
Overbool committed
106
			return err
Łukasz Magiera's avatar
Łukasz Magiera committed
107 108
		}

Overbool's avatar
Overbool committed
109 110 111
		protoOpt := req.Arguments[0]
		listenOpt := req.Arguments[1]
		targetOpt := req.Arguments[2]
Łukasz Magiera's avatar
Łukasz Magiera committed
112 113 114 115 116

		proto := protocol.ID(protoOpt)

		listen, err := ma.NewMultiaddr(listenOpt)
		if err != nil {
Overbool's avatar
Overbool committed
117
			return err
Łukasz Magiera's avatar
Łukasz Magiera committed
118 119
		}

120
		targets, err := parseIpfsAddr(targetOpt)
Łukasz Magiera's avatar
Łukasz Magiera committed
121
		if err != nil {
Overbool's avatar
Overbool committed
122
			return err
Łukasz Magiera's avatar
Łukasz Magiera committed
123
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
124

Overbool's avatar
Overbool committed
125
		allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
126

Łukasz Magiera's avatar
Łukasz Magiera committed
127
		if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
Overbool's avatar
Overbool committed
128
			return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
129 130
		}

Overbool's avatar
Overbool committed
131
		return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
132 133 134
	},
}

135
// parseIpfsAddr is a function that takes in addr string and return ipfsAddrs
Steven Allen's avatar
Steven Allen committed
136 137
func parseIpfsAddr(addr string) (*peer.AddrInfo, error) {
	multiaddr, err := ma.NewMultiaddr(addr)
138 139 140
	if err != nil {
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
141 142 143 144

	pi, err := peer.AddrInfoFromP2pAddr(multiaddr)
	if err == nil {
		return pi, nil
145
	}
Steven Allen's avatar
Steven Allen committed
146 147

	// resolve multiaddr whose protocol is not ma.P_IPFS
148
	ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
149
	defer cancel()
Steven Allen's avatar
Steven Allen committed
150
	addrs, err := madns.Resolve(ctx, multiaddr)
151 152 153
	if err != nil {
		return nil, err
	}
154
	if len(addrs) == 0 {
Steven Allen's avatar
Steven Allen committed
155
		return nil, errors.New("fail to resolve the multiaddr:" + multiaddr.String())
156
	}
Steven Allen's avatar
Steven Allen committed
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
	var info peer.AddrInfo
	for _, addr := range addrs {
		taddr, id := peer.SplitAddr(addr)
		if id == "" {
			// not an ipfs addr, skipping.
			continue
		}
		switch info.ID {
		case "":
			info.ID = id
		case id:
		default:
			return nil, fmt.Errorf(
				"ambiguous multiaddr %s could refer to %s or %s",
				multiaddr,
				info.ID,
				id,
			)
175
		}
Steven Allen's avatar
Steven Allen committed
176
		info.Addrs = append(info.Addrs, taddr)
177
	}
Steven Allen's avatar
Steven Allen committed
178
	return &info, nil
179 180
}

181
var p2pListenCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
182
	Helptext: cmds.HelpText{
183 184 185 186 187 188 189 190 191 192 193 194
		Tagline: "Create libp2p service",
		ShortDescription: `
Create libp2p service and forward connections made to <target-address>.

<protocol> specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'.

Example:
  ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234
    - Forward connections to 'myproto' libp2p service to 127.0.0.1:1234

`,
	},
Steven Allen's avatar
Steven Allen committed
195 196 197
	Arguments: []cmds.Argument{
		cmds.StringArg("protocol", true, false, "Protocol name."),
		cmds.StringArg("target-address", true, false, "Target endpoint."),
198
	},
Steven Allen's avatar
Steven Allen committed
199 200 201
	Options: []cmds.Option{
		cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
		cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
202
	},
Overbool's avatar
Overbool committed
203 204
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := p2pGetNode(env)
205
		if err != nil {
Overbool's avatar
Overbool committed
206
			return err
207 208
		}

Overbool's avatar
Overbool committed
209 210
		protoOpt := req.Arguments[0]
		targetOpt := req.Arguments[1]
Łukasz Magiera's avatar
Łukasz Magiera committed
211 212 213 214 215

		proto := protocol.ID(protoOpt)

		target, err := ma.NewMultiaddr(targetOpt)
		if err != nil {
Overbool's avatar
Overbool committed
216
			return err
Łukasz Magiera's avatar
Łukasz Magiera committed
217
		}
218

Overbool's avatar
Overbool committed
219 220
		// port can't be 0
		if err := checkPort(target); err != nil {
Overbool's avatar
Overbool committed
221
			return err
Overbool's avatar
Overbool committed
222 223
		}

Overbool's avatar
Overbool committed
224
		allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
225
		reportPeerID, _ := req.Options[reportPeerIDOptionName].(bool)
226

Łukasz Magiera's avatar
Łukasz Magiera committed
227
		if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
Overbool's avatar
Overbool committed
228
			return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
229 230
		}

231 232
		_, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
		return err
Łukasz Magiera's avatar
Łukasz Magiera committed
233 234 235
	},
}

Overbool's avatar
Overbool committed
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
// checkPort checks whether target multiaddr contains tcp or udp protocol
// and whether the port is equal to 0
func checkPort(target ma.Multiaddr) error {
	// get tcp or udp port from multiaddr
	getPort := func() (string, error) {
		sport, _ := target.ValueForProtocol(ma.P_TCP)
		if sport != "" {
			return sport, nil
		}

		sport, _ = target.ValueForProtocol(ma.P_UDP)
		if sport != "" {
			return sport, nil
		}
		return "", fmt.Errorf("address does not contain tcp or udp protocol")
	}

	sport, err := getPort()
	if err != nil {
		return err
	}

	port, err := strconv.Atoi(sport)
	if err != nil {
		return err
	}

	if port == 0 {
Overbool's avatar
Overbool committed
264
		return fmt.Errorf("port can not be 0")
Overbool's avatar
Overbool committed
265 266 267 268 269
	}

	return nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
270
// forwardLocal forwards local connections to a libp2p service
Steven Allen's avatar
Steven Allen committed
271 272
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) error {
	ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL)
Łukasz Magiera's avatar
Łukasz Magiera committed
273
	// TODO: return some info
Steven Allen's avatar
Steven Allen committed
274
	_, err := p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
Łukasz Magiera's avatar
Łukasz Magiera committed
275 276 277
	return err
}

const (
	// p2pHeadersOptionName is the option that turns on table headers in the
	// text output of the ls commands.
	p2pHeadersOptionName = "headers"
)

Łukasz Magiera's avatar
Łukasz Magiera committed
282
var p2pLsCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
283
	Helptext: cmds.HelpText{
284
		Tagline: "List active p2p listeners.",
285
	},
Steven Allen's avatar
Steven Allen committed
286 287
	Options: []cmds.Option{
		cmds.BoolOption(p2pHeadersOptionName, "v", "Print table headers (Protocol, Listen, Target)."),
288
	},
Overbool's avatar
Overbool committed
289 290
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := p2pGetNode(env)
291
		if err != nil {
Overbool's avatar
Overbool committed
292
			return err
293 294
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
295
		output := &P2PLsOutput{}
296

Łukasz Magiera's avatar
Łukasz Magiera committed
297 298
		n.P2P.ListenersLocal.Lock()
		for _, listener := range n.P2P.ListenersLocal.Listeners {
Łukasz Magiera's avatar
Łukasz Magiera committed
299
			output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Łukasz Magiera's avatar
Łukasz Magiera committed
300 301 302
				Protocol:      string(listener.Protocol()),
				ListenAddress: listener.ListenAddress().String(),
				TargetAddress: listener.TargetAddress().String(),
303 304
			})
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
305 306 307 308 309 310 311 312 313 314 315
		n.P2P.ListenersLocal.Unlock()

		n.P2P.ListenersP2P.Lock()
		for _, listener := range n.P2P.ListenersP2P.Listeners {
			output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
				Protocol:      string(listener.Protocol()),
				ListenAddress: listener.ListenAddress().String(),
				TargetAddress: listener.TargetAddress().String(),
			})
		}
		n.P2P.ListenersP2P.Unlock()
316

317
		return cmds.EmitOnce(res, output)
318
	},
Łukasz Magiera's avatar
Łukasz Magiera committed
319
	Type: P2PLsOutput{},
Overbool's avatar
Overbool committed
320 321 322 323 324
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PLsOutput) error {
			headers, _ := req.Options[p2pHeadersOptionName].(bool)
			tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
			for _, listener := range out.Listeners {
325
				if headers {
Overbool's avatar
Overbool committed
326
					fmt.Fprintln(tw, "Protocol\tListen Address\tTarget Address")
327 328
				}

Overbool's avatar
Overbool committed
329
				fmt.Fprintf(tw, "%s\t%s\t%s\n", listener.Protocol, listener.ListenAddress, listener.TargetAddress)
330
			}
Overbool's avatar
Overbool committed
331
			tw.Flush()
332

Overbool's avatar
Overbool committed
333 334
			return nil
		}),
335 336 337
	},
}

// Option names used by the close commands.
const (
	// p2pAllOptionName selects everything (all listeners or all streams).
	p2pAllOptionName = "all"
	// p2pProtocolOptionName matches listeners by protocol name.
	p2pProtocolOptionName = "protocol"
	// p2pListenAddressOptionName matches listeners by listen address.
	p2pListenAddressOptionName = "listen-address"
	// p2pTargetAddressOptionName matches listeners by target address.
	p2pTargetAddressOptionName = "target-address"
)

345
var p2pCloseCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
346
	Helptext: cmds.HelpText{
347 348
		Tagline: "Stop listening for new connections to forward.",
	},
Steven Allen's avatar
Steven Allen committed
349 350 351 352 353
	Options: []cmds.Option{
		cmds.BoolOption(p2pAllOptionName, "a", "Close all listeners."),
		cmds.StringOption(p2pProtocolOptionName, "p", "Match protocol name"),
		cmds.StringOption(p2pListenAddressOptionName, "l", "Match listen address"),
		cmds.StringOption(p2pTargetAddressOptionName, "t", "Match target address"),
354
	},
Overbool's avatar
Overbool committed
355 356
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := p2pGetNode(env)
357
		if err != nil {
Overbool's avatar
Overbool committed
358
			return err
359 360
		}

Overbool's avatar
Overbool committed
361 362 363 364
		closeAll, _ := req.Options[p2pAllOptionName].(bool)
		protoOpt, p := req.Options[p2pProtocolOptionName].(string)
		listenOpt, l := req.Options[p2pListenAddressOptionName].(string)
		targetOpt, t := req.Options[p2pTargetAddressOptionName].(string)
Łukasz Magiera's avatar
Łukasz Magiera committed
365 366 367

		proto := protocol.ID(protoOpt)

368 369 370 371 372 373 374 375 376
		var (
			target, listen ma.Multiaddr
		)

		if l {
			listen, err = ma.NewMultiaddr(listenOpt)
			if err != nil {
				return err
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
377 378
		}

379 380 381 382 383
		if t {
			target, err = ma.NewMultiaddr(targetOpt)
			if err != nil {
				return err
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
384
		}
385 386

		if !(closeAll || p || l || t) {
Overbool's avatar
Overbool committed
387
			return errors.New("no matching options given")
388 389 390
		}

		if closeAll && (p || l || t) {
Overbool's avatar
Overbool committed
391
			return errors.New("can't combine --all with other matching options")
392 393
		}

394
		match := func(listener p2p.Listener) bool {
Łukasz Magiera's avatar
Łukasz Magiera committed
395 396
			if closeAll {
				return true
397
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
398 399
			if p && proto != listener.Protocol() {
				return false
400
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
401 402
			if l && !listen.Equal(listener.ListenAddress()) {
				return false
403
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
404 405 406 407
			if t && !target.Equal(listener.TargetAddress()) {
				return false
			}
			return true
408 409
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
410 411
		done := n.P2P.ListenersLocal.Close(match)
		done += n.P2P.ListenersP2P.Close(match)
Łukasz Magiera's avatar
Łukasz Magiera committed
412

413
		return cmds.EmitOnce(res, done)
414 415
	},
	Type: int(0),
Overbool's avatar
Overbool committed
416 417 418 419 420
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out int) error {
			fmt.Fprintf(w, "Closed %d stream(s)\n", out)
			return nil
		}),
421 422 423
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
424
///////
425
// Stream
Łukasz Magiera's avatar
Łukasz Magiera committed
426 427 428 429
//

// p2pStreamCmd is the 'ipfs p2p stream' command
var p2pStreamCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
430
	Helptext: cmds.HelpText{
Łukasz Magiera's avatar
Łukasz Magiera committed
431 432 433 434 435 436 437 438 439 440
		Tagline:          "P2P stream management.",
		ShortDescription: "Create and manage p2p streams",
	},

	Subcommands: map[string]*cmds.Command{
		"ls":    p2pStreamLsCmd,
		"close": p2pStreamCloseCmd,
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
441
var p2pStreamLsCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
442
	Helptext: cmds.HelpText{
443
		Tagline: "List active p2p streams.",
444
	},
Steven Allen's avatar
Steven Allen committed
445 446
	Options: []cmds.Option{
		cmds.BoolOption(p2pHeadersOptionName, "v", "Print table headers (ID, Protocol, Local, Remote)."),
447
	},
Overbool's avatar
Overbool committed
448 449
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := p2pGetNode(env)
450
		if err != nil {
Overbool's avatar
Overbool committed
451
			return err
452 453
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
454
		output := &P2PStreamsOutput{}
455

Łukasz Magiera's avatar
Łukasz Magiera committed
456
		n.P2P.Streams.Lock()
457
		for id, s := range n.P2P.Streams.Streams {
Łukasz Magiera's avatar
Łukasz Magiera committed
458
			output.Streams = append(output.Streams, P2PStreamInfoOutput{
459
				HandlerID: strconv.FormatUint(id, 10),
460

Łukasz Magiera's avatar
Łukasz Magiera committed
461
				Protocol: string(s.Protocol),
462

463 464
				OriginAddress: s.OriginAddr.String(),
				TargetAddress: s.TargetAddr.String(),
465 466
			})
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
467
		n.P2P.Streams.Unlock()
468

469
		return cmds.EmitOnce(res, output)
470
	},
Łukasz Magiera's avatar
Łukasz Magiera committed
471
	Type: P2PStreamsOutput{},
Overbool's avatar
Overbool committed
472 473 474 475 476
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PStreamsOutput) error {
			headers, _ := req.Options[p2pHeadersOptionName].(bool)
			tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
			for _, stream := range out.Streams {
477
				if headers {
Overbool's avatar
Overbool committed
478
					fmt.Fprintln(tw, "ID\tProtocol\tOrigin\tTarget")
479 480
				}

Overbool's avatar
Overbool committed
481
				fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress)
482
			}
Overbool's avatar
Overbool committed
483
			tw.Flush()
484

Overbool's avatar
Overbool committed
485 486
			return nil
		}),
487 488 489
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
490
var p2pStreamCloseCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
491
	Helptext: cmds.HelpText{
492 493
		Tagline: "Close active p2p stream.",
	},
Steven Allen's avatar
Steven Allen committed
494 495
	Arguments: []cmds.Argument{
		cmds.StringArg("id", false, false, "Stream identifier"),
496
	},
Steven Allen's avatar
Steven Allen committed
497 498
	Options: []cmds.Option{
		cmds.BoolOption(p2pAllOptionName, "a", "Close all streams."),
499
	},
Overbool's avatar
Overbool committed
500 501
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := p2pGetNode(env)
502
		if err != nil {
Overbool's avatar
Overbool committed
503
			return err
504 505
		}

Overbool's avatar
Overbool committed
506
		closeAll, _ := req.Options[p2pAllOptionName].(bool)
507
		var handlerID uint64
508

509
		if !closeAll {
Overbool's avatar
Overbool committed
510 511
			if len(req.Arguments) == 0 {
				return errors.New("no id specified")
512
			}
513

Overbool's avatar
Overbool committed
514
			handlerID, err = strconv.ParseUint(req.Arguments[0], 10, 64)
515
			if err != nil {
Overbool's avatar
Overbool committed
516
				return err
517 518 519
			}
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
520 521
		toClose := make([]*p2p.Stream, 0, 1)
		n.P2P.Streams.Lock()
522 523
		for id, stream := range n.P2P.Streams.Streams {
			if !closeAll && handlerID != id {
524
				continue
525
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
526
			toClose = append(toClose, stream)
527 528
			if !closeAll {
				break
529 530
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
531 532 533
		n.P2P.Streams.Unlock()

		for _, s := range toClose {
Łukasz Magiera's avatar
Łukasz Magiera committed
534
			n.P2P.Streams.Reset(s)
Łukasz Magiera's avatar
Łukasz Magiera committed
535
		}
Overbool's avatar
Overbool committed
536 537

		return nil
538 539
	},
}
540

Overbool's avatar
Overbool committed
541 542
func p2pGetNode(env cmds.Environment) (*core.IpfsNode, error) {
	nd, err := cmdenv.GetNode(env)
543 544 545 546
	if err != nil {
		return nil, err
	}

Overbool's avatar
Overbool committed
547
	config, err := nd.Repo.Config()
548
	if err != nil {
549
		return nil, err
550 551 552
	}

	if !config.Experimental.Libp2pStreamMounting {
553
		return nil, errors.New("libp2p stream mounting not enabled")
554
	}
555

556
	if !nd.IsOnline {
557
		return nil, ErrNotOnline
558 559
	}

Overbool's avatar
Overbool committed
560
	return nd, nil
561
}