pubsub.go 8.18 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8 9
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
10

11
	core "github.com/ipfs/go-ipfs/core"
Jeromy's avatar
Jeromy committed
12

13
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
14
	pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
15
	cmds "gx/ipfs/QmQtQuaQvS5mKJVoCvL5FvrYH7oZPjxsVHf2bKSGgcVmZt/go-ipfs-cmds"
Jan Winkelmann's avatar
Jan Winkelmann committed
16
	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
Steven Allen's avatar
Steven Allen committed
17 18
	cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit"
	floodsub "gx/ipfs/QmVNv1WV6XxzQV4MBuiLX5729wMazaf8TNzm2Sq6ejyHh7/go-libp2p-floodsub"
Jeromy's avatar
Jeromy committed
19 20 21
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
22
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
23 24 25 26 27 28 29
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
30 31

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
32 33 34
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
35 36 37 38
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
39 40 41 42
	},
}

var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
43
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
44 45 46 47 48 49
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
50 51

To use, the daemon must be run with '--enable-pubsub-experiment'.
52 53 54 55 56 57 58 59 60 61 62 63
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
64 65
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
66 67
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
68
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
69 70
	Options: []cmdkit.Option{
		cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
71
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
72
	Run: func(req cmds.Request, res cmds.ResponseEmitter) {
Jeromy's avatar
Jeromy committed
73 74
		n, err := req.InvocContext().GetNode()
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
75
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
76 77 78 79 80
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
81
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
82 83 84
			return
		}

Jeromy's avatar
Jeromy committed
85
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
86
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
87 88 89
			return
		}

Jeromy's avatar
Jeromy committed
90
		topic := req.Arguments()[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
91
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
92
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
93
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
94 95
			return
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
96
		defer sub.Cancel()
97 98 99

		discover, _, _ := req.Option("discover").Bool()
		if discover {
100 101
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
102
				cid, err := n.Blocks.AddBlock(blk)
103 104 105 106
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
107

108 109
				connectToPubSubPeers(req.Context(), n, cid)
			}()
110
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
111 112 113 114 115 116 117 118 119 120 121 122

		for {
			msg, err := sub.Next(req.Context())
			if err == io.EOF || err == context.Canceled {
				return
			} else if err != nil {
				res.SetError(err, cmdkit.ErrNormal)
				return
			}

			res.Emit(msg)
		}
Jeromy's avatar
Jeromy committed
123
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
124 125 126 127 128 129 130 131 132
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(func(req cmds.Request, w io.Writer, v interface{}) error {
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
133
		}),
Jan Winkelmann's avatar
Jan Winkelmann committed
134 135 136 137 138 139
		"ndpayload": cmds.MakeEncoder(func(req cmds.Request, w io.Writer, v interface{}) error {
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
140
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
141 142
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
143
		}),
Jan Winkelmann's avatar
Jan Winkelmann committed
144 145 146 147 148 149 150
		"lenpayload": cmds.MakeEncoder(func(req cmds.Request, w io.Writer, v interface{}) error {
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
151

Jan Winkelmann's avatar
Jan Winkelmann committed
152
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
153 154 155
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
156 157 158 159 160
		}),
	},
	Type: floodsub.Message{},
}

161 162 163 164
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

165
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
185
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
186
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
187 188 189 190 191 192
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
193 194

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
195 196
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
197 198 199
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
200
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
201
	Run: func(req cmds.Request, res cmds.ResponseEmitter) {
Jeromy's avatar
Jeromy committed
202 203
		n, err := req.InvocContext().GetNode()
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
204
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
205 206 207 208 209
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
210
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
211 212 213
			return
		}

Jeromy's avatar
Jeromy committed
214
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
215
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
216 217 218
			return
		}

Jeromy's avatar
Jeromy committed
219 220 221 222
		topic := req.Arguments()[0]

		for _, data := range req.Arguments()[1:] {
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
223
				res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
224 225 226 227 228
				return
			}
		}
	},
}
Jeromy's avatar
Jeromy committed
229 230

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
231
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
232 233 234 235 236 237 238 239 240 241
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
242
	Run: func(req cmds.Request, res cmds.ResponseEmitter) {
Jeromy's avatar
Jeromy committed
243 244
		n, err := req.InvocContext().GetNode()
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
245
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
246 247 248 249 250
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
251
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
252 253 254 255
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
256
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
257 258 259
			return
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
260 261 262
		for _, topic := range n.Floodsub.GetTopics() {
			res.Emit(topic)
		}
Jeromy's avatar
Jeromy committed
263
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
264
	Type: "",
Jeromy's avatar
Jeromy committed
265
}
Jeromy's avatar
Jeromy committed
266 267

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
268
	Helptext: cmdkit.HelpText{
269
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
270
		ShortDescription: `
271 272 273
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
274 275 276 277 278 279 280

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
281 282
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
283
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
284
	Run: func(req cmds.Request, res cmds.ResponseEmitter) {
Jeromy's avatar
Jeromy committed
285 286
		n, err := req.InvocContext().GetNode()
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
287
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
288 289 290 291 292
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
293
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
294 295 296 297
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
298
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
299 300 301
			return
		}

302 303 304 305 306
		var topic string
		if len(req.Arguments()) == 1 {
			topic = req.Arguments()[0]
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
307 308
		for _, peer := range n.Floodsub.ListPeers(topic) {
			res.Emit(peer.Pretty())
Jeromy's avatar
Jeromy committed
309 310
		}
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
311 312 313
	Type: "",
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.Encoders[cmds.TextNewline],
Jeromy's avatar
Jeromy committed
314 315
	},
}