p2p.go 10.3 KB
Newer Older
1 2 3
package commands

import (
4
	"bytes"
Łukasz Magiera's avatar
Łukasz Magiera committed
5
	"context"
6
	"errors"
7
	"fmt"
8 9
	"io"
	"strconv"
Łukasz Magiera's avatar
Łukasz Magiera committed
10
	"strings"
11
	"text/tabwriter"
12 13

	cmds "github.com/ipfs/go-ipfs/commands"
14
	core "github.com/ipfs/go-ipfs/core"
Łukasz Magiera's avatar
Łukasz Magiera committed
15
	p2p "github.com/ipfs/go-ipfs/p2p"
16

17
	pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
18
	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
19
	"gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit"
20 21
)

Łukasz Magiera's avatar
Łukasz Magiera committed
22 23
// P2PListenerInfoOutput describes a single p2p listener as reported by the
// 'ipfs p2p ls' command.
type P2PListenerInfoOutput struct {
	// Protocol is the value returned by the listener's Protocol() method.
	Protocol string
	// ListenAddress is the value returned by the listener's ListenAddress() method.
	ListenAddress string
	// TargetAddress is the value returned by the listener's TargetAddress() method.
	TargetAddress string
}

Łukasz Magiera's avatar
Łukasz Magiera committed
29 30
// P2PStreamInfoOutput describes a single active p2p stream as reported by the
// 'ipfs p2p stream ls' command.
type P2PStreamInfoOutput struct {
	// HandlerID is the stream's numeric identifier formatted as a base-10 string.
	HandlerID string
	// Protocol is the protocol recorded on the stream.
	Protocol string
	// OriginAddress is the string form of the stream's origin address.
	OriginAddress string
	// TargetAddress is the string form of the stream's target address.
	TargetAddress string
}

Łukasz Magiera's avatar
Łukasz Magiera committed
37 38 39
// P2PLsOutput is output type of ls command
type P2PLsOutput struct {
	Listeners []P2PListenerInfoOutput
40 41
}

Łukasz Magiera's avatar
Łukasz Magiera committed
42 43 44
// P2PStreamsOutput is output type of streams command
type P2PStreamsOutput struct {
	Streams []P2PStreamInfoOutput
45 46
}

Łukasz Magiera's avatar
Łukasz Magiera committed
47
// P2PCmd is the 'ipfs p2p' command
Łukasz Magiera's avatar
Łukasz Magiera committed
48
var P2PCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
49
	Helptext: cmdkit.HelpText{
50 51
		Tagline: "Libp2p stream mounting.",
		ShortDescription: `
52
Create and use tunnels to remote peers over libp2p
53

54 55
Note: this command is experimental and subject to change as usecases and APIs
are refined`,
56 57 58
	},

	Subcommands: map[string]*cmds.Command{
Łukasz Magiera's avatar
Łukasz Magiera committed
59 60 61
		"stream": p2pStreamCmd,

		"forward": p2pForwardCmd,
62
		"close":   p2pCloseCmd,
Łukasz Magiera's avatar
Łukasz Magiera committed
63
		"ls":      p2pLsCmd,
64 65 66
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
var p2pForwardCmd = &cmds.Command{
	Helptext: cmdkit.HelpText{
		Tagline: "Forward connections to or from libp2p services",
		ShortDescription: `
Forward connections to <listen-address> to <target-address>. Protocol specifies
the libp2p protocol to use.

To create libp2p service listener, specify '/ipfs' as <listen-address>

Examples:
  ipfs p2p forward myproto /ipfs /ip4/127.0.0.1/tcp/1234
    - Forward connections to 'myproto' libp2p service to 127.0.0.1:1234

  ipfs p2p forward myproto /ip4/127.0.0.1/tcp/4567 /ipfs/QmPeer
    - Forward connections to 127.0.0.1:4567 to 'myproto' service on /ipfs/QmPeer

`,
	},
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("protocol", true, false, "Protocol identifier."),
		cmdkit.StringArg("listen-address", true, false, "Listening endpoint"),
		cmdkit.StringArg("target-address", true, false, "Target endpoint."),
	},
	Run: func(req cmds.Request, res cmds.Response) {
91
		n, err := p2pGetNode(req)
Łukasz Magiera's avatar
Łukasz Magiera committed
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
		if err != nil {
			res.SetError(err, cmdkit.ErrNormal)
			return
		}

		//TODO: Do we really want/need implicit prefix?
		proto := "/p2p/" + req.Arguments()[0]
		listen := req.Arguments()[1]
		target := req.Arguments()[2]

		if strings.HasPrefix(listen, "/ipfs") {
			if listen != "/ipfs" {
				res.SetError(errors.New("only '/ipfs' is allowed as libp2p listen address"), cmdkit.ErrNormal)
				return
			}

			if err := forwardRemote(n.Context(), n.P2P, proto, target); err != nil {
				res.SetError(err, cmdkit.ErrNormal)
				return
			}
		} else {
			if err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, target); err != nil {
				res.SetError(err, cmdkit.ErrNormal)
				return
			}
		}
		res.SetOutput(nil)
	},
}

