pubsub.go 7.13 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package commands

import (
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"encoding/binary"
Jeromy's avatar
Jeromy committed
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"io"
8
	"net/http"
9
	"sort"
Jeromy's avatar
Jeromy committed
10

11
	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
12
	e "github.com/ipfs/go-ipfs/core/commands/e"
13
	options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
Jeromy's avatar
Jeromy committed
14

15
	cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
Steven Allen's avatar
Steven Allen committed
16
	cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds"
Jeromy's avatar
Jeromy committed
17 18 19
)

var PubsubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
20
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
21 22 23 24 25 26 27
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
28 29

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
30 31 32
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
33 34 35 36
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
37 38 39
	},
}

Kejie Zhang's avatar
Kejie Zhang committed
40 41 42 43
const (
	pubsubDiscoverOptionName = "discover"
)

44 45 46 47 48 49 50
type pubsubMessage struct {
	From     []byte   `json:"from,omitempty"`
	Data     []byte   `json:"data,omitempty"`
	Seqno    []byte   `json:"seqno,omitempty"`
	TopicIDs []string `json:"topicIDs,omitempty"`
}

Jeromy's avatar
Jeromy committed
51
var PubsubSubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
52
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
53 54 55 56 57 58
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
59 60

To use, the daemon must be run with '--enable-pubsub-experiment'.
61 62 63 64 65 66 67 68 69 70 71 72
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
73 74
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
75 76
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
77
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
78
	Options: []cmdkit.Option{
Kejie Zhang's avatar
Kejie Zhang committed
79
		cmdkit.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
80
	},
keks's avatar
keks committed
81
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
82
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
83
		if err != nil {
keks's avatar
keks committed
84
			return err
Jeromy's avatar
Jeromy committed
85 86
		}

87
		topic := req.Arguments[0]
Kejie Zhang's avatar
Kejie Zhang committed
88
		discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
89 90 91

		sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
		defer sub.Close()
Jan Winkelmann's avatar
Jan Winkelmann committed
92

93 94 95 96
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
97
		for {
98
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
99
			if err == io.EOF || err == context.Canceled {
keks's avatar
keks committed
100
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
101
			} else if err != nil {
keks's avatar
keks committed
102
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
103 104
			}

105 106 107 108 109 110
			res.Emit(&pubsubMessage{
				Data:     msg.Data(),
				From:     []byte(msg.From()),
				Seqno:    msg.Seq(),
				TopicIDs: msg.Topics(),
			})
Jan Winkelmann's avatar
Jan Winkelmann committed
111
		}
Jeromy's avatar
Jeromy committed
112
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
113
	Encoders: cmds.EncoderMap{
114
		cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
115
			m, ok := v.(*pubsubMessage)
Jan Winkelmann's avatar
Jan Winkelmann committed
116 117 118 119 120 121
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
122
		}),
123
		"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
124
			m, ok := v.(*pubsubMessage)
Jan Winkelmann's avatar
Jan Winkelmann committed
125 126 127 128
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

Jeromy's avatar
Jeromy committed
129
			m.Data = append(m.Data, '\n')
Jan Winkelmann's avatar
Jan Winkelmann committed
130 131
			_, err := w.Write(m.Data)
			return err
Jeromy's avatar
Jeromy committed
132
		}),
133
		"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
134
			m, ok := v.(*pubsubMessage)
Jan Winkelmann's avatar
Jan Winkelmann committed
135 136 137 138 139
			if !ok {
				return fmt.Errorf("unexpected type: %T", v)
			}

			buf := make([]byte, 8, len(m.Data)+8)
140

Jan Winkelmann's avatar
Jan Winkelmann committed
141
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
Jan Winkelmann's avatar
Jan Winkelmann committed
142 143 144
			buf = append(buf[:n], m.Data...)
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
145 146
		}),
	},
147
	Type: pubsubMessage{},
148 149
}

Jeromy's avatar
Jeromy committed
150
var PubsubPubCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
151
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
152 153 154 155 156 157
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
158 159

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
160 161
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
162 163 164
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", true, false, "Topic to publish to."),
		cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
165
	},
keks's avatar
keks committed
166
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
167
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
168
		if err != nil {
keks's avatar
keks committed
169
			return err
Jeromy's avatar
Jeromy committed
170 171
		}

172
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
173

Jeromy's avatar
Jeromy committed
174
		err = req.ParseBodyArgs()
175
		if err != nil {
keks's avatar
keks committed
176
			return err
Jeromy's avatar
Jeromy committed
177 178
		}

179
		for _, data := range req.Arguments[1:] {
180
			if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
keks's avatar
keks committed
181
				return err
Jeromy's avatar
Jeromy committed
182 183
			}
		}
keks's avatar
keks committed
184 185

		return nil
Jeromy's avatar
Jeromy committed
186 187
	},
}
Jeromy's avatar
Jeromy committed
188 189

var PubsubLsCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
190
	Helptext: cmdkit.HelpText{
Jeromy's avatar
Jeromy committed
191 192 193 194 195 196 197 198 199 200
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
keks's avatar
keks committed
201
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
202
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
203
		if err != nil {
keks's avatar
keks committed
204
			return err
Jeromy's avatar
Jeromy committed
205 206
		}

207 208 209
		l, err := api.PubSub().Ls(req.Context)
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
210 211
		}

212
		return cmds.EmitOnce(res, stringList{l})
Jeromy's avatar
Jeromy committed
213
	},
214
	Type: stringList{},
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
	},
}

func stringListEncoder(req *cmds.Request, w io.Writer, v interface{}) error {
	list, ok := v.(*stringList)
	if !ok {
		return e.TypeErr(list, v)
	}
	for _, str := range list.Strings {
		_, err := fmt.Fprintf(w, "%s\n", str)
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
232
}
Jeromy's avatar
Jeromy committed
233 234

var PubsubPeersCmd = &cmds.Command{
Jan Winkelmann's avatar
Jan Winkelmann committed
235
	Helptext: cmdkit.HelpText{
236
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
237
		ShortDescription: `
238 239 240
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
241 242 243 244 245 246 247

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
248 249
	Arguments: []cmdkit.Argument{
		cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
250
	},
keks's avatar
keks committed
251
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
252
		api, err := cmdenv.GetApi(env)
Jeromy's avatar
Jeromy committed
253
		if err != nil {
keks's avatar
keks committed
254
			return err
Jeromy's avatar
Jeromy committed
255 256
		}

257
		var topic string
258 259
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
260 261
		}

262 263 264 265 266
		peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
		if err != nil {
			return err
		}

267 268 269 270
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
271
		}
272
		sort.Strings(list.Strings)
keks's avatar
keks committed
273
		return cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
274
	},
275
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
276
	Encoders: cmds.EncoderMap{
277
		cmds.Text: cmds.MakeEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
278 279
	},
}