pubsub.go 8.82 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
keks's avatar
keks committed
6
	"errors"
Jeromy's avatar
Jeromy committed
7
	"fmt"
Jeromy's avatar
Jeromy committed
8
	"io"
9
	"net/http"
10
	"sort"
11 12
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
13

14
	core "github.com/ipfs/go-ipfs/core"
15
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
16
	e "github.com/ipfs/go-ipfs/core/commands/e"
Jeromy's avatar
Jeromy committed
17

18 19
	cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
	blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
20
	pstore "gx/ipfs/QmSJ36wcYQyEViJUWUEhJU81tw1KdakTKqLLHbvYbA9zDv/go-libp2p-peerstore"
21
	cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
22
	floodsub "gx/ipfs/QmUK4h113Hh7bR2gPpsMcbUEbbzc7hspocmPi91Bmi69nH/go-libp2p-floodsub"
Steven Allen's avatar
Steven Allen committed
23
	cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds"
Jeromy's avatar
Jeromy committed
24 25 26
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
27
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
28 29 30 31 32 33 34
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
35 36

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
37 38 39
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
40 41 42 43
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
44 45 46
	},
}

Kejie Zhang's avatar
Kejie Zhang committed
47 48 49 50
// pubsubDiscoverOptionName is the name of the boolean option of
// "ipfs pubsub sub" that asks the node to try to discover other peers
// subscribed to the same topic.
const pubsubDiscoverOptionName = "discover"

Jeromy's avatar
Jeromy committed
51
var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
52
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
53 54 55 56 57 58
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
59 60

To use, the daemon must be run with '--enable-pubsub-experiment'.
61 62 63 64 65 66 67 68 69 70 71 72
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
73 74
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
75 76
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
77
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
78
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
79
		cmdkit.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
80
	},
keks's avatar
keks committed
81
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
82
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
83
		if err != nil {
keks's avatar
keks committed
84
			return err
Jeromy's avatar
Jeromy committed
85 86 87 88
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
89
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
90 91
		}

Jeromy's avatar
Jeromy committed
92
		if n.Floodsub == nil {
keks's avatar
keks committed
93
			return fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
Jeromy's avatar
Jeromy committed
94 95
		}

96
		topic := req.Arguments[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
97
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
98
		if err != nil {
keks's avatar
keks committed
99
			return err
Jeromy's avatar
Jeromy committed
100
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
101
		defer sub.Cancel()
102

Kejie Zhang's avatar
Kejie Zhang committed
103
		discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
104
		if discover {
105 106
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
107
				err := n.Blocks.AddBlock(blk)
108 109 110 111
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
112

113
				connectToPubSubPeers(req.Context, n, blk.Cid())
114
			}()
115
		}
Jan Winkelmann's avatar
Jan Winkelmann committed
116

117 118 119 120
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
121
		for {
122
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
123
			if err == io.EOF || err == context.Canceled {
keks's avatar
keks committed
124
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
125
			} else if err != nil {
keks's avatar
keks committed
126
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
127 128
			}

keks's avatar
keks committed
129 130 131 132
			err = res.Emit(msg)
			if err != nil {
				return err
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
133
		}
Jeromy's avatar
Jeromy committed
134
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
135
	Encoders: cmds.EncoderMap{
136
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
137 138 139 140 141 142 143
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
144
		}),
145
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
146 147 148 149 150
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
151
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
152 153
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
154
		}),
155
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Jan Winkelmann's avatar
Jan Winkelmann committed
156 157 158 159 160 161
			m, ok := v.(*floodsub.Message)
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
162

Jan Winkelmann's avatar
Jan Winkelmann committed
163
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
164 165 166
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
167 168 169 170 171
		}),
	},
	Type: floodsub.Message{},
}

172
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
173 174 175
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

176
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
196
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
197
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
198 199 200 201 202 203
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
204 205

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
206 207
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
208 209 210
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
211
	},
keks's avatar
keks committed
212
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
213
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
214
		if err != nil {
keks's avatar
keks committed
215
			return err
Jeromy's avatar
Jeromy committed
216 217 218 219
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
220
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
221 222
		}

Jeromy's avatar
Jeromy committed
223
		if n.Floodsub == nil {
keks's avatar
keks committed
224
			return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
Jeromy's avatar
Jeromy committed
225 226
		}

227
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
228

Jeromy's avatar
Jeromy committed
229
		err = req.ParseBodyArgs()
230
		if err != nil {
keks's avatar
keks committed
231
			return err
Jeromy's avatar
Jeromy committed
232 233
		}

234
		for _, data := range req.Arguments[1:] {
Jeromy's avatar
Jeromy committed
235
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
keks's avatar
keks committed
236
				return err
Jeromy's avatar
Jeromy committed
237 238
			}
		}
keks's avatar
keks committed
239 240

		return nil
Jeromy's avatar
Jeromy committed
241 242
	},
}
Jeromy's avatar
Jeromy committed
243 244

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
245
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
246 247 248 249 250 251 252 253 254 255
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
keks's avatar
keks committed
256
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
257
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
258
		if err != nil {
keks's avatar
keks committed
259
			return err
Jeromy's avatar
Jeromy committed
260 261 262 263
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
264
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
265 266 267
		}

		if n.Floodsub == nil {
keks's avatar
keks committed
268
			return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
Jeromy's avatar
Jeromy committed
269 270
		}

keks's avatar
keks committed
271
		return cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
Jeromy's avatar
Jeromy committed
272
	},
273
	Type: stringList{},
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
	},
}

func stringListEncoder(req *cmds.Request, w io.Writer, v interface{}) error {
	list, ok := v.(*stringList)
	if !ok {
		return e.TypeErr(list, v)
	}
	for _, str := range list.Strings {
		_, err := fmt.Fprintf(w, "%s\n", str)
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
291
}
Jeromy's avatar
Jeromy committed
292 293

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
294
	Helptext: cmdkit.HelpText{
295
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
296
		ShortDescription: `
297 298 299
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
300 301 302 303 304 305 306

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
307 308
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
309
	},
keks's avatar
keks committed
310
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
311
		n, err := cmdenv.GetNode(env)
Jeromy's avatar
Jeromy committed
312
		if err != nil {
keks's avatar
keks committed
313
			return err
Jeromy's avatar
Jeromy committed
314 315 316 317
		}

		// Must be online!
		if !n.OnlineMode() {
keks's avatar
keks committed
318
			return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
Jeromy's avatar
Jeromy committed
319 320 321
		}

		if n.Floodsub == nil {
keks's avatar
keks committed
322
			return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
Jeromy's avatar
Jeromy committed
323 324
		}

325
		var topic string
326 327
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
328 329
		}

330 331 332 333 334
		peers := n.Floodsub.ListPeers(topic)
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
335
		}
336
		sort.Strings(list.Strings)
keks's avatar
keks committed
337
		return cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
338
	},
339
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
340
	Encoders: cmds.EncoderMap{
341
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
342 343
	},
}