message.go 11.9 KB
Newer Older
1 2 3
package message

import (
Steven Allen's avatar
Steven Allen committed
4
	"encoding/binary"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
6 7
	"io"

8 9
	pb "gitlab.dms3.io/dms3/go-bitswap/message/pb"
	"gitlab.dms3.io/dms3/go-bitswap/wantlist"
10

11 12 13 14
	blocks "gitlab.dms3.io/dms3/go-block-format"
	cid "gitlab.dms3.io/dms3/go-cid"
	pool "gitlab.dms3.io/p2p/go-buffer-pool"
	msgio "gitlab.dms3.io/p2p/go-msgio"
Raúl Kripalani's avatar
Raúl Kripalani committed
15

16 17
	u "gitlab.dms3.io/dms3/go-dms3-util"
	"gitlab.dms3.io/p2p/go-p2p-core/network"
18 19
)

20 21
// BitSwapMessage is the basic interface for building, encoding,
// and decoding messages sent on the BitSwap protocol.
22
type BitSwapMessage interface {
23 24
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25
	Wantlist() []Entry
26

27
	// Blocks returns a slice of unique blocks.
28
	Blocks() []blocks.Block
dirkmc's avatar
dirkmc committed
29 30 31 32 33 34 35 36 37 38
	// BlockPresences returns the list of HAVE / DONT_HAVE in the message
	BlockPresences() []BlockPresence
	// Haves returns the Cids for each HAVE
	Haves() []cid.Cid
	// DontHaves returns the Cids for each DONT_HAVE
	DontHaves() []cid.Cid
	// PendingBytes returns the number of outstanding bytes of data that the
	// engine has yet to send to the client (because they didn't fit in this
	// message)
	PendingBytes() int32
39

Jeromy's avatar
Jeromy committed
40
	// AddEntry adds an entry to the Wantlist.
41
	AddEntry(key cid.Cid, priority int32, wantType pb.Message_Wantlist_WantType, sendDontHave bool) int
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42

dirkmc's avatar
dirkmc committed
43 44 45
	// Cancel adds a CANCEL for the given CID to the message
	// Returns the size of the CANCEL entry in the protobuf
	Cancel(key cid.Cid) int
Jeromy's avatar
Jeromy committed
46

47 48 49 50
	// Remove removes any entries for the given CID. Useful when the want
	// status for the CID changes when preparing a message.
	Remove(key cid.Cid)

dirkmc's avatar
dirkmc committed
51
	// Empty indicates whether the message has any information
52
	Empty() bool
dirkmc's avatar
dirkmc committed
53 54
	// Size returns the size of the message in bytes
	Size() int
55

56
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
57
	Full() bool
58

dirkmc's avatar
dirkmc committed
59
	// AddBlock adds a block to the message
60
	AddBlock(blocks.Block)
dirkmc's avatar
dirkmc committed
61 62 63 64 65 66 67 68 69
	// AddBlockPresence adds a HAVE / DONT_HAVE for the given Cid to the message
	AddBlockPresence(cid.Cid, pb.Message_BlockPresenceType)
	// AddHave adds a HAVE for the given Cid to the message
	AddHave(cid.Cid)
	// AddDontHave adds a DONT_HAVE for the given Cid to the message
	AddDontHave(cid.Cid)
	// SetPendingBytes sets the number of bytes of data that are yet to be sent
	// to the client (because they didn't fit in this message)
	SetPendingBytes(int32)
70
	Exportable
71 72

	Loggable() map[string]interface{}
73 74 75

	// Reset the values in the message back to defaults, so it can be reused
	Reset(bool)
Dirk McCormick's avatar
Dirk McCormick committed
76 77 78

	// Clone the message fields
	Clone() BitSwapMessage
79 80
}

81 82
// Exportable is an interface for structures that can be
// encoded in a bitswap protobuf.
83
type Exportable interface {
dirkmc's avatar
dirkmc committed
84 85 86
	// Note that older Bitswap versions use a different wire format, so we need
	// to convert the message to the appropriate format depending on which
	// version of the protocol the remote peer supports.
87 88 89 90
	ToProtoV0() *pb.Message
	ToProtoV1() *pb.Message
	ToNetV0(w io.Writer) error
	ToNetV1(w io.Writer) error
91 92
}

