message.go 5.05 KB
Newer Older
1 2 3
package message

import (
4
	"fmt"
5 6
	"io"

Jeromy's avatar
Jeromy committed
7 8 9
	pb "github.com/ipfs/go-bitswap/message/pb"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
10

Jeromy's avatar
Jeromy committed
11 12 13
	ggio "github.com/gogo/protobuf/io"
	cid "github.com/ipfs/go-cid"
	inet "github.com/libp2p/go-libp2p-net"
14 15
)

16 17 18
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

19
type BitSwapMessage interface {
20 21
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22
	Wantlist() []Entry
23

24
	// Blocks returns a slice of unique blocks.
25
	Blocks() []blocks.Block
26

Jeromy's avatar
Jeromy committed
27
	// AddEntry adds an entry to the Wantlist.
28
	AddEntry(key cid.Cid, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29

30
	Cancel(key cid.Cid)
Jeromy's avatar
Jeromy committed
31

32 33
	Empty() bool

34
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
35
	Full() bool
36

37
	AddBlock(blocks.Block)
38
	Exportable
39 40

	Loggable() map[string]interface{}
41 42 43
}

type Exportable interface {
44 45 46 47
	ToProtoV0() *pb.Message
	ToProtoV1() *pb.Message
	ToNetV0(w io.Writer) error
	ToNetV1(w io.Writer) error
48 49
}

50
type impl struct {
Jeromy's avatar
Jeromy committed
51
	full     bool
Steven Allen's avatar
Steven Allen committed
52 53
	wantlist map[cid.Cid]*Entry
	blocks   map[cid.Cid]blocks.Block
54 55
}

56 57
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59
}

60
func newMsg(full bool) *impl {
61
	return &impl{
Steven Allen's avatar
Steven Allen committed
62 63
		blocks:   make(map[cid.Cid]blocks.Block),
		wantlist: make(map[cid.Cid]*Entry),
64
		full:     full,
65
	}
66 67
}

Jeromy's avatar
Jeromy committed
68
type Entry struct {
69
	*wantlist.Entry
70
	Cancel bool
Jeromy's avatar
Jeromy committed
71 72
}

73
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
Steven Allen's avatar
Steven Allen committed
74 75 76
	m := newMsg(pbm.Wantlist.Full)
	for _, e := range pbm.Wantlist.Entries {
		c, err := cid.Cast([]byte(e.Block))
77 78 79
		if err != nil {
			return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
		}
Steven Allen's avatar
Steven Allen committed
80
		m.addEntry(c, int(e.Priority), e.Cancel)
81
	}
82 83

	// deprecated
Steven Allen's avatar
Steven Allen committed
84
	for _, d := range pbm.Blocks {
85
		// CIDv0, sha256, protobuf only
86
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
87
		m.AddBlock(b)
88
	}
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
	//

	for _, b := range pbm.GetPayload() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
		if err != nil {
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
		}

		m.AddBlock(blk)
	}

110
	return m, nil
111 112
}

Jeromy's avatar
Jeromy committed
113 114 115 116
// Full returns the full flag the message was created with.
func (m *impl) Full() bool {
	isFull := m.full
	return isFull
}

