pubsub.go 7.91 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4
package commands

import (
	"bytes"
5
	"context"
Jeromy's avatar
Jeromy committed
6
	"encoding/binary"
Jeromy's avatar
Jeromy committed
7
	"fmt"
Jeromy's avatar
Jeromy committed
8
	"io"
9
	"strings"
10 11
	"sync"
	"time"
Jeromy's avatar
Jeromy committed
12

13
	blocks "github.com/ipfs/go-ipfs/blocks"
Jeromy's avatar
Jeromy committed
14
	cmds "github.com/ipfs/go-ipfs/commands"
15
	core "github.com/ipfs/go-ipfs/core"
Jeromy's avatar
Jeromy committed
16

Jeromy's avatar
Jeromy committed
17
	floodsub "gx/ipfs/QmV5jot2GfVXmgvetHExJCa2hprebf3AKjprZtuwaXSr1v/floodsub"
18
	u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
Jeromy's avatar
Jeromy committed
19 20
	cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
	pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
21 22 23 24 25 26 27 28 29 30 31
)

var PubsubCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "An experimental publish-subscribe system on ipfs.",
		ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
32 33

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
34 35 36
`,
	},
	Subcommands: map[string]*cmds.Command{
Jeromy's avatar
Jeromy committed
37 38 39 40
		"pub":   PubsubPubCmd,
		"sub":   PubsubSubCmd,
		"ls":    PubsubLsCmd,
		"peers": PubsubPeersCmd,
Jeromy's avatar
Jeromy committed
41 42 43 44 45 46 47 48 49 50 51
	},
}

var PubsubSubCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Subscribe to messages on a given topic.",
		ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
52 53

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
54 55 56 57 58
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
	},
59 60 61
	Options: []cmds.Option{
		cmds.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
	},
Jeromy's avatar
Jeromy committed
62 63 64 65 66 67 68 69 70 71 72 73 74
	Run: func(req cmds.Request, res cmds.Response) {
		n, err := req.InvocContext().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Must be online!
		if !n.OnlineMode() {
			res.SetError(errNotOnline, cmds.ErrClient)
			return
		}

Jeromy's avatar
Jeromy committed
75 76 77 78 79
		if n.Floodsub == nil {
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
			return
		}

Jeromy's avatar
Jeromy committed
80
		topic := req.Arguments()[0]
Jan Winkelmann's avatar
Jan Winkelmann committed
81
		sub, err := n.Floodsub.Subscribe(topic)
Jeromy's avatar
Jeromy committed
82 83 84 85 86 87 88 89 90
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		out := make(chan interface{})
		res.SetOutput((<-chan interface{})(out))

		go func() {
Jan Winkelmann's avatar
Jan Winkelmann committed
91
			defer sub.Cancel()
Jeromy's avatar
Jeromy committed
92
			defer close(out)
93 94 95

			out <- floodsub.Message{}

Jeromy's avatar
Jeromy committed
96
			for {
Jan Winkelmann's avatar
Jan Winkelmann committed
97 98
				msg, err := sub.Next(req.Context())
				if err == io.EOF || err == context.Canceled {
99
					return
Jan Winkelmann's avatar
Jan Winkelmann committed
100 101 102
				} else if err != nil {
					res.SetError(err, cmds.ErrNormal)
					return
Jeromy's avatar
Jeromy committed
103
				}
Jan Winkelmann's avatar
Jan Winkelmann committed
104 105

				out <- msg
Jeromy's avatar
Jeromy committed
106 107
			}
		}()
108 109 110

		discover, _, _ := req.Option("discover").Bool()
		if discover {
111 112
			go func() {
				blk := blocks.NewBlock([]byte("floodsub:" + topic))
113
				cid, err := n.Blocks.AddBlock(blk)
114 115 116 117
				if err != nil {
					log.Error("pubsub discovery: ", err)
					return
				}
118

119 120
				connectToPubSubPeers(req.Context(), n, cid)
			}()
121
		}
Jeromy's avatar
Jeromy committed
122 123 124 125 126 127 128 129 130 131 132
	},
	Marshalers: cmds.MarshalerMap{
		cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
			return bytes.NewReader(m.Data), nil
		}),
		"ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
			m.Data = append(m.Data, '\n')
			return bytes.NewReader(m.Data), nil
		}),
		"lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
			buf := make([]byte, 8)
133

Jan Winkelmann's avatar
Jan Winkelmann committed
134 135
			n := binary.PutUvarint(buf, uint64(len(m.Data)))
			return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(m.Data)), nil
Jeromy's avatar
Jeromy committed
136 137 138 139 140
		}),
	},
	Type: floodsub.Message{},
}

141 142 143 144
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

145
	provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
	wg := &sync.WaitGroup{}
	for p := range provs {
		wg.Add(1)
		go func(pi pstore.PeerInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(ctx, time.Second*10)
			defer cancel()
			err := n.PeerHost.Connect(ctx, pi)
			if err != nil {
				log.Info("pubsub discover: ", err)
				return
			}
			log.Info("connected to pubsub peer:", pi.ID)
		}(p)
	}

	wg.Wait()
}

Jeromy's avatar
Jeromy committed
165 166 167 168 169 170 171 172 173 174 175 176
func getPsMsgMarshaler(f func(m *floodsub.Message) (io.Reader, error)) func(cmds.Response) (io.Reader, error) {
	return func(res cmds.Response) (io.Reader, error) {
		outChan, ok := res.Output().(<-chan interface{})
		if !ok {
			return nil, u.ErrCast()
		}

		marshal := func(v interface{}) (io.Reader, error) {
			obj, ok := v.(*floodsub.Message)
			if !ok {
				return nil, u.ErrCast()
			}
Jan Winkelmann's avatar
Jan Winkelmann committed
177 178 179
			if obj.Message == nil {
				return strings.NewReader(""), nil
			}
Jeromy's avatar
Jeromy committed
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

			return f(obj)
		}

		return &cmds.ChannelMarshaler{
			Channel:   outChan,
			Marshaler: marshal,
			Res:       res,
		}, nil
	}
}

var PubsubPubCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Publish a message to a given pubsub topic.",
		ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.
Jeromy's avatar
Jeromy committed
200 201

To use, the daemon must be run with '--enable-pubsub-experiment'.
Jeromy's avatar
Jeromy committed
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("topic", true, false, "Topic to publish to."),
		cmds.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
	},
	Run: func(req cmds.Request, res cmds.Response) {
		n, err := req.InvocContext().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Must be online!
		if !n.OnlineMode() {
			res.SetError(errNotOnline, cmds.ErrClient)
			return
		}

Jeromy's avatar
Jeromy committed
221 222 223 224 225
		if n.Floodsub == nil {
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
			return
		}

Jeromy's avatar
Jeromy committed
226 227 228 229 230 231 232 233 234 235
		topic := req.Arguments()[0]

		for _, data := range req.Arguments()[1:] {
			if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
				res.SetError(err, cmds.ErrNormal)
				return
			}
		}
	},
}
Jeromy's avatar
Jeromy committed
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273

// PubsubLsCmd outputs the topic names returned by the node's floodsub
// service, wrapped in a stringList.
var PubsubLsCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "List subscribed topics by name.",
		ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
	Run: func(req cmds.Request, res cmds.Response) {
		node, err := req.InvocContext().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Preconditions, checked in order: the node has to be online and the
		// floodsub service has to be present on it.
		switch {
		case !node.OnlineMode():
			res.SetError(errNotOnline, cmds.ErrClient)
			return
		case node.Floodsub == nil:
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
			return
		}

		topics := node.Floodsub.GetTopics()
		res.SetOutput(&stringList{topics})
	},
	Type: stringList{},
	Marshalers: cmds.MarshalerMap{
		cmds.Text: stringListMarshaler,
	},
}
Jeromy's avatar
Jeromy committed
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305

var PubsubPeersCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "List all peers we are currently pubsubbing with.",
		ShortDescription: `
ipfs pubsub peers lists out the pubsub peers you are currently connected to.

This is an experimental feature. It is not intended in its current state
to be used in a production environment.

To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
	},
	Run: func(req cmds.Request, res cmds.Response) {
		n, err := req.InvocContext().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Must be online!
		if !n.OnlineMode() {
			res.SetError(errNotOnline, cmds.ErrClient)
			return
		}

		if n.Floodsub == nil {
			res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
			return
		}

		var out []string
306
		for _, p := range n.Floodsub.ListPeers("") {
Jeromy's avatar
Jeromy committed
307 308 309 310 311 312 313 314 315
			out = append(out, p.Pretty())
		}
		res.SetOutput(&stringList{out})
	},
	Type: stringList{},
	Marshalers: cmds.MarshalerMap{
		cmds.Text: stringListMarshaler,
	},
}