message.go 5.31 KB
Newer Older
1 2 3
package message

import (
4
	"fmt"
5 6
	"io"

7
	blocks "github.com/ipfs/go-ipfs/blocks"
8
	pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
9
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
10

11
	inet "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
12
	cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
13 14
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
15 16
)

17 18 19
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

20
type BitSwapMessage interface {
21 22
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23
	Wantlist() []Entry
24 25

	// Blocks returns a slice of unique blocks
26
	Blocks() []blocks.Block
27

Jeromy's avatar
Jeromy committed
28
	// AddEntry adds an entry to the Wantlist.
29
	AddEntry(key *cid.Cid, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30

31
	Cancel(key *cid.Cid)
Jeromy's avatar
Jeromy committed
32

33 34
	Empty() bool

35
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
36
	Full() bool
37

38
	AddBlock(blocks.Block)
39
	Exportable
40 41

	Loggable() map[string]interface{}
42 43 44
}

type Exportable interface {
45 46 47 48
	ToProtoV0() *pb.Message
	ToProtoV1() *pb.Message
	ToNetV0(w io.Writer) error
	ToNetV1(w io.Writer) error
49 50
}

51
type impl struct {
Jeromy's avatar
Jeromy committed
52
	full     bool
53 54
	wantlist map[string]Entry
	blocks   map[string]blocks.Block
55 56
}

57 58
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
59 60
}

61
func newMsg(full bool) *impl {
62
	return &impl{
63 64
		blocks:   make(map[string]blocks.Block),
		wantlist: make(map[string]Entry),
65
		full:     full,
66
	}
67 68
}

Jeromy's avatar
Jeromy committed
69
type Entry struct {
70
	*wantlist.Entry
71
	Cancel bool
Jeromy's avatar
Jeromy committed
72 73
}

74
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
75
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
76
	for _, e := range pbm.GetWantlist().GetEntries() {
77 78 79 80 81
		c, err := cid.Cast([]byte(e.GetBlock()))
		if err != nil {
			return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
		}
		m.addEntry(c, int(e.GetPriority()), e.GetCancel())
82
	}
83 84

	// deprecated
85
	for _, d := range pbm.GetBlocks() {
86
		// CIDv0, sha256, protobuf only
87
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
88
		m.AddBlock(b)
89
	}
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
	//

	for _, b := range pbm.GetPayload() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
		if err != nil {
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
		}

		m.AddBlock(blk)
	}

111
	return m, nil
112 113
}

Jeromy's avatar
Jeromy committed
114 115 116 117
func (m *impl) Full() bool {
	return m.full
}

118 119 120 121
func (m *impl) Empty() bool {
	return len(m.blocks) == 0 && len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
122 123
func (m *impl) Wantlist() []Entry {
	var out []Entry
Jeromy's avatar
Jeromy committed
124 125 126 127
	for _, e := range m.wantlist {
		out = append(out, e)
	}
	return out
128 129
}

130 131
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
132 133 134 135
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
136 137
}

138 139
func (m *impl) Cancel(k *cid.Cid) {
	delete(m.wantlist, k.KeyString())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141 142
	m.addEntry(k, 0, true)
}

143
func (m *impl) AddEntry(k *cid.Cid, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144 145 146
	m.addEntry(k, priority, false)
}

147 148
func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
	k := c.KeyString()
Jeromy's avatar
Jeromy committed
149
	e, exists := m.wantlist[k]
150
	if exists {
Jeromy's avatar
Jeromy committed
151 152 153
		e.Priority = priority
		e.Cancel = cancel
	} else {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
154
		m.wantlist[k] = Entry{
155
			Entry: &wantlist.Entry{
156
				Cid:      c,
157 158 159
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
160
		}
161
	}
162 163
}

164
func (m *impl) AddBlock(b blocks.Block) {
165
	m.blocks[b.Cid().KeyString()] = b
166 167
}

168 169
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
Jeromy's avatar
Jeromy committed
170 171
	return FromPBReader(pbr)
}
172

Jeromy's avatar
Jeromy committed
173
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
174
	pb := new(pb.Message)
175
	if err := pbr.ReadMsg(pb); err != nil {
176 177
		return nil, err
	}
178

179
	return newMessageFromProto(*pb)
180 181
}

182
func (m *impl) ToProtoV0() *pb.Message {
Jeromy's avatar
Jeromy committed
183 184 185 186
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
187
			Block:    proto.String(e.Cid.KeyString()),
Jeromy's avatar
Jeromy committed
188
			Priority: proto.Int32(int32(e.Priority)),
Jeromy's avatar
Jeromy committed
189
			Cancel:   proto.Bool(e.Cancel),
Jeromy's avatar
Jeromy committed
190
		})
191
	}
192
	pbm.Wantlist.Full = proto.Bool(m.full)
193
	for _, b := range m.Blocks() {
Jeromy's avatar
Jeromy committed
194
		pbm.Blocks = append(pbm.Blocks, b.RawData())
195
	}
Jeromy's avatar
Jeromy committed
196
	return pbm
197 198
}

199 200 201 202 203 204 205 206 207 208
func (m *impl) ToProtoV1() *pb.Message {
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
			Block:    proto.String(e.Cid.KeyString()),
			Priority: proto.Int32(int32(e.Priority)),
			Cancel:   proto.Bool(e.Cancel),
		})
	}
209
	pbm.Wantlist.Full = proto.Bool(m.full)
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
	for _, b := range m.Blocks() {
		blk := &pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
		}
		pbm.Payload = append(pbm.Payload, blk)
	}
	return pbm
}

func (m *impl) ToNetV0(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

	if err := pbw.WriteMsg(m.ToProtoV0()); err != nil {
		return err
	}
	return nil
}

func (m *impl) ToNetV1(w io.Writer) error {
230 231
	pbw := ggio.NewDelimitedWriter(w)

232
	if err := pbw.WriteMsg(m.ToProtoV1()); err != nil {
233 234 235
		return err
	}
	return nil
236
}
237 238

func (m *impl) Loggable() map[string]interface{} {
239 240
	var blocks []string
	for _, v := range m.blocks {
241
		blocks = append(blocks, v.Cid().String())
242
	}
243
	return map[string]interface{}{
244 245
		"blocks": blocks,
		"wants":  m.Wantlist(),
246 247
	}
}