117 118 119 120
// Empty reports whether the message has no blocks and no wantlist entries.
func (m *impl) Empty() bool {
	if len(m.blocks) != 0 {
		return false
	}
	return len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
121
func (m *impl) Wantlist() []Entry {
122
	out := make([]Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
123
	for _, e := range m.wantlist {
124
		out = append(out, *e)
Jeromy's avatar
Jeromy committed
125 126
	}
	return out
127 128
}

129 130
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
131 132 133 134
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
135 136
}

137
func (m *impl) Cancel(k cid.Cid) {
Steven Allen's avatar
Steven Allen committed
138
	delete(m.wantlist, k)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
139 140 141
	m.addEntry(k, 0, true)
}

142
func (m *impl) AddEntry(k cid.Cid, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
143 144 145
	m.addEntry(k, priority, false)
}

146
func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
Steven Allen's avatar
Steven Allen committed
147
	e, exists := m.wantlist[c]
148
	if exists {
Jeromy's avatar
Jeromy committed
149 150 151
		e.Priority = priority
		e.Cancel = cancel
	} else {
Steven Allen's avatar
Steven Allen committed
152
		m.wantlist[c] = &Entry{
153
			Entry: &wantlist.Entry{
154
				Cid:      c,
155 156 157
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
158
		}
159
	}
160 161
}

162
func (m *impl) AddBlock(b blocks.Block) {
Steven Allen's avatar
Steven Allen committed
163
	m.blocks[b.Cid()] = b
164 165
}

166 167
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
Jeromy's avatar
Jeromy committed
168 169
	return FromPBReader(pbr)
}
170

Jeromy's avatar
Jeromy committed
171
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
172
	pb := new(pb.Message)
173
	if err := pbr.ReadMsg(pb); err != nil {
174 175
		return nil, err
	}
176

177
	return newMessageFromProto(*pb)
178 179
}

180
func (m *impl) ToProtoV0() *pb.Message {
Jeromy's avatar
Jeromy committed
181
	pbm := new(pb.Message)
Steven Allen's avatar
Steven Allen committed
182
	pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
183
	for _, e := range m.wantlist {
Steven Allen's avatar
Steven Allen committed
184
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
Steven Allen's avatar
Steven Allen committed
185 186 187
			Block:    e.Cid.Bytes(),
			Priority: int32(e.Priority),
			Cancel:   e.Cancel,
Jeromy's avatar
Jeromy committed
188
		})
189
	}
Steven Allen's avatar
Steven Allen committed
190
	pbm.Wantlist.Full = m.full
191 192 193 194

	blocks := m.Blocks()
	pbm.Blocks = make([][]byte, 0, len(blocks))
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
195
		pbm.Blocks = append(pbm.Blocks, b.RawData())
196
	}
Jeromy's avatar
Jeromy committed
197
	return pbm
198 199
}

200 201
func (m *impl) ToProtoV1() *pb.Message {
	pbm := new(pb.Message)
Steven Allen's avatar
Steven Allen committed
202
	pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
203
	for _, e := range m.wantlist {
Steven Allen's avatar
Steven Allen committed
204
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
Steven Allen's avatar
Steven Allen committed
205 206 207
			Block:    e.Cid.Bytes(),
			Priority: int32(e.Priority),
			Cancel:   e.Cancel,
208 209
		})
	}
Steven Allen's avatar
Steven Allen committed
210
	pbm.Wantlist.Full = m.full
211 212

	blocks := m.Blocks()
Steven Allen's avatar
Steven Allen committed
213
	pbm.Payload = make([]pb.Message_Block, 0, len(blocks))
214
	for _, b := range blocks {
Steven Allen's avatar
Steven Allen committed
215
		pbm.Payload = append(pbm.Payload, pb.Message_Block{
216 217
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
Steven Allen's avatar
Steven Allen committed
218
		})
219 220 221 222 223 224 225
	}
	return pbm
}

func (m *impl) ToNetV0(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

226
	return pbw.WriteMsg(m.ToProtoV0())
227 228 229
}

func (m *impl) ToNetV1(w io.Writer) error {
230 231
	pbw := ggio.NewDelimitedWriter(w)

232
	return pbw.WriteMsg(m.ToProtoV1())
233
}
234 235

func (m *impl) Loggable() map[string]interface{} {
236
	blocks := make([]string, 0, len(m.blocks))
237
	for _, v := range m.blocks {
238
		blocks = append(blocks, v.Cid().String())
239
	}
240
	return map[string]interface{}{
241 242
		"blocks": blocks,
		"wants":  m.Wantlist(),
243 244
	}
}