// forwardRemote forwards libp2p service connections to a manet address
func forwardRemote(ctx context.Context, p *p2p.P2P, proto string, target string) error {
	if strings.HasPrefix(target, "/ipfs") {
		return errors.New("cannot forward libp2p service connections to another libp2p service")
	}

	addr, err := ma.NewMultiaddr(target)
	if err != nil {
		return err
	}

	// TODO: return some info
Łukasz Magiera's avatar
Łukasz Magiera committed
134
	_, err = p.ForwardRemote(ctx, proto, addr)
Łukasz Magiera's avatar
Łukasz Magiera committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
	return err
}

// forwardLocal forwards local connections to a libp2p service
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto string, listen string, target string) error {
	bindAddr, err := ma.NewMultiaddr(listen)
	if err != nil {
		return err
	}

	addr, peer, err := ParsePeerParam(target)
	if err != nil {
		return err
	}

	if addr != nil {
		ps.AddAddr(peer, addr, pstore.TempAddrTTL)
	}

	// TODO: return some info
Łukasz Magiera's avatar
Łukasz Magiera committed
155
	_, err = p.ForwardLocal(ctx, peer, proto, bindAddr)
Łukasz Magiera's avatar
Łukasz Magiera committed
156 157 158
	return err
}

Łukasz Magiera's avatar
Łukasz Magiera committed
159
var p2pLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
160
	Helptext: cmdkit.HelpText{
161
		Tagline: "List active p2p listeners.",
162
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
163
	Options: []cmdkit.Option{
Łukasz Magiera's avatar
Łukasz Magiera committed
164
		cmdkit.BoolOption("headers", "v", "Print table headers (Protocol, Listen, Target)."),
165 166
	},
	Run: func(req cmds.Request, res cmds.Response) {
167
		n, err := p2pGetNode(req)
168
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
169
			res.SetError(err, cmdkit.ErrNormal)
170 171 172
			return
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
173
		output := &P2PLsOutput{}
174

Łukasz Magiera's avatar
Łukasz Magiera committed
175 176
		for _, listener := range n.P2P.Listeners.Listeners {
			output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Łukasz Magiera's avatar
Łukasz Magiera committed
177 178 179
				Protocol:      listener.Protocol(),
				ListenAddress: listener.ListenAddress(),
				TargetAddress: listener.TargetAddress(),
180 181 182
			})
		}

183 184
		res.SetOutput(output)
	},
Łukasz Magiera's avatar
Łukasz Magiera committed
185
	Type: P2PLsOutput{},