dirkmc's avatar
dirkmc committed
93 94 95 96 97 98
// BlockPresence represents a HAVE / DONT_HAVE for a given Cid
type BlockPresence struct {
	// Cid identifies the block the presence information refers to.
	Cid  cid.Cid
	// Type is the presence kind (HAVE or DONT_HAVE) reported for Cid.
	Type pb.Message_BlockPresenceType
}

99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
// Entry is a wantlist entry in a Bitswap message. It extends wantlist.Entry
// with flags indicating
// - whether message is a cancel
// - whether requester wants a DONT_HAVE message
// Whether the requester wants a HAVE message (instead of the block) is
// carried by the WantType field promoted from the embedded wantlist.Entry.
type Entry struct {
	// Embedded wantlist entry; ToPB reads Cid, Priority and WantType from it.
	wantlist.Entry
	// Cancel marks the entry as a cancel.
	Cancel       bool
	// SendDontHave marks that the requester wants a DONT_HAVE message.
	SendDontHave bool
}

// Size reports the size of the entry on the wire, computed from its
// protobuf form.
func (e *Entry) Size() int {
	wire := e.ToPB()
	return wire.Size()
}

// ToPB converts the entry into its protobuf representation.
func (e *Entry) ToPB() pb.Message_Wantlist_Entry {
	var out pb.Message_Wantlist_Entry
	out.Block = pb.Cid{Cid: e.Cid}
	out.Priority = int32(e.Priority)
	out.WantType = e.WantType
	out.Cancel = e.Cancel
	out.SendDontHave = e.SendDontHave
	return out
}

126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
var MaxEntrySize = maxEntrySize()

func maxEntrySize() int {
	var maxInt32 int32 = (1 << 31) - 1

	c := cid.NewCidV0(u.Hash([]byte("cid")))
	e := Entry{
		Entry: wantlist.Entry{
			Cid:      c,
			Priority: maxInt32,
			WantType: pb.Message_Wantlist_Have,
		},
		SendDontHave: true, // true takes up more space than false
		Cancel:       true,
	}
	return e.Size()
}

144
type impl struct {
dirkmc's avatar
dirkmc committed
145 146 147 148 149
	full           bool
	wantlist       map[cid.Cid]*Entry
	blocks         map[cid.Cid]blocks.Block
	blockPresences map[cid.Cid]pb.Message_BlockPresenceType
	pendingBytes   int32
150 151
}

152
// New returns a new, empty bitswap message
153 154
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
155 156
}

157
func newMsg(full bool) *impl {
158
	return &impl{
Dirk McCormick's avatar
Dirk McCormick committed
159 160
		full:           full,
		wantlist:       make(map[cid.Cid]*Entry),
dirkmc's avatar
dirkmc committed
161 162
		blocks:         make(map[cid.Cid]blocks.Block),
		blockPresences: make(map[cid.Cid]pb.Message_BlockPresenceType),
163
	}
164 165
}

Dirk McCormick's avatar
Dirk McCormick committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
// Clone returns a copy of the message that can be modified independently of
// the original: the wantlist, block and block-presence maps are copied, as
// are the full flag and the pending byte count.
func (m *impl) Clone() BitSwapMessage {
	msg := newMsg(m.full)
	for k, e := range m.wantlist {
		// The wantlist stores *Entry and addEntry updates existing entries in
		// place, so duplicate the Entry value instead of sharing the pointer;
		// otherwise changes made through the clone would leak into m.
		entry := *e
		msg.wantlist[k] = &entry
	}
	for k, b := range m.blocks {
		msg.blocks[k] = b
	}
	for k, t := range m.blockPresences {
		msg.blockPresences[k] = t
	}
	msg.pendingBytes = m.pendingBytes
	return msg
}

182 183 184 185 186 187 188 189 190 191 192 193 194
// Reset the values in the message back to defaults, so it can be reused
func (m *impl) Reset(full bool) {
	m.full = full
	for k := range m.wantlist {
		delete(m.wantlist, k)
	}
	for k := range m.blocks {
		delete(m.blocks, k)
	}
	for k := range m.blockPresences {
		delete(m.blockPresences, k)
	}
	m.pendingBytes = 0
Jeromy's avatar
Jeromy committed
195 196
}

