pubsub.go 8.92 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8
	"net/http"
9
	"sort"
10 11
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
12

13
	core "github.com/ipfs/go-ipfs/core"
14
	e "github.com/ipfs/go-ipfs/core/commands/e"
Jeromy's avatar
Jeromy committed
15

16 17 18 19 20 21
	cmds "gx/ipfs/QmPTfgFTo9PFr1PvPKyKoeMgBvYPh6cX3aDP7DHKVbnCbi/go-ipfs-cmds"
	cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
	floodsub "gx/ipfs/QmVFB6rGJEZnzJrQwoEhbyDs1tA8RVsQvCS6JXpuw9Xtta/go-libp2p-floodsub"
	blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format"
	cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
	pstore "gx/ipfs/QmeKD8YT7887Xu6Z86iZmpYNxrLogJexqxEugSmaf14k64/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
22 23 24
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
25
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
26 27 28 29 30 31 32
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
33 34

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
35 36 37
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
38 39 40 41
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
42 43 44 45
	},
}

var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
46
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
47 48 49 50 51 52
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
53 54

To use, the daemon must be run with '--enable-pubsub-experiment'.
55 56 57 58 59 60 61 62 63 64 65 66
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
67 68
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
69 70
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
71
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
72 73
	Options: []cmdkit.Option{
		cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
74
	},
Jeromy's avatar
Jeromy committed
75
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
76
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
77
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
78
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
79 80 81 82 83
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
84
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
85 86 87
			return
		}

Jeromy's avatar
Jeromy committed
88
		if n.Floodsub == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
89
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use"), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
90 91 92
			return
		}

93
		topic := req.Arguments[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
94
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
95
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
96
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
97 98
			return
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
99
		defer sub.Cancel()
100

101
		discover, _ := req.Options["discover"].(bool)
102
		if discover {
103 104
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
105
				err := n.Blocks.AddBlock(blk)
106 107 108 109
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
110

111
				connectToPubSubPeers(req.Context, n, blk.Cid())
112
			}()
113
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
114

115 116 117 118
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
119
		for {
120
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
121 122 123 124 125 126 127 128 129
			if err == io.EOF || err == context.Canceled {
				return
			} else if err != nil {
				res.SetError(err, cmdkit.ErrNormal)
				return
			}

			res.Emit(msg)
		}
Jeromy's avatar
Jeromy committed
130
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
131
	Encoders: cmds.EncoderMap{
132
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
133 134 135 136 137 138 139
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
140
		}),
141
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
142 143 144 145 146
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
147
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
148 149
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
150
		}),
151
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
152 153 154 155 156 157
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
158

Jan Winkelmann's avatar
Jan Winkelmann committed
159
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
160 161 162
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
163 164 165 166 167
		}),
	},
	Type: floodsub.Message{},
}

168 169 170 171
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

172
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
192
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
193
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
194 195 196 197 198 199
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
200 201

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
202 203
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
204 205 206
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
207
	},
Jeromy's avatar
Jeromy committed
208
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
209
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
210
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
211
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
212 213 214 215 216
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
217
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
218 219 220
			return
		}

Jeromy's avatar
Jeromy committed
221
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
222
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
223 224 225
			return
		}

226
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
227

Jeromy's avatar
Jeromy committed
228
		err = req.ParseBodyArgs()
229
		if err != nil {
Jeromy's avatar
Jeromy committed
230 231 232 233
			res.SetError(err, cmdkit.ErrNormal)
			return
		}

234
		for _, data := range req.Arguments[1:] {
Jeromy's avatar
Jeromy committed
235
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
236
				res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
237 238 239 240 241
				return
			}
		}
	},
}
Jeromy's avatar
Jeromy committed
242 243

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
244
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
245 246 247 248 249 250 251 252 253 254
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jeromy's avatar
Jeromy committed
255
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
256
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
257
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
258
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
259 260 261 262 263
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
264
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
265 266 267 268
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
269
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
270 271 272
			return
		}

273
		cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
Jeromy's avatar
Jeromy committed
274
	},
275
	Type: stringList{},
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
	},
}

func stringListEncoder(req *cmds.Request, w io.Writer, v interface{}) error {
	list, ok := v.(*stringList)
	if !ok {
		return e.TypeErr(list, v)
	}
	for _, str := range list.Strings {
		_, err := fmt.Fprintf(w, "%s\n", str)
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
293
}
Jeromy's avatar
Jeromy committed
294 295

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
296
	Helptext: cmdkit.HelpText{
297
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
298
		ShortDescription: `
299 300 301
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
302 303 304 305 306 307 308

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
309 310
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
311
	},
Jeromy's avatar
Jeromy committed
312
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
313
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
314
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
315
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
316 317 318 319 320
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
321
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
322 323 324 325
			return
		}

		if n.Floodsub == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
326
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use"), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
327 328 329
			return
		}

330
		var topic string
331 332
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
333 334
		}

335 336 337 338 339
		peers := n.Floodsub.ListPeers(topic)
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
340
		}
341 342
		sort.Strings(list.Strings)
		cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
343
	},
344
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
345
	Encoders: cmds.EncoderMap{
346
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
347 348
	},
}