pubsub.go 7.16 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8
	"net/http"
9
	"sort"
Jeromy's avatar
Jeromy committed
10

11
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
12
	e "github.com/ipfs/go-ipfs/core/commands/e"
13
	options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
Jeromy's avatar
Jeromy committed
14

15
	cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
16
	cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
Jeromy's avatar
Jeromy committed
17 18 19
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
20
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
21 22 23 24 25 26 27
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
28 29

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
30 31 32
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
33 34 35 36
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
37 38 39
	},
}

Kejie Zhang's avatar
Kejie Zhang committed
40 41 42 43
const (
	pubsubDiscoverOptionName = "discover"
)

44 45 46 47 48 49 50
type pubsubMessage struct {
	From     []byte   `json:"from,omitempty"`
	Data     []byte   `json:"data,omitempty"`
	Seqno    []byte   `json:"seqno,omitempty"`
	TopicIDs []string `json:"topicIDs,omitempty"`
}

Jeromy's avatar
Jeromy committed
51
var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
52
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
53 54 55 56 57 58
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
59 60

To use, the daemon must be run with '--enable-pubsub-experiment'.
61 62 63 64 65 66 67 68 69 70 71 72
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
73 74
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
75 76
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
77
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
78
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
79
		cmdkit.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
80
	},
keks's avatar
keks committed
81
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
82
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
83
		if err != nil {
keks's avatar
keks committed
84
			return err
Jeromy's avatar
Jeromy committed
85 86
		}

87
		topic := req.Arguments[0]
Kejie Zhang's avatar
Kejie Zhang committed
88
		discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
89 90

		sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
91 92 93
		if err != nil {
			return err
		}
94
		defer sub.Close()
Jan Winkelmann's avatar
Jan Winkelmann committed
95

96 97 98 99
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
100
		for {
101
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
102
			if err == io.EOF || err == context.Canceled {
keks's avatar
keks committed
103
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
104
			} else if err != nil {
keks's avatar
keks committed
105
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
106 107
			}

108 109 110 111 112 113
			res.Emit(&pubsubMessage{
				Data:     msg.Data(),
				From:     []byte(msg.From()),
				Seqno:    msg.Seq(),
				TopicIDs: msg.Topics(),
			})
Jan Winkelmann's avatar
Jan Winkelmann committed
114
		}
Jeromy's avatar
Jeromy committed
115
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
116
	Encoders: cmds.EncoderMap{
117
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
118
			m, ok := v.(*pubsubMessage)
Jan Winkelmann's avatar
Jan Winkelmann committed
119 120 121 122 123 124
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
125
		}),
126
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
127
			m, ok := v.(*pubsubMessage)
Jan Winkelmann's avatar
Jan Winkelmann committed
128 129 130 131
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
132
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
133 134
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
135
		}),
136
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
137
			m, ok := v.(*pubsubMessage)
Jan Winkelmann's avatar
Jan Winkelmann committed
138 139 140 141 142
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
143

Jan Winkelmann's avatar
Jan Winkelmann committed
144
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
145 146 147
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
148 149
		}),
	},
150
	Type: pubsubMessage{},
151 152
}

Jeromy's avatar
Jeromy committed
153
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
154
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
155 156 157 158 159 160
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
161 162

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
163 164
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
165 166 167
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
168
	},
keks's avatar
keks committed
169
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
170
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
171
		if err != nil {
keks's avatar
keks committed
172
			return err
Jeromy's avatar
Jeromy committed
173 174
		}

175
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
176

Jeromy's avatar
Jeromy committed
177
		err = req.ParseBodyArgs()
178
		if err != nil {
keks's avatar
keks committed
179
			return err
Jeromy's avatar
Jeromy committed
180 181
		}

182
		for _, data := range req.Arguments[1:] {
183
			if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
keks's avatar
keks committed
184
				return err
Jeromy's avatar
Jeromy committed
185 186
			}
		}
keks's avatar
keks committed
187 188

		return nil
Jeromy's avatar
Jeromy committed
189 190
	},
}
Jeromy's avatar
Jeromy committed
191 192

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
193
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
194 195 196 197 198 199 200 201 202 203
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
keks's avatar
keks committed
204
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
205
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
206
		if err != nil {
keks's avatar
keks committed
207
			return err
Jeromy's avatar
Jeromy committed
208 209
		}

210 211 212
		l, err := api.PubSub().Ls(req.Context)
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
213 214
		}

215
		return cmds.EmitOnce(res, stringList{l})
Jeromy's avatar
Jeromy committed
216
	},
217
	Type: stringList{},
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
	},
}

func stringListEncoder(req *cmds.Request, w io.Writer, v interface{}) error {
	list, ok := v.(*stringList)
	if !ok {
		return e.TypeErr(list, v)
	}
	for _, str := range list.Strings {
		_, err := fmt.Fprintf(w, "%s\n", str)
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
235
}
Jeromy's avatar
Jeromy committed
236 237

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
238
	Helptext: cmdkit.HelpText{
239
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
240
		ShortDescription: `
241 242 243
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
244 245 246 247 248 249 250

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
251 252
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
253
	},
keks's avatar
keks committed
254
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
255
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
256
		if err != nil {
keks's avatar
keks committed
257
			return err
Jeromy's avatar
Jeromy committed
258 259
		}

260
		var topic string
261 262
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
263 264
		}

265 266 267 268 269
		peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
		if err != nil {
			return err
		}

270 271 272 273
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
274
		}
275
		sort.Strings(list.Strings)
keks's avatar
keks committed
276
		return cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
277
	},
278
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
279
	Encoders: cmds.EncoderMap{
280
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
281 282
	},
}