Steven Allen's avatar
Steven Allen committed
197 198
var errCidMissing = errors.New("missing cid")

199
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
Steven Allen's avatar
Steven Allen committed
200 201
	m := newMsg(pbm.Wantlist.Full)
	for _, e := range pbm.Wantlist.Entries {
Steven Allen's avatar
Steven Allen committed
202 203
		if !e.Block.Cid.Defined() {
			return nil, errCidMissing
204
		}
Steven Allen's avatar
Steven Allen committed
205
		m.addEntry(e.Block.Cid, e.Priority, e.Cancel, e.WantType, e.SendDontHave)
206
	}
207 208

	// deprecated
Steven Allen's avatar
Steven Allen committed
209
	for _, d := range pbm.Blocks {
210
		// CIDv0, sha256, protobuf only
211
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
212
		m.AddBlock(b)
213
	}
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
	//

	for _, b := range pbm.GetPayload() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
		if err != nil {
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
		}

		m.AddBlock(blk)
	}

dirkmc's avatar
dirkmc committed
235
	for _, bi := range pbm.GetBlockPresences() {
Steven Allen's avatar
Steven Allen committed
236 237
		if !bi.Cid.Cid.Defined() {
			return nil, errCidMissing
dirkmc's avatar
dirkmc committed
238
		}
Steven Allen's avatar
Steven Allen committed
239
		m.AddBlockPresence(bi.Cid.Cid, bi.Type)
dirkmc's avatar
dirkmc committed
240 241 242 243
	}

	m.pendingBytes = pbm.PendingBytes

244
	return m, nil
245 246
}

Jeromy's avatar
Jeromy committed
247 248 249 250
// Full returns the message's "full wantlist" flag.
func (m *impl) Full() bool {
	full := m.full
	return full
}

