// Package message defines the bitswap message type together with its
// conversion to and from the protobuf wire representation.
package message

import (
4
	"fmt"
5 6
	"io"

Jeromy's avatar
Jeromy committed
7 8 9
	pb "github.com/ipfs/go-bitswap/message/pb"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
10

Jeromy's avatar
Jeromy committed
11 12 13 14
	ggio "github.com/gogo/protobuf/io"
	proto "github.com/gogo/protobuf/proto"
	cid "github.com/ipfs/go-cid"
	inet "github.com/libp2p/go-libp2p-net"
15 16
)

17 18 19
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

20
type BitSwapMessage interface {
21 22
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23
	Wantlist() []Entry
24 25

	// Blocks returns a slice of unique blocks
26
	Blocks() []blocks.Block
27

Jeromy's avatar
Jeromy committed
28
	// AddEntry adds an entry to the Wantlist.
29
	AddEntry(key *cid.Cid, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30

31
	Cancel(key *cid.Cid)
Jeromy's avatar
Jeromy committed
32

33 34
	Empty() bool

35
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
36
	Full() bool
37

38
	AddBlock(blocks.Block)
39
	Exportable
40 41

	Loggable() map[string]interface{}
42 43 44
}

type Exportable interface {
45 46 47 48
	ToProtoV0() *pb.Message
	ToProtoV1() *pb.Message
	ToNetV0(w io.Writer) error
	ToNetV1(w io.Writer) error
49 50
}

51
type impl struct {
Jeromy's avatar
Jeromy committed
52
	full     bool
53
	wantlist map[string]*Entry
54
	blocks   map[string]blocks.Block
55 56
}

57 58
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
59 60
}

61
func newMsg(full bool) *impl {
62
	return &impl{
63
		blocks:   make(map[string]blocks.Block),
64
		wantlist: make(map[string]*Entry),
65
		full:     full,
66
	}
67 68
}

Jeromy's avatar
Jeromy committed
69
type Entry struct {
70
	*wantlist.Entry
71
	Cancel bool
Jeromy's avatar
Jeromy committed
72 73
}

74
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
75
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
76
	for _, e := range pbm.GetWantlist().GetEntries() {
77 78 79 80 81
		c, err := cid.Cast([]byte(e.GetBlock()))
		if err != nil {
			return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
		}
		m.addEntry(c, int(e.GetPriority()), e.GetCancel())
82
	}
83 84

	// deprecated
85
	for _, d := range pbm.GetBlocks() {
86
		// CIDv0, sha256, protobuf only
87
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
88
		m.AddBlock(b)
89
	}
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
	//

	for _, b := range pbm.GetPayload() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
		if err != nil {
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
		}

		m.AddBlock(blk)
	}

111
	return m, nil
112 113
}

Jeromy's avatar
Jeromy committed
114 115 116 117
// Full reports the full flag the message was created with.
func (m *impl) Full() bool {
	return m.full
}

118 119 120 121
// Empty reports whether the message holds no blocks and no wantlist entries.
func (m *impl) Empty() bool {
	if len(m.blocks) != 0 {
		return false
	}
	return len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
122
func (m *impl) Wantlist() []Entry {
123
	out := make([]Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
124
	for _, e := range m.wantlist {
125
		out = append(out, *e)
Jeromy's avatar
Jeromy committed
126 127
	}
	return out
128 129
}

130 131
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
132 133 134 135
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
136 137
}

138 139
func (m *impl) Cancel(k *cid.Cid) {
	delete(m.wantlist, k.KeyString())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141 142
	m.addEntry(k, 0, true)
}

143
func (m *impl) AddEntry(k *cid.Cid, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144 145 146
	m.addEntry(k, priority, false)
}

147 148
func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
	k := c.KeyString()
Jeromy's avatar
Jeromy committed
149
	e, exists := m.wantlist[k]
150
	if exists {
Jeromy's avatar
Jeromy committed
151 152 153
		e.Priority = priority
		e.Cancel = cancel
	} else {
154
		m.wantlist[k] = &Entry{
155
			Entry: &wantlist.Entry{
156
				Cid:      c,
157 158 159
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
160
		}
161
	}
162 163
}

164
func (m *impl) AddBlock(b blocks.Block) {
165
	m.blocks[b.Cid().KeyString()] = b
166 167
}

168 169
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
Jeromy's avatar
Jeromy committed
170 171
	return FromPBReader(pbr)
}
172

Jeromy's avatar
Jeromy committed
173
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
174
	pb := new(pb.Message)
175
	if err := pbr.ReadMsg(pb); err != nil {
176 177
		return nil, err
	}
178

179
	return newMessageFromProto(*pb)
180 181
}

182
func (m *impl) ToProtoV0() *pb.Message {
Jeromy's avatar
Jeromy committed
183 184
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
185
	pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
186 187
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
188
			Block:    proto.String(e.Cid.KeyString()),
Jeromy's avatar
Jeromy committed
189
			Priority: proto.Int32(int32(e.Priority)),
Jeromy's avatar
Jeromy committed
190
			Cancel:   proto.Bool(e.Cancel),
Jeromy's avatar
Jeromy committed
191
		})
192
	}
193
	pbm.Wantlist.Full = proto.Bool(m.full)
194 195 196 197

	blocks := m.Blocks()
	pbm.Blocks = make([][]byte, 0, len(blocks))
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
198
		pbm.Blocks = append(pbm.Blocks, b.RawData())
199
	}
Jeromy's avatar
Jeromy committed
200
	return pbm
201 202
}

203 204 205
func (m *impl) ToProtoV1() *pb.Message {
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
206
	pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
207 208 209 210 211 212 213
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
			Block:    proto.String(e.Cid.KeyString()),
			Priority: proto.Int32(int32(e.Priority)),
			Cancel:   proto.Bool(e.Cancel),
		})
	}
214
	pbm.Wantlist.Full = proto.Bool(m.full)
215 216 217 218

	blocks := m.Blocks()
	pbm.Payload = make([]*pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
219 220 221 222 223 224 225 226 227 228 229 230
		blk := &pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
		}
		pbm.Payload = append(pbm.Payload, blk)
	}
	return pbm
}

func (m *impl) ToNetV0(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

231
	return pbw.WriteMsg(m.ToProtoV0())
232 233 234
}

func (m *impl) ToNetV1(w io.Writer) error {
235 236
	pbw := ggio.NewDelimitedWriter(w)

237
	return pbw.WriteMsg(m.ToProtoV1())
238
}
239 240

func (m *impl) Loggable() map[string]interface{} {
241
	blocks := make([]string, 0, len(m.blocks))
242
	for _, v := range m.blocks {
243
		blocks = append(blocks, v.Cid().String())
244
	}
245
	return map[string]interface{}{
246 247
		"blocks": blocks,
		"wants":  m.Wantlist(),
248 249
	}
}