pubsub.go 9 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8
	"net/http"
9
	"sort"
10 11
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
12

13
	core "github.com/ipfs/go-ipfs/core"
14
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
15
	e "github.com/ipfs/go-ipfs/core/commands/e"
Jeromy's avatar
Jeromy committed
16

17
	cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
18
	cmds "gx/ipfs/QmPTfgFTo9PFr1PvPKyKoeMgBvYPh6cX3aDP7DHKVbnCbi/go-ipfs-cmds"
19
	blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
20
	cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
Steven Allen's avatar
Steven Allen committed
21 22
	floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub"
	pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
23 24 25
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
26
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
34 35

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
36 37 38
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
39 40 41 42
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
43 44 45 46
	},
}

var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
47
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
48 49 50 51 52 53
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
54 55

To use, the daemon must be run with '--enable-pubsub-experiment'.
56 57 58 59 60 61 62 63 64 65 66 67
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
68 69
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
70 71
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
72
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
73 74
	Options: []cmdkit.Option{
		cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
75
	},
Jeromy's avatar
Jeromy committed
76
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
77
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
78
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
79
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
80 81 82 83 84
			return
		}

		// Must be online!
		if !n.OnlineMode() {
85
			res.SetError(ErrNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
86 87 88
			return
		}

Jeromy's avatar
Jeromy committed
89
		if n.Floodsub == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
90
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use"), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
91 92 93
			return
		}

94
		topic := req.Arguments[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
95
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
96
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
97
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
98 99
			return
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
100
		defer sub.Cancel()
101

102
		discover, _ := req.Options["discover"].(bool)
103
		if discover {
104 105
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
106
				err := n.Blocks.AddBlock(blk)
107 108 109 110
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
111

112
				connectToPubSubPeers(req.Context, n, blk.Cid())
113
			}()
114
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
115

116 117 118 119
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
120
		for {
121
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
122 123 124 125 126 127 128 129 130
			if err == io.EOF || err == context.Canceled {
				return
			} else if err != nil {
				res.SetError(err, cmdkit.ErrNormal)
				return
			}

			res.Emit(msg)
		}
Jeromy's avatar
Jeromy committed
131
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
132
	Encoders: cmds.EncoderMap{
133
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
134 135 136 137 138 139 140
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
141
		}),
142
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
143 144 145 146 147
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
148
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
149 150
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
151
		}),
152
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
153 154 155 156 157 158
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
159

Jan Winkelmann's avatar
Jan Winkelmann committed
160
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
161 162 163
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
164 165 166 167 168
		}),
	},
	Type: floodsub.Message{},
}

169
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
170 171 172
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

173
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
193
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
194
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
195 196 197 198 199 200
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
201 202

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
203 204
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
205 206 207
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
208
	},
Jeromy's avatar
Jeromy committed
209
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
210
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
211
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
212
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
213 214 215 216 217
			return
		}

		// Must be online!
		if !n.OnlineMode() {
218
			res.SetError(ErrNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
219 220 221
			return
		}

Jeromy's avatar
Jeromy committed
222
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
223
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
224 225 226
			return
		}

227
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
228

Jeromy's avatar
Jeromy committed
229
		err = req.ParseBodyArgs()
230
		if err != nil {
Jeromy's avatar
Jeromy committed
231 232 233 234
			res.SetError(err, cmdkit.ErrNormal)
			return
		}

235
		for _, data := range req.Arguments[1:] {
Jeromy's avatar
Jeromy committed
236
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
237
				res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
238 239 240 241 242
				return
			}
		}
	},
}
Jeromy's avatar
Jeromy committed
243 244

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
245
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
246 247 248 249 250 251 252 253 254 255
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jeromy's avatar
Jeromy committed
256
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
257
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
258
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
259
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
260 261 262 263 264
			return
		}

		// Must be online!
		if !n.OnlineMode() {
265
			res.SetError(ErrNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
266 267 268 269
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
270
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
271 272 273
			return
		}

274
		cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
Jeromy's avatar
Jeromy committed
275
	},
276
	Type: stringList{},
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
	},
}

func stringListEncoder(req *cmds.Request, w io.Writer, v interface{}) error {
	list, ok := v.(*stringList)
	if !ok {
		return e.TypeErr(list, v)
	}
	for _, str := range list.Strings {
		_, err := fmt.Fprintf(w, "%s\n", str)
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
294
}
Jeromy's avatar
Jeromy committed
295 296

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
297
	Helptext: cmdkit.HelpText{
298
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
299
		ShortDescription: `
300 301 302
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
303 304 305 306 307 308 309

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
310 311
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
312
	},
Jeromy's avatar
Jeromy committed
313
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
314
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
315
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
316
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
317 318 319 320 321
			return
		}

		// Must be online!
		if !n.OnlineMode() {
322
			res.SetError(ErrNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
323 324 325 326
			return
		}

		if n.Floodsub == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
327
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use"), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
328 329 330
			return
		}

331
		var topic string
332 333
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
334 335
		}

336 337 338 339 340
		peers := n.Floodsub.ListPeers(topic)
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
341
		}
342 343
		sort.Strings(list.Strings)
		cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
344
	},
345
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
346
	Encoders: cmds.EncoderMap{
347
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
348 349
	},
}