pubsub.go 6.8 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8
	"net/http"
9
	"sort"
Jeromy's avatar
Jeromy committed
10

11
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
Jeromy's avatar
Jeromy committed
12

Jakub Sztandera's avatar
Jakub Sztandera committed
13 14 15
	cmdkit "github.com/ipfs/go-ipfs-cmdkit"
	cmds "github.com/ipfs/go-ipfs-cmds"
	options "github.com/ipfs/interface-go-ipfs-core/options"
Jeromy's avatar
Jeromy committed
16 17 18
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
19
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
20 21 22 23 24 25 26
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
27 28

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
29 30 31
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
32 33 34 35
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
36 37 38
	},
}

Kejie Zhang's avatar
Kejie Zhang committed
39 40 41 42
// pubsubDiscoverOptionName is the name of the boolean option on
// "ipfs pubsub sub" that asks to discover other peers subscribed to the
// same topic.
const pubsubDiscoverOptionName = "discover"

43 44 45 46 47 48 49
// pubsubMessage is the value emitted by "ipfs pubsub sub" for each message
// received on a subscription. Every field is tagged omitempty, so empty
// fields are left out of the JSON encoding.
type pubsubMessage struct {
	// From is the byte form of the value returned by the message's From().
	From []byte `json:"from,omitempty"`

	// Data is the message payload.
	Data []byte `json:"data,omitempty"`

	// Seqno is the sequence number reported by the message.
	Seqno []byte `json:"seqno,omitempty"`

	// TopicIDs lists the topics reported by the message.
	TopicIDs []string `json:"topicIDs,omitempty"`
}

Jeromy's avatar
Jeromy committed
50
var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
51
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
52 53 54 55 56 57
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
58 59

To use, the daemon must be run with '--enable-pubsub-experiment'.
60 61 62 63 64 65 66 67 68 69 70 71
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
72 73
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
74 75
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
76
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
77
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
78
		cmdkit.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
79
	},
keks's avatar
keks committed
80
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
81
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
82
		if err != nil {
keks's avatar
keks committed
83
			return err
Jeromy's avatar
Jeromy committed
84 85
		}

86
		topic := req.Arguments[0]
Kejie Zhang's avatar
Kejie Zhang committed
87
		discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
88 89

		sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
90 91 92
		if err != nil {
			return err
		}
93
		defer sub.Close()
Jan Winkelmann's avatar
Jan Winkelmann committed
94

95 96 97 98
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
99
		for {
100
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
101
			if err == io.EOF || err == context.Canceled {
keks's avatar
keks committed
102
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
103
			} else if err != nil {
keks's avatar
keks committed
104
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
105 106
			}

107
			if err := res.Emit(&pubsubMessage{
108 109 110 111
				Data:     msg.Data(),
				From:     []byte(msg.From()),
				Seqno:    msg.Seq(),
				TopicIDs: msg.Topics(),
112 113 114
			}); err != nil {
				return err
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
115
		}
Jeromy's avatar
Jeromy committed
116
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
117
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
118 119
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			_, err := w.Write(psm.Data)
Jan Winkelmann's avatar
Jan Winkelmann committed
120
			return err
Jeromy's avatar
Jeromy committed
121
		}),
Overbool's avatar
Overbool committed
122 123 124
		"ndpayload": cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			psm.Data = append(psm.Data, '\n')
			_, err := w.Write(psm.Data)
Jan Winkelmann's avatar
Jan Winkelmann committed
125
			return err
Jeromy's avatar
Jeromy committed
126
		}),
Overbool's avatar
Overbool committed
127 128
		"lenpayload": cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			buf := make([]byte, 8, len(psm.Data)+8)
129

Overbool's avatar
Overbool committed
130 131
			n := binary.PutUvarint(buf, uint64(len(psm.Data)))
			buf = append(buf[:n], psm.Data...)
Jan Winkelmann's avatar
Jan Winkelmann committed
132 133
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
134 135
		}),
	},
136
	Type: pubsubMessage{},
137 138
}

Jeromy's avatar
Jeromy committed
139
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
140
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
141 142 143 144 145 146
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
147 148

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
149 150
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
151 152 153
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
154
	},
keks's avatar
keks committed
155
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
156
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
157
		if err != nil {
keks's avatar
keks committed
158
			return err
Jeromy's avatar
Jeromy committed
159 160
		}

161
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
162

Jeromy's avatar
Jeromy committed
163
		err = req.ParseBodyArgs()
164
		if err != nil {
keks's avatar
keks committed
165
			return err
Jeromy's avatar
Jeromy committed
166 167
		}

168
		for _, data := range req.Arguments[1:] {
169
			if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
keks's avatar
keks committed
170
				return err
Jeromy's avatar
Jeromy committed
171 172
			}
		}
keks's avatar
keks committed
173 174

		return nil
Jeromy's avatar
Jeromy committed
175 176
	},
}
Jeromy's avatar
Jeromy committed
177 178

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
179
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
180 181 182 183 184 185 186 187 188 189
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
keks's avatar
keks committed
190
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
191
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
192
		if err != nil {
keks's avatar
keks committed
193
			return err
Jeromy's avatar
Jeromy committed
194 195
		}

196 197 198
		l, err := api.PubSub().Ls(req.Context)
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
199 200
		}

201
		return cmds.EmitOnce(res, stringList{l})
Jeromy's avatar
Jeromy committed
202
	},
203
	Type: stringList{},
204
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
205
		cmds.Text: cmds.MakeTypedEncoder(stringListEncoder),
206 207 208
	},
}

Overbool's avatar
Overbool committed
209
func stringListEncoder(req *cmds.Request, w io.Writer, list *stringList) error {
210 211 212 213 214 215 216
	for _, str := range list.Strings {
		_, err := fmt.Fprintf(w, "%s\n", str)
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
217
}
Jeromy's avatar
Jeromy committed
218 219

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
220
	Helptext: cmdkit.HelpText{
221
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
222
		ShortDescription: `
223 224 225
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
226 227 228 229 230 231 232

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
233 234
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
235
	},
keks's avatar
keks committed
236
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
237
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
238
		if err != nil {
keks's avatar
keks committed
239
			return err
Jeromy's avatar
Jeromy committed
240 241
		}

242
		var topic string
243 244
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
245 246
		}

247 248 249 250 251
		peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
		if err != nil {
			return err
		}

252 253 254 255
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
256
		}
257
		sort.Strings(list.Strings)
keks's avatar
keks committed
258
		return cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
259
	},
260
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
261
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
262
		cmds.Text: cmds.MakeTypedEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
263 264
	},
}