message.go 5.23 KB
Newer Older
1 2 3
package message

import (
4
	"fmt"
5 6
	"io"

Jeromy's avatar
Jeromy committed
7 8 9
	pb "github.com/ipfs/go-bitswap/message/pb"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
10

Jeromy's avatar
Jeromy committed
11 12 13
	ggio "github.com/gogo/protobuf/io"
	cid "github.com/ipfs/go-cid"
	inet "github.com/libp2p/go-libp2p-net"
14 15
)

16 17 18
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

19
type BitSwapMessage interface {
20 21
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22
	Wantlist() []Entry
23 24

	// Blocks returns a slice of unique blocks
25
	Blocks() []blocks.Block
26

Jeromy's avatar
Jeromy committed
27
	// AddEntry adds an entry to the Wantlist.
28
	AddEntry(key *cid.Cid, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29

30
	Cancel(key *cid.Cid)
Jeromy's avatar
Jeromy committed
31

32 33
	Empty() bool

34
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
35
	Full() bool
36

37
	AddBlock(blocks.Block)
38
	Exportable
39 40

	Loggable() map[string]interface{}
41 42 43
}

type Exportable interface {
44 45 46 47
	ToProtoV0() *pb.Message
	ToProtoV1() *pb.Message
	ToNetV0(w io.Writer) error
	ToNetV1(w io.Writer) error
48 49
}

50
type impl struct {
Jeromy's avatar
Jeromy committed
51
	full     bool
52
	wantlist map[string]*Entry
53
	blocks   map[string]blocks.Block
54 55
}

56 57
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59
}

60
func newMsg(full bool) *impl {
61
	return &impl{
62
		blocks:   make(map[string]blocks.Block),
63
		wantlist: make(map[string]*Entry),
64
		full:     full,
65
	}
66 67
}

Jeromy's avatar
Jeromy committed
68
type Entry struct {
69
	*wantlist.Entry
70
	Cancel bool
Jeromy's avatar
Jeromy committed
71 72
}

73
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
74
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
75
	for _, e := range pbm.GetWantlist().GetEntries() {
76 77 78 79 80
		c, err := cid.Cast([]byte(e.GetBlock()))
		if err != nil {
			return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
		}
		m.addEntry(c, int(e.GetPriority()), e.GetCancel())
81
	}
82 83

	// deprecated
84
	for _, d := range pbm.GetBlocks() {
85
		// CIDv0, sha256, protobuf only
86
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
87
		m.AddBlock(b)
88
	}
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
	//

	for _, b := range pbm.GetPayload() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
		if err != nil {
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
		}

		m.AddBlock(blk)
	}

110
	return m, nil
111 112
}

Jeromy's avatar
Jeromy committed
113 114 115 116
// Full returns the message's full flag. A full wantlist is an authoritative
// copy, a 'non-full' wantlist is a patch-set.
func (m *impl) Full() bool {
	return m.full
}

117 118 119 120
// Empty reports whether the message has no blocks and no wantlist entries.
func (m *impl) Empty() bool {
	if len(m.blocks) != 0 {
		return false
	}
	return len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
121
func (m *impl) Wantlist() []Entry {
122
	out := make([]Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
123
	for _, e := range m.wantlist {
124
		out = append(out, *e)
Jeromy's avatar
Jeromy committed
125 126
	}
	return out
127 128
}

129 130
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
131 132 133 134
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
135 136
}

137 138
func (m *impl) Cancel(k *cid.Cid) {
	delete(m.wantlist, k.KeyString())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
139 140 141
	m.addEntry(k, 0, true)
}

142
func (m *impl) AddEntry(k *cid.Cid, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
143 144 145
	m.addEntry(k, priority, false)
}

146 147
func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
	k := c.KeyString()
Jeromy's avatar
Jeromy committed
148
	e, exists := m.wantlist[k]
149
	if exists {
Jeromy's avatar
Jeromy committed
150 151 152
		e.Priority = priority
		e.Cancel = cancel
	} else {
153
		m.wantlist[k] = &Entry{
154
			Entry: &wantlist.Entry{
155
				Cid:      c,
156 157 158
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
159
		}
160
	}
161 162
}

163
func (m *impl) AddBlock(b blocks.Block) {
164
	m.blocks[b.Cid().KeyString()] = b
165 166
}

167 168
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
Jeromy's avatar
Jeromy committed
169 170
	return FromPBReader(pbr)
}
171

Jeromy's avatar
Jeromy committed
172
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
173
	pb := new(pb.Message)
174
	if err := pbr.ReadMsg(pb); err != nil {
175 176
		return nil, err
	}
177

178
	return newMessageFromProto(*pb)
179 180
}

181
func (m *impl) ToProtoV0() *pb.Message {
Jeromy's avatar
Jeromy committed
182 183
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
184
	pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
185 186
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Steven Allen's avatar
Steven Allen committed
187 188 189
			Block:    e.Cid.Bytes(),
			Priority: int32(e.Priority),
			Cancel:   e.Cancel,
Jeromy's avatar
Jeromy committed
190
		})
191
	}
Steven Allen's avatar
Steven Allen committed
192
	pbm.Wantlist.Full = m.full
193 194 195 196

	blocks := m.Blocks()
	pbm.Blocks = make([][]byte, 0, len(blocks))
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
197
		pbm.Blocks = append(pbm.Blocks, b.RawData())
198
	}
Jeromy's avatar
Jeromy committed
199
	return pbm
200 201
}

202 203 204
func (m *impl) ToProtoV1() *pb.Message {
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
205
	pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
206 207
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Steven Allen's avatar
Steven Allen committed
208 209 210
			Block:    e.Cid.Bytes(),
			Priority: int32(e.Priority),
			Cancel:   e.Cancel,
211 212
		})
	}
Steven Allen's avatar
Steven Allen committed
213
	pbm.Wantlist.Full = m.full
214 215 216 217

	blocks := m.Blocks()
	pbm.Payload = make([]*pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
218 219 220 221 222 223 224 225 226 227 228 229
		blk := &pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
		}
		pbm.Payload = append(pbm.Payload, blk)
	}
	return pbm
}

func (m *impl) ToNetV0(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

230
	return pbw.WriteMsg(m.ToProtoV0())
231 232 233
}

func (m *impl) ToNetV1(w io.Writer) error {
234 235
	pbw := ggio.NewDelimitedWriter(w)

236
	return pbw.WriteMsg(m.ToProtoV1())
237
}
238 239

func (m *impl) Loggable() map[string]interface{} {
240
	blocks := make([]string, 0, len(m.blocks))
241
	for _, v := range m.blocks {
242
		blocks = append(blocks, v.Cid().String())
243
	}
244
	return map[string]interface{}{
245 246
		"blocks": blocks,
		"wants":  m.Wantlist(),
247 248
	}
}