pubsub.go 8.27 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8
	"net/http"
9 10
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
11

12
	core "github.com/ipfs/go-ipfs/core"
Jeromy's avatar
Jeromy committed
13

Steven Allen's avatar
Steven Allen committed
14 15
	floodsub "gx/ipfs/QmSFihvoND3eDaAYRCeLgLPt62yCPgMZs1NSZmKFEtJQQw/go-libp2p-floodsub"
	pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
16
	cmds "gx/ipfs/QmabLouZTZwhfALuBcssPvkzhbYGMb4394huT7HY4LQ6d3/go-ipfs-cmds"
Steven Allen's avatar
Steven Allen committed
17
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
18
	cmdkit "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
Steven Allen's avatar
Steven Allen committed
19
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
Jeromy's avatar
Jeromy committed
20 21 22
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
23
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
24 25 26 27 28 29 30
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
31 32

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
33 34 35
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
36 37 38 39
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
40 41 42 43
	},
}

var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
44
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
45 46 47 48 49 50
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
51 52

To use, the daemon must be run with '--enable-pubsub-experiment'.
53 54 55 56 57 58 59 60 61 62 63 64
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
65 66
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
67 68
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
69
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
70 71
	Options: []cmdkit.Option{
		cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
72
	},
Jeromy's avatar
Jeromy committed
73
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
74
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
75
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
76
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
77 78 79 80 81
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
82
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
83 84 85
			return
		}

Jeromy's avatar
Jeromy committed
86
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
87
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
88 89 90
			return
		}

91
		topic := req.Arguments[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
92
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
93
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
94
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
95 96
			return
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
97
		defer sub.Cancel()
98

99
		discover, _ := req.Options["discover"].(bool)
100
		if discover {
101 102
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
103
				err := n.Blocks.AddBlock(blk)
104 105 106 107
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
108

109
				connectToPubSubPeers(req.Context, n, blk.Cid())
110
			}()
111
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
112

113 114 115 116
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
117
		for {
118
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
119 120 121 122 123 124 125 126 127
			if err == io.EOF || err == context.Canceled {
				return
			} else if err != nil {
				res.SetError(err, cmdkit.ErrNormal)
				return
			}

			res.Emit(msg)
		}
Jeromy's avatar
Jeromy committed
128
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
129
	Encoders: cmds.EncoderMap{
130
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
131 132 133 134 135 136 137
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
138
		}),
139
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
140 141 142 143 144
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
145
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
146 147
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
148
		}),
149
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
150 151 152 153 154 155
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
156

Jan Winkelmann's avatar
Jan Winkelmann committed
157
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
158 159 160
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
161 162 163 164 165
		}),
	},
	Type: floodsub.Message{},
}

166 167 168 169
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

170
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
190
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
191
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
192 193 194 195 196 197
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
198 199

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
200 201
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
202 203 204
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
205
	},
Jeromy's avatar
Jeromy committed
206
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
207
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
208
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
209
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
210 211 212 213 214
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
215
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
216 217 218
			return
		}

Jeromy's avatar
Jeromy committed
219
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
220
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
221 222 223
			return
		}

224
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
225

226
		for _, data := range req.Arguments[1:] {
Jeromy's avatar
Jeromy committed
227
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
228
				res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
229 230 231 232 233
				return
			}
		}
	},
}
Jeromy's avatar
Jeromy committed
234 235

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
236
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
237 238 239 240 241 242 243 244 245 246
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jeromy's avatar
Jeromy committed
247
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
248
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
249
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
250
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
251 252 253 254 255
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
256
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
257 258 259 260
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
261
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
262 263 264
			return
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
265 266 267
		for _, topic := range n.Floodsub.GetTopics() {
			res.Emit(topic)
		}
Jeromy's avatar
Jeromy committed
268
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
269
	Type: "",
Jeromy's avatar
Jeromy committed
270
}
Jeromy's avatar
Jeromy committed
271 272

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
273
	Helptext: cmdkit.HelpText{
274
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
275
		ShortDescription: `
276 277 278
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
279 280 281 282 283 284 285

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
286 287
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
288
	},
Jeromy's avatar
Jeromy committed
289
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
290
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
291
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
292
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
293 294 295 296 297
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
298
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
299 300 301 302
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
303
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
304 305 306
			return
		}

307
		var topic string
308 309
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
310 311
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
312 313
		for _, peer := range n.Floodsub.ListPeers(topic) {
			res.Emit(peer.Pretty())
Jeromy's avatar
Jeromy committed
314 315
		}
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
316 317 318
	Type: "",
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.Encoders[cmds.TextNewline],
Jeromy's avatar
Jeromy committed
319 320
	},
}