package commands

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"net/http"
	"sort"

	cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"

	cmds "github.com/ipfs/go-ipfs-cmds"
	options "github.com/ipfs/interface-go-ipfs-core/options"
)

var PubsubCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
18
	Helptext: cmds.HelpText{
Jeromy's avatar
Jeromy committed
19 20 21 22 23 24 25
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
26 27

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
28 29 30
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
31 32 33 34
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
35 36 37
	},
}

// pubsubDiscoverOptionName is the name of the boolean "discover" flag
// registered on the sub command.
const pubsubDiscoverOptionName = "discover"

// pubsubMessage is the value emitted by the sub command for every message it
// receives, and the type its encoders consume. Empty fields are omitted from
// the JSON form.
type pubsubMessage struct {
	// From holds the bytes of the sender identifier (msg.From()).
	From     []byte   `json:"from,omitempty"`
	// Data is the message payload (msg.Data()).
	Data     []byte   `json:"data,omitempty"`
	// Seqno is the message sequence number as reported by msg.Seq().
	Seqno    []byte   `json:"seqno,omitempty"`
	// TopicIDs lists the topics reported by msg.Topics().
	TopicIDs []string `json:"topicIDs,omitempty"`
}

var PubsubSubCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
50
	Helptext: cmds.HelpText{
Jeromy's avatar
Jeromy committed
51 52 53 54 55 56
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
57 58

To use, the daemon must be run with '--enable-pubsub-experiment'.
59 60 61 62 63 64 65 66 67 68 69 70
`,
		LongDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.

This command outputs data in the following encodings:
  * "json"
(Specified by the "--encoding" or "--enc" flag)
Jeromy's avatar
Jeromy committed
71 72
`,
	},
Steven Allen's avatar
Steven Allen committed
73 74
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
Jeromy's avatar
Jeromy committed
75
	},
Steven Allen's avatar
Steven Allen committed
76
	Options: []cmds.Option{
77
		cmds.BoolOption(pubsubDiscoverOptionName, "Deprecated option to instruct pubsub to discovery peers for the topic. Discovery is now built into pubsub."),
78
	},
keks's avatar
keks committed
79
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
80
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
81
		if err != nil {
keks's avatar
keks committed
82
			return err
Jeromy's avatar
Jeromy committed
83 84
		}

85
		topic := req.Arguments[0]
86
		sub, err := api.PubSub().Subscribe(req.Context, topic)
87 88 89
		if err != nil {
			return err
		}
90
		defer sub.Close()
Jan Winkelmann's avatar
Jan Winkelmann committed
91

92 93 94 95
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}

Jan Winkelmann's avatar
Jan Winkelmann committed
96
		for {
97
			msg, err := sub.Next(req.Context)
Jan Winkelmann's avatar
Jan Winkelmann committed
98
			if err == io.EOF || err == context.Canceled {
keks's avatar
keks committed
99
				return nil
Jan Winkelmann's avatar
Jan Winkelmann committed
100
			} else if err != nil {
keks's avatar
keks committed
101
				return err
Jan Winkelmann's avatar
Jan Winkelmann committed
102 103
			}

104
			if err := res.Emit(&pubsubMessage{
105 106 107 108
				Data:     msg.Data(),
				From:     []byte(msg.From()),
				Seqno:    msg.Seq(),
				TopicIDs: msg.Topics(),
109 110 111
			}); err != nil {
				return err
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
112
		}
Jeromy's avatar
Jeromy committed
113
	},
Jan Winkelmann's avatar
Jan Winkelmann committed
114
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
115 116
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			_, err := w.Write(psm.Data)
Jan Winkelmann's avatar
Jan Winkelmann committed
117
			return err
Jeromy's avatar
Jeromy committed
118
		}),
Overbool's avatar
Overbool committed
119 120 121
		"ndpayload": cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			psm.Data = append(psm.Data, '\n')
			_, err := w.Write(psm.Data)
Jan Winkelmann's avatar
Jan Winkelmann committed
122
			return err
Jeromy's avatar
Jeromy committed
123
		}),
Overbool's avatar
Overbool committed
124 125
		"lenpayload": cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, psm *pubsubMessage) error {
			buf := make([]byte, 8, len(psm.Data)+8)
126

Overbool's avatar
Overbool committed
127 128
			n := binary.PutUvarint(buf, uint64(len(psm.Data)))
			buf = append(buf[:n], psm.Data...)
Jan Winkelmann's avatar
Jan Winkelmann committed
129 130
			_, err := w.Write(buf)
			return err
Jeromy's avatar
Jeromy committed
131 132
		}),
	},
