pubsub.go 8.4 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8
	"net/http"
9 10
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
11

12
	core "github.com/ipfs/go-ipfs/core"
Jeromy's avatar
Jeromy committed
13

Steven Allen's avatar
Steven Allen committed
14 15
	floodsub "gx/ipfs/QmSFihvoND3eDaAYRCeLgLPt62yCPgMZs1NSZmKFEtJQQw/go-libp2p-floodsub"
	pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore"
Steven Allen's avatar
Steven Allen committed
16
	cmds "gx/ipfs/QmabLouZTZwhfALuBcssPvkzhbYGMb4394huT7HY4LQ6d3/go-ipfs-cmds"
Steven Allen's avatar
Steven Allen committed
17
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
18
	cmdkit "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit"
Steven Allen's avatar
Steven Allen committed
19
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
Jeromy's avatar
Jeromy committed
20 21 22
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
23
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
24 25 26 27 28 29 30
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
31 32

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
33 34 35
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
36 37 38 39
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
40 41 42 43
	},
}

var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
44
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
45 46 47 48 49 50
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
51 52

To use, the daemon must be run with '--enable-pubsub-experiment'.
53 54 55 56 57 58 59 60 61 62 63 64
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
65 66
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
67 68
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
69
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
70 71
	Options: []cmdkit.Option{
		cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
72
	},
Jeromy's avatar
Jeromy committed
73
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
74
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
75
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
76
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
77 78 79 80 81
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
82
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
83 84 85
			return
		}

Jeromy's avatar
Jeromy committed
86
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
87
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
88 89 90
			return
		}

91
		topic := req.Arguments[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
92
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
93
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
94
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
95 96
			return
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
97
		defer sub.Cancel()
98

99
		discover, _ := req.Options["discover"].(bool)
100
		if discover {
101 102
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
103
				err := n.Blocks.AddBlock(blk)
104 105 106 107
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
108

109
				connectToPubSubPeers(req.Context, n, blk.Cid())
110
			}()
111
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
112

113 114 115 116
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
117
		for {
118
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
119 120 121 122 123 124 125 126 127
			if err == io.EOF || err == context.Canceled {
				return
			} else if err != nil {
				res.SetError(err, cmdkit.ErrNormal)
				return
			}

			res.Emit(msg)
		}
Jeromy's avatar
Jeromy committed
128
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
129
	Encoders: cmds.EncoderMap{
130
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
131 132 133 134 135 136 137
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
138
		}),
139
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
140 141 142 143 144
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
145
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
146 147
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
148
		}),
149
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
150 151 152 153 154 155
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
156

Jan Winkelmann's avatar
Jan Winkelmann committed
157
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
158 159 160
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
161 162 163 164 165
		}),
	},
	Type: floodsub.Message{},
}

166 167 168 169
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

170
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
190
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
191
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
192 193 194 195 196 197
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
198 199

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
200 201
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
202 203 204
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
205
	},
Jeromy's avatar
Jeromy committed
206
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
207
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
208
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
209
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
210 211 212 213 214
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
215
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
216 217 218
			return
		}

Jeromy's avatar
Jeromy committed
219
		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
220
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
221 222 223
			return
		}

224
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
225

Jeromy's avatar
Jeromy committed
226 227 228 229 230 231
		err = req.ParseBodyArgs()
		if err != nil && !cmds.IsAllArgsAlreadyCovered(err) {
			res.SetError(err, cmdkit.ErrNormal)
			return
		}

232
		for _, data := range req.Arguments[1:] {
Jeromy's avatar
Jeromy committed
233
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
234
				res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
235 236 237 238 239
				return
			}
		}
	},
}
Jeromy's avatar
Jeromy committed
240 241

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
242
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
243 244 245 246 247 248 249 250 251 252
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jeromy's avatar
Jeromy committed
253
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
254
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
255
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
256
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
257 258 259 260 261
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
262
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
263 264 265 266
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
267
			res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
268 269 270
			return
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
271 272 273
		for _, topic := range n.Floodsub.GetTopics() {
			res.Emit(topic)
		}
Jeromy's avatar
Jeromy committed
274
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
275
	Type: "",
Jeromy's avatar
Jeromy committed
276
}
Jeromy's avatar
Jeromy committed
277 278

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
279
	Helptext: cmdkit.HelpText{
280
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
281
		ShortDescription: `
282 283 284
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
285 286 287 288 289 290 291

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
292 293
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
294
	},
Jeromy's avatar
Jeromy committed
295
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
296
		n, err := GetNode(env)
Jeromy's avatar
Jeromy committed
297
		if err != nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
298
			res.SetError(err, cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
299 300 301 302 303
			return
		}

		// Must be online!
		if !n.OnlineMode() {
Jan Winkelmann's avatar
Jan Winkelmann committed
304
			res.SetError(errNotOnline, cmdkit.ErrClient)
Jeromy's avatar
Jeromy committed
305 306 307 308
			return
		}

		if n.Floodsub == nil {
Jan Winkelmann's avatar
Jan Winkelmann committed
309
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmdkit.ErrNormal)
Jeromy's avatar
Jeromy committed
310 311 312
			return
		}

313
		var topic string
314 315
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
316 317
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
318 319
		for _, peer := range n.Floodsub.ListPeers(topic) {
			res.Emit(peer.Pretty())
Jeromy's avatar
Jeromy committed
320 321
		}
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
322 323 324
	Type: "",
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.Encoders[cmds.TextNewline],
Jeromy's avatar
Jeromy committed
325 326
	},
}