251
func (m *impl) Empty() bool {
dirkmc's avatar
dirkmc committed
252
	return len(m.blocks) == 0 && len(m.wantlist) == 0 && len(m.blockPresences) == 0
253 254
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
255
func (m *impl) Wantlist() []Entry {
256
	out := make([]Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
257
	for _, e := range m.wantlist {
258
		out = append(out, *e)
Jeromy's avatar
Jeromy committed
259 260
	}
	return out
261 262
}

263 264
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
265 266 267 268
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
269 270
}

dirkmc's avatar
dirkmc committed
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
// BlockPresences returns every recorded HAVE / DONT_HAVE as a CID/type pair.
func (m *impl) BlockPresences() []BlockPresence {
	out := make([]BlockPresence, 0, len(m.blockPresences))
	for c, kind := range m.blockPresences {
		out = append(out, BlockPresence{Cid: c, Type: kind})
	}
	return out
}

// Haves returns the CIDs whose recorded block presence is HAVE.
func (m *impl) Haves() []cid.Cid {
	haves := m.getBlockPresenceByType(pb.Message_Have)
	return haves
}

// DontHaves returns the CIDs whose recorded block presence is DONT_HAVE.
func (m *impl) DontHaves() []cid.Cid {
	dontHaves := m.getBlockPresenceByType(pb.Message_DontHave)
	return dontHaves
}

// getBlockPresenceByType collects the CIDs whose recorded presence type
// equals t. The result is never nil.
func (m *impl) getBlockPresenceByType(t pb.Message_BlockPresenceType) []cid.Cid {
	matches := make([]cid.Cid, 0, len(m.blockPresences))
	for c, kind := range m.blockPresences {
		if kind != t {
			continue
		}
		matches = append(matches, c)
	}
	return matches
}

func (m *impl) PendingBytes() int32 {
	return m.pendingBytes
Brian Tiger Chow's avatar
Brian Tiger Chow committed
299 300
}

dirkmc's avatar
dirkmc committed
301 302
func (m *impl) SetPendingBytes(pendingBytes int32) {
	m.pendingBytes = pendingBytes
Brian Tiger Chow's avatar
Brian Tiger Chow committed
303 304
}

305 306 307 308
// Remove deletes the wantlist entry for k, if any. Blocks and block
// presences are left untouched.
func (m *impl) Remove(k cid.Cid) {
	delete(m.wantlist, k)
}

dirkmc's avatar
dirkmc committed
309 310 311 312
// Cancel records a cancel for k as a want-block entry with priority 0 and no
// DONT_HAVE request. It returns whatever addEntry returns.
func (m *impl) Cancel(k cid.Cid) int {
	const (
		priority     = 0
		cancel       = true
		sendDontHave = false
	)
	return m.addEntry(k, priority, cancel, pb.Message_Wantlist_Block, sendDontHave)
}

313
func (m *impl) AddEntry(k cid.Cid, priority int32, wantType pb.Message_Wantlist_WantType, sendDontHave bool) int {
dirkmc's avatar
dirkmc committed
314 315 316
	return m.addEntry(k, priority, false, wantType, sendDontHave)
}

317
func (m *impl) addEntry(c cid.Cid, priority int32, cancel bool, wantType pb.Message_Wantlist_WantType, sendDontHave bool) int {
Steven Allen's avatar
Steven Allen committed
318
	e, exists := m.wantlist[c]
319
	if exists {
dirkmc's avatar
dirkmc committed
320 321 322 323 324 325 326
		// Only change priority if want is of the same type
		if e.WantType == wantType {
			e.Priority = priority
		}
		// Only change from "dont cancel" to "do cancel"
		if cancel {
			e.Cancel = cancel
Jeromy's avatar
Jeromy committed
327
		}
dirkmc's avatar
dirkmc committed
328 329 330 331 332 333 334 335 336 337
		// Only change from "dont send" to "do send" DONT_HAVE
		if sendDontHave {
			e.SendDontHave = sendDontHave
		}
		// want-block overrides existing want-have
		if wantType == pb.Message_Wantlist_Block && e.WantType == pb.Message_Wantlist_Have {
			e.WantType = wantType
		}
		m.wantlist[c] = e
		return 0
338
	}
dirkmc's avatar
dirkmc committed
339 340 341 342 343 344 345 346 347 348 349 350

	e = &Entry{
		Entry: wantlist.Entry{
			Cid:      c,
			Priority: priority,
			WantType: wantType,
		},
		SendDontHave: sendDontHave,
		Cancel:       cancel,
	}
	m.wantlist[c] = e

351
	return e.Size()
352 353
}

354
func (m *impl) AddBlock(b blocks.Block) {
dirkmc's avatar
dirkmc committed
355
	delete(m.blockPresences, b.Cid())
Steven Allen's avatar
Steven Allen committed
356
	m.blocks[b.Cid()] = b
357 358
}

dirkmc's avatar
dirkmc committed
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
// AddBlockPresence records presence type t for c, unless the message already
// carries the block for c, in which case it does nothing.
func (m *impl) AddBlockPresence(c cid.Cid, t pb.Message_BlockPresenceType) {
	if _, haveBlock := m.blocks[c]; !haveBlock {
		m.blockPresences[c] = t
	}
}

// AddHave records a HAVE for c via AddBlockPresence.
func (m *impl) AddHave(c cid.Cid) {
	const presence = pb.Message_Have
	m.AddBlockPresence(c, presence)
}

// AddDontHave records a DONT_HAVE for c via AddBlockPresence.
func (m *impl) AddDontHave(c cid.Cid) {
	const presence = pb.Message_DontHave
	m.AddBlockPresence(c, presence)
}

func (m *impl) Size() int {
	size := 0
	for _, block := range m.blocks {
		size += len(block.RawData())
	}
	for c := range m.blockPresences {
		size += BlockPresenceSize(c)
	}
	for _, e := range m.wantlist {
383
		size += e.Size()
dirkmc's avatar
dirkmc committed
384 385 386 387 388 389 390
	}

	return size
}

func BlockPresenceSize(c cid.Cid) int {
	return (&pb.Message_BlockPresence{
Steven Allen's avatar
Steven Allen committed
391
		Cid:  pb.Cid{Cid: c},
dirkmc's avatar
dirkmc committed
392 393 394 395
		Type: pb.Message_Have,
	}).Size()
}

396
// FromNet generates a new BitswapMessage from incoming data on an io.Reader.
397
func FromNet(r io.Reader) (BitSwapMessage, error) {
Steven Allen's avatar
Steven Allen committed
398 399
	reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
	return FromMsgReader(reader)
Jeromy's avatar
Jeromy committed
400
}
401

402
// FromPBReader generates a new Bitswap message from a gogo-protobuf reader
Steven Allen's avatar
Steven Allen committed
403 404 405
func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
	msg, err := r.ReadMsg()
	if err != nil {
406 407
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
408

Steven Allen's avatar
Steven Allen committed
409
	var pb pb.Message
Steven Allen's avatar
Steven Allen committed
410 411 412
	err = pb.Unmarshal(msg)
	r.ReleaseMsg(msg)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
413 414
		return nil, err
	}
Steven Allen's avatar
Steven Allen committed
415

Steven Allen's avatar
Steven Allen committed
416
	return newMessageFromProto(pb)
417 418
}