133
	Type: pubsubMessage{},
134 135
}

var PubsubPubCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
137
	Helptext: cmds.HelpText{
Jeromy's avatar
Jeromy committed
138 139 140 141 142 143
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
144 145

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
146 147
`,
	},
Steven Allen's avatar
Steven Allen committed
148 149 150
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", true, false, "Topic to publish to."),
		cmds.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
Jeromy's avatar
Jeromy committed
151
	},
keks's avatar
keks committed
152
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
153
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
154
		if err != nil {
keks's avatar
keks committed
155
			return err
Jeromy's avatar
Jeromy committed
156 157
		}

158
		topic := req.Arguments[0]
Jeromy's avatar
Jeromy committed
159

Jeromy's avatar
Jeromy committed
160
		err = req.ParseBodyArgs()
161
		if err != nil {
keks's avatar
keks committed
162
			return err
Jeromy's avatar
Jeromy committed
163 164
		}

165
		for _, data := range req.Arguments[1:] {
166
			if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
keks's avatar
keks committed
167
				return err
Jeromy's avatar
Jeromy committed
168 169
			}
		}
keks's avatar
keks committed
170 171

		return nil
Jeromy's avatar
Jeromy committed
172 173
	},
}

var PubsubLsCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
176
	Helptext: cmds.HelpText{
Jeromy's avatar
Jeromy committed
177 178 179 180 181 182 183 184 185 186
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
keks's avatar
keks committed
187
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
188
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
189
		if err != nil {
keks's avatar
keks committed
190
			return err
Jeromy's avatar
Jeromy committed
191 192
		}

193 194 195
		l, err := api.PubSub().Ls(req.Context)
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
196 197
		}

198
		return cmds.EmitOnce(res, stringList{l})
Jeromy's avatar
Jeromy committed
199
	},
200
	Type: stringList{},
201
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
202
		cmds.Text: cmds.MakeTypedEncoder(stringListEncoder),
203 204 205
	},
}

func stringListEncoder(req *cmds.Request, w io.Writer, list *stringList) error {
207
	for _, str := range list.Strings {
208
		_, err := fmt.Fprintf(w, "%s\n", cmdenv.EscNonPrint(str))
209 210 211 212 213
		if err != nil {
			return err
		}
	}
	return nil
Jeromy's avatar
Jeromy committed
214
}

var PubsubPeersCmd = &cmds.Command{
Steven Allen's avatar
Steven Allen committed
217
	Helptext: cmds.HelpText{
218
		Tagline: "List peers we are currently pubsubbing with.",
Jeromy's avatar
Jeromy committed
219
		ShortDescription: `
220 221 222
ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected
peers who are subscribed to the named topic.
Jeromy's avatar
Jeromy committed
223 224 225 226 227 228 229

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
Steven Allen's avatar
Steven Allen committed
230 231
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", false, false, "topic to list connected peers of"),
232
	},
keks's avatar
keks committed
233
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
234
		api, err := cmdenv.GetApi(env, req)
Jeromy's avatar
Jeromy committed
235
		if err != nil {
keks's avatar
keks committed
236
			return err
Jeromy's avatar
Jeromy committed
237 238
		}

239
		var topic string
240 241
		if len(req.Arguments) == 1 {
			topic = req.Arguments[0]
242 243
		}

244 245 246 247 248
		peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
		if err != nil {
			return err
		}

249 250 251 252
		list := &stringList{make([]string, 0, len(peers))}

		for _, peer := range peers {
			list.Strings = append(list.Strings, peer.Pretty())
Jeromy's avatar
Jeromy committed
253
		}
254
		sort.Strings(list.Strings)
keks's avatar
keks committed
255
		return cmds.EmitOnce(res, list)
Jeromy's avatar
Jeromy committed
256
	},
257
	Type: stringList{},
Jan Winkelmann's avatar
Jan Winkelmann committed
258
	Encoders: cmds.EncoderMap{
Overbool's avatar
Overbool committed
259
		cmds.Text: cmds.MakeTypedEncoder(stringListEncoder),
Jeromy's avatar
Jeromy committed
260 261
	},
}