186 187
	Marshalers: cmds.MarshalerMap{
		cmds.Text: func(res cmds.Response) (io.Reader, error) {
Jan Winkelmann's avatar
Jan Winkelmann committed
188 189 190 191 192
			v, err := unwrapOutput(res.Output())
			if err != nil {
				return nil, err
			}

193
			headers, _, _ := res.Request().Option("headers").Bool()
Jan Winkelmann's avatar
Jan Winkelmann committed
194
			list := v.(*P2PLsOutput)
195 196
			buf := new(bytes.Buffer)
			w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
197
			for _, listener := range list.Listeners {
198
				if headers {
Łukasz Magiera's avatar
Łukasz Magiera committed
199
					fmt.Fprintln(w, "Protocol\tListen Address\tTarget Address")
200 201
				}

Łukasz Magiera's avatar
Łukasz Magiera committed
202
				fmt.Fprintf(w, "%s\t%s\t%s\n", listener.Protocol, listener.ListenAddress, listener.TargetAddress)
203 204 205 206 207 208 209 210
			}
			w.Flush()

			return buf, nil
		},
	},
}

211 212 213 214 215 216 217 218 219 220 221 222 223
var p2pCloseCmd = &cmds.Command{
	Helptext: cmdkit.HelpText{
		Tagline: "Stop listening for new connections to forward.",
	},
	Options: []cmdkit.Option{
		cmdkit.BoolOption("all", "a", "Close all listeners."),
		cmdkit.StringOption("protocol", "p", "Match protocol name"),
		cmdkit.StringOption("listen-address", "l", "Match listen address"),
		cmdkit.StringOption("target-address", "t", "Match target address"),
	},
	Run: func(req cmds.Request, res cmds.Response) {
		res.SetOutput(nil)

224
		n, err := p2pGetNode(req)
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
		if err != nil {
			res.SetError(err, cmdkit.ErrNormal)
			return
		}

		closeAll, _, _ := req.Option("all").Bool()
		proto, p, _ := req.Option("protocol").String()
		listen, l, _ := req.Option("listen-address").String()
		target, t, _ := req.Option("target-address").String()

		if !(closeAll || p || l || t) {
			res.SetError(errors.New("no connection matching options given"), cmdkit.ErrNormal)
			return
		}

		if closeAll && (p || l || t) {
			res.SetError(errors.New("can't combine --all with other matching options"), cmdkit.ErrNormal)
			return
		}

		match := func(listener p2p.Listener) bool {
			out := true
247 248 249 250
			if p || !strings.HasPrefix(proto, "/p2p/") {
				proto = "/p2p/" + proto
			}

251 252 253 254 255 256 257 258 259 260 261 262 263 264
			if p {
				out = out && (proto == listener.Protocol())
			}
			if l {
				out = out && (listen == listener.ListenAddress())
			}
			if t {
				out = out && (target == listener.TargetAddress())
			}

			out = out || closeAll
			return out
		}

265
		var closed int
266 267 268 269 270
		for _, listener := range n.P2P.Listeners.Listeners {
			if !match(listener) {
				continue
			}
			listener.Close()
271
			closed++
272
		}
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
		res.SetOutput(closed)
	},
	Type: int(0),
	Marshalers: cmds.MarshalerMap{
		cmds.Text: func(res cmds.Response) (io.Reader, error) {
			v, err := unwrapOutput(res.Output())
			if err != nil {
				return nil, err
			}

			closed := v.(int)
			buf := new(bytes.Buffer)
			fmt.Fprintf(buf, "Closed %d stream(s)\n", closed)

			return buf, nil
		},
289 290 291
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
///////
// Stream
//

// p2pStreamCmd implements 'ipfs p2p stream'. It has no Run function and only
// dispatches to the stream-level subcommands registered below.
var p2pStreamCmd = &cmds.Command{
	Subcommands: map[string]*cmds.Command{
		"close": p2pStreamCloseCmd,
		"ls":    p2pStreamLsCmd,
	},

	Helptext: cmdkit.HelpText{
		ShortDescription: "Create and manage p2p streams",
		Tagline:          "P2P stream management.",
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
309
var p2pStreamLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
310
	Helptext: cmdkit.HelpText{
311
		Tagline: "List active p2p streams.",
312
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
313
	Options: []cmdkit.Option{
314
		cmdkit.BoolOption("headers", "v", "Print table headers (HagndlerID, Protocol, Local, Remote)."),
315 316
	},
	Run: func(req cmds.Request, res cmds.Response) {
317
		n, err := p2pGetNode(req)
318
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
319
			res.SetError(err, cmdkit.ErrNormal)
320 321 322
			return
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
323
		output := &P2PStreamsOutput{}
324

325
		for id, s := range n.P2P.Streams.Streams {
Łukasz Magiera's avatar
Łukasz Magiera committed
326
			output.Streams = append(output.Streams, P2PStreamInfoOutput{
327
				HandlerID: strconv.FormatUint(id, 10),
328

329
				Protocol: s.Protocol,
330

331 332
				OriginAddress: s.OriginAddr.String(),
				TargetAddress: s.TargetAddr.String(),
333 334 335
			})
		}

336 337
		res.SetOutput(output)
	},
Łukasz Magiera's avatar
Łukasz Magiera committed
338
	Type: P2PStreamsOutput{},
339 340
	Marshalers: cmds.MarshalerMap{
		cmds.Text: func(res cmds.Response) (io.Reader, error) {
Jan Winkelmann's avatar
Jan Winkelmann committed
341 342 343 344 345
			v, err := unwrapOutput(res.Output())
			if err != nil {
				return nil, err
			}

346
			headers, _, _ := res.Request().Option("headers").Bool()
Jan Winkelmann's avatar
Jan Winkelmann committed
347
			list := v.(*P2PStreamsOutput)
348 349 350 351
			buf := new(bytes.Buffer)
			w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
			for _, stream := range list.Streams {
				if headers {
352
					fmt.Fprintln(w, "Id\tProtocol\tOrigin\tTarget")
353 354
				}

355
				fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress)
356 357 358 359 360
			}
			w.Flush()

			return buf, nil
		},
361 362 363
	},
}

Łukasz Magiera's avatar
Łukasz Magiera committed
364
var p2pStreamCloseCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
365
	Helptext: cmdkit.HelpText{
366 367
		Tagline: "Close active p2p stream.",
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
368
	Arguments: []cmdkit.Argument{
369
		cmdkit.StringArg("id", false, false, "Stream identifier"),
370
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
371
	Options: []cmdkit.Option{
372
		cmdkit.BoolOption("all", "a", "Close all streams."),
373 374
	},
	Run: func(req cmds.Request, res cmds.Response) {
Jan Winkelmann's avatar
Jan Winkelmann committed
375 376
		res.SetOutput(nil)

377
		n, err := p2pGetNode(req)
378
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
379
			res.SetError(err, cmdkit.ErrNormal)
380 381 382 383
			return
		}

		closeAll, _, _ := req.Option("all").Bool()
384
		var handlerID uint64
385

386 387
		if !closeAll {
			if len(req.Arguments()) == 0 {
388
				res.SetError(errors.New("no id specified"), cmdkit.ErrNormal)
389 390
				return
			}
391

392
			handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
393
			if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
394
				res.SetError(err, cmdkit.ErrNormal)
395
				return
396 397 398
			}
		}

399 400
		for id, stream := range n.P2P.Streams.Streams {
			if !closeAll && handlerID != id {
401
				continue
402
			}
403
			stream.Reset()
404 405
			if !closeAll {
				break
406 407 408 409
			}
		}
	},
}
410

411
func p2pGetNode(req cmds.Request) (*core.IpfsNode, error) {
412 413 414 415 416
	n, err := req.InvocContext().GetNode()
	if err != nil {
		return nil, err
	}

417 418
	config, err := n.Repo.Config()
	if err != nil {
419
		return nil, err
420 421 422
	}

	if !config.Experimental.Libp2pStreamMounting {
423
		return nil, errors.New("libp2p stream mounting not enabled")
424
	}
425 426

	if !n.OnlineMode() {
427
		return nil, ErrNotOnline
428 429 430
	}

	return n, nil
431
}