pubsub.go 8.75 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
keks's avatar
keks committed
6
	"errors"
Jeromy's avatar
Jeromy committed
7
	"fmt"
Jeromy's avatar
Jeromy committed
8
	"io"
9
	"net/http"
10
	"sort"
11 12
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
13

14
	core "github.com/ipfs/go-ipfs/core"
15
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
16
	e "github.com/ipfs/go-ipfs/core/commands/e"
Jeromy's avatar
Jeromy committed
17

18 19
	cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
	blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
20
	pstore "gx/ipfs/QmSJ36wcYQyEViJUWUEhJU81tw1KdakTKqLLHbvYbA9zDv/go-libp2p-peerstore"
21
	cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
22
	floodsub "gx/ipfs/QmUK4h113Hh7bR2gPpsMcbUEbbzc7hspocmPi91Bmi69nH/go-libp2p-floodsub"
Steven Allen's avatar
Steven Allen committed
23
	cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds"
Jeromy's avatar
Jeromy committed
24 25 26
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
27
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
28 29 30 31 32 33 34
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
35 36

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
37 38 39
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
40 41 42 43
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
44 45 46 47
	},
}

var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
48
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
49 50 51 52 53 54
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
55 56

To use, the daemon must be run with '--enable-pubsub-experiment'.
57 58 59 60 61 62 63 64 65 66 67 68
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
69 70
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
71 72
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
73
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
74 75
	Options: []cmdkit.Option{
		cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
76
	},
keks's avatar
keks committed
77
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
78
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
79
		if err != nil {
keks's avatar
keks committed
80
			return err
Jeromy's avatar
Jeromy committed
81 82 83 84
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
85
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
86 87
		}

Jeromy's avatar
Jeromy committed
88
		if n.Floodsub == nil {
keks's avatar
keks committed
89
			return fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
Jeromy's avatar
Jeromy committed
90 91
		}

92
		topic := req.Arguments[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
93
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
94
		if err != nil {
keks's avatar
keks committed
95
			return err
Jeromy's avatar
Jeromy committed
96
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
97
		defer sub.Cancel()
98

99
		discover, _ := req.Options["discover"].(bool)
100
		if discover {
101 102
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
103
				err := n.Blocks.AddBlock(blk)
104 105 106 107
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
108

109
				connectToPubSubPeers(req.Context, n, blk.Cid())
110
			}()
111
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
112

113 114 115 116
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
117
		for {
118
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
119
			if err == io.EOF || err == context.Canceled {
keks's avatar
keks committed
120
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
121
			} else if err != nil {
keks's avatar
keks committed
122
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
123 124
			}

keks's avatar
keks committed
125 126 127 128
			err = res.Emit(msg)
			if err != nil {
				return err
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
129
		}
Jeromy's avatar
Jeromy committed
130
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
131
	Encoders: cmds.EncoderMap{
132
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
133 134 135 136 137 138 139
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
140
		}),
141
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
142 143 144 145 146
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
147
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
148 149
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
150
		}),
151
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
152 153 154 155 156 157
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
158

Jan Winkelmann's avatar
Jan Winkelmann committed
159
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
160 161 162
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
163 164 165 166 167
		}),
	},
	Type: floodsub.Message{},
}

168
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
169 170 171
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

172
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
192
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
193
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
194 195 196 197 198 199
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
200 201

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
202 203
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
204 205 206
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
207
	},
keks's avatar
keks committed
208
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
209
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
210
		if err != nil {
keks's avatar
keks committed
211
			return err
Jeromy's avatar
Jeromy committed
212 213 214 215
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
216
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
217 218
		}

Jeromy's avatar
Jeromy committed
219
		if n.Floodsub == nil {
keks's avatar
keks committed
220
			return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
Jeromy's avatar
Jeromy committed
221 222
		}

223
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
224

Jeromy's avatar
Jeromy committed
225
		err = req.ParseBodyArgs()
226
		if err != nil {
keks's avatar
keks committed
227
			return err
Jeromy's avatar
Jeromy committed
228 229
		}

230
		for _, data := range req.Arguments[1:] {
Jeromy's avatar
Jeromy committed
231
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
keks's avatar
keks committed
232
				return err
Jeromy's avatar
Jeromy committed
233 234
			}
		}
keks's avatar
keks committed
235 236

		return nil
Jeromy's avatar
Jeromy committed
237 238
	},
}
Jeromy's avatar
Jeromy committed
239 240

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
241
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
242 243 244 245 246 247 248 249 250 251
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
keks's avatar
keks committed
252
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
253
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
254
		if err != nil {
keks's avatar
keks committed
255
			return err
Jeromy's avatar
Jeromy committed
256 257 258 259
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
260
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
261 262 263
		}

		if n.Floodsub == nil {
keks's avatar
keks committed
264
			return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
Jeromy's avatar
Jeromy committed
265 266
		}

keks's avatar
keks committed
267
		return cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
Jeromy's avatar
Jeromy committed
268
	},
269
	Type: stringList{},
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
	},
}

func stringListEncoder(req *cmds.Request, w io.Writer, v interface{}) error {
	list, ok := v.(*stringList)
	if !ok {
		return e.TypeErr(list, v)
	}
	for _, str := range list.Strings {
		_, err := fmt.Fprintf(w, "%s\n", str)
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
287
}
Jeromy's avatar
Jeromy committed
288 289

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
290
	Helptext: cmdkit.HelpText{
291
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
292
		ShortDescription: `
293 294 295
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
296 297 298 299 300 301 302

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
303 304
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
305
	},
keks's avatar
keks committed
306
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
307
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
308
		if err != nil {
keks's avatar
keks committed
309
			return err
Jeromy's avatar
Jeromy committed
310 311 312 313
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
314
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
315 316 317
		}

		if n.Floodsub == nil {
keks's avatar
keks committed
318
			return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
Jeromy's avatar
Jeromy committed
319 320
		}

321
		var topic string
322 323
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
324 325
		}

326 327 328 329 330
		peers := n.Floodsub.ListPeers(topic)
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
331
		}
332
		sort.Strings(list.Strings)
keks's avatar
keks committed
333
		return cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
334
	},
335
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
336
	Encoders: cmds.EncoderMap{
337
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
338 339
	},
}