419
func (m *impl) ToProtoV0() *pb.Message {
Jeromy's avatar
Jeromy committed
420
	pbm := new(pb.Message)
Steven Allen's avatar
Steven Allen committed
421
	pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
422
	for _, e := range m.wantlist {
423
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, e.ToPB())
424
	}
Steven Allen's avatar
Steven Allen committed
425
	pbm.Wantlist.Full = m.full
426 427 428 429

	blocks := m.Blocks()
	pbm.Blocks = make([][]byte, 0, len(blocks))
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
430
		pbm.Blocks = append(pbm.Blocks, b.RawData())
431
	}
Jeromy's avatar
Jeromy committed
432
	return pbm
433 434
}

435 436
func (m *impl) ToProtoV1() *pb.Message {
	pbm := new(pb.Message)
Steven Allen's avatar
Steven Allen committed
437
	pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
438
	for _, e := range m.wantlist {
439
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, e.ToPB())
440
	}
Steven Allen's avatar
Steven Allen committed
441
	pbm.Wantlist.Full = m.full
442 443

	blocks := m.Blocks()
Steven Allen's avatar
Steven Allen committed
444
	pbm.Payload = make([]pb.Message_Block, 0, len(blocks))
445
	for _, b := range blocks {
Steven Allen's avatar
Steven Allen committed
446
		pbm.Payload = append(pbm.Payload, pb.Message_Block{
447 448
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
Steven Allen's avatar
Steven Allen committed
449
		})
450
	}
dirkmc's avatar
dirkmc committed
451 452 453 454

	pbm.BlockPresences = make([]pb.Message_BlockPresence, 0, len(m.blockPresences))
	for c, t := range m.blockPresences {
		pbm.BlockPresences = append(pbm.BlockPresences, pb.Message_BlockPresence{
Steven Allen's avatar
Steven Allen committed
455
			Cid:  pb.Cid{Cid: c},
dirkmc's avatar
dirkmc committed
456 457 458 459 460 461
			Type: t,
		})
	}

	pbm.PendingBytes = m.PendingBytes()

462 463 464 465
	return pbm
}

func (m *impl) ToNetV0(w io.Writer) error {
Steven Allen's avatar
Steven Allen committed
466
	return write(w, m.ToProtoV0())
467 468 469
}

func (m *impl) ToNetV1(w io.Writer) error {
Steven Allen's avatar
Steven Allen committed
470 471
	return write(w, m.ToProtoV1())
}
472

Steven Allen's avatar
Steven Allen committed
473 474
func write(w io.Writer, m *pb.Message) error {
	size := m.Size()
Steven Allen's avatar
Steven Allen committed
475

Steven Allen's avatar
Steven Allen committed
476 477
	buf := pool.Get(size + binary.MaxVarintLen64)
	defer pool.Put(buf)
Steven Allen's avatar
Steven Allen committed
478

Steven Allen's avatar
Steven Allen committed
479
	n := binary.PutUvarint(buf, uint64(size))
Steven Allen's avatar
Steven Allen committed
480 481 482

	written, err := m.MarshalTo(buf[n:])
	if err != nil {
Steven Allen's avatar
Steven Allen committed
483 484
		return err
	}
Steven Allen's avatar
Steven Allen committed
485 486 487
	n += written

	_, err = w.Write(buf[:n])
Steven Allen's avatar
Steven Allen committed
488
	return err
489
}
490 491

func (m *impl) Loggable() map[string]interface{} {
492
	blocks := make([]string, 0, len(m.blocks))
493
	for _, v := range m.blocks {
494
		blocks = append(blocks, v.Cid().String())
495
	}
496
	return map[string]interface{}{
497 498
		"blocks": blocks,
		"wants":  m.Wantlist(),
499 500
	}
}