message.go 5.22 KB
Newer Older
1 2 3
package message

import (
4
	"fmt"
5 6
	"io"

Jeromy's avatar
Jeromy committed
7 8 9
	pb "github.com/ipfs/go-bitswap/message/pb"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
10

Jeromy's avatar
Jeromy committed
11 12 13
	ggio "github.com/gogo/protobuf/io"
	cid "github.com/ipfs/go-cid"
	inet "github.com/libp2p/go-libp2p-net"
14 15
)

16 17 18
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

19
type BitSwapMessage interface {
20 21
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22
	Wantlist() []Entry
23 24

	// Blocks returns a slice of unique blocks
25
	Blocks() []blocks.Block
26

Jeromy's avatar
Jeromy committed
27
	// AddEntry adds an entry to the Wantlist.
28
	AddEntry(key cid.Cid, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29

30
	Cancel(key cid.Cid)
Jeromy's avatar
Jeromy committed
31

32 33
	Empty() bool

34
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
35
	Full() bool
36

37
	AddBlock(blocks.Block)
38
	Exportable
39 40

	Loggable() map[string]interface{}
41 42 43
}

type Exportable interface {
44 45 46 47
	ToProtoV0() *pb.Message
	ToProtoV1() *pb.Message
	ToNetV0(w io.Writer) error
	ToNetV1(w io.Writer) error
48 49
}

50
type impl struct {
Jeromy's avatar
Jeromy committed
51
	full     bool
52
	wantlist map[string]*Entry
53
	blocks   map[string]blocks.Block
54 55
}

56 57
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58 59
}

60
func newMsg(full bool) *impl {
61
	return &impl{
62
		blocks:   make(map[string]blocks.Block),
63
		wantlist: make(map[string]*Entry),
64
		full:     full,
65
	}
66 67
}

Jeromy's avatar
Jeromy committed
68
type Entry struct {
69
	*wantlist.Entry
70
	Cancel bool
Jeromy's avatar
Jeromy committed
71 72
}

73
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
74
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
75
	for _, e := range pbm.GetWantlist().GetEntries() {
76 77 78 79 80
		c, err := cid.Cast([]byte(e.GetBlock()))
		if err != nil {
			return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
		}
		m.addEntry(c, int(e.GetPriority()), e.GetCancel())
81
	}
82 83

	// deprecated
84
	for _, d := range pbm.GetBlocks() {
85
		// CIDv0, sha256, protobuf only
86
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
87
		m.AddBlock(b)
88
	}
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
	//

	for _, b := range pbm.GetPayload() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
		if err != nil {
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
		}

		m.AddBlock(blk)
	}

110
	return m, nil
111 112
}

Jeromy's avatar
Jeromy committed
113 114 115 116
func (m *impl) Full() bool {
	return m.full
}

117 118 119 120
func (m *impl) Empty() bool {
	return len(m.blocks) == 0 && len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
121
func (m *impl) Wantlist() []Entry {
122
	out := make([]Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
123
	for _, e := range m.wantlist {
124
		out = append(out, *e)
Jeromy's avatar
Jeromy committed
125 126
	}
	return out
127 128
}

129 130
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
131 132 133 134
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
135 136
}

137
func (m *impl) Cancel(k cid.Cid) {
138
	delete(m.wantlist, k.KeyString())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
139 140 141
	m.addEntry(k, 0, true)
}

142
func (m *impl) AddEntry(k cid.Cid, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
143 144 145
	m.addEntry(k, priority, false)
}

146
func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
147
	k := c.KeyString()
Jeromy's avatar
Jeromy committed
148
	e, exists := m.wantlist[k]
149
	if exists {
Jeromy's avatar
Jeromy committed
150 151 152
		e.Priority = priority
		e.Cancel = cancel
	} else {
153
		m.wantlist[k] = &Entry{
154
			Entry: &wantlist.Entry{
155
				Cid:      c,
156 157 158
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
159
		}
160
	}
161 162
}

163
func (m *impl) AddBlock(b blocks.Block) {
164
	m.blocks[b.Cid().KeyString()] = b
165 166
}

167 168
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
Jeromy's avatar
Jeromy committed
169 170
	return FromPBReader(pbr)
}
171

Jeromy's avatar
Jeromy committed
172
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
173
	pb := new(pb.Message)
174
	if err := pbr.ReadMsg(pb); err != nil {
175 176
		return nil, err
	}
177

178
	return newMessageFromProto(*pb)
179 180
}

181
func (m *impl) ToProtoV0() *pb.Message {
Jeromy's avatar
Jeromy committed
182 183
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
184
	pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
Jeromy's avatar
Jeromy committed
185 186
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Steven Allen's avatar
Steven Allen committed
187 188 189
			Block:    e.Cid.Bytes(),
			Priority: int32(e.Priority),
			Cancel:   e.Cancel,
Jeromy's avatar
Jeromy committed
190
		})
191
	}
Steven Allen's avatar
Steven Allen committed
192
	pbm.Wantlist.Full = m.full
193 194 195 196

	blocks := m.Blocks()
	pbm.Blocks = make([][]byte, 0, len(blocks))
	for _, b := range blocks {
Jeromy's avatar
Jeromy committed
197
		pbm.Blocks = append(pbm.Blocks, b.RawData())
198
	}
Jeromy's avatar
Jeromy committed
199
	return pbm
200 201
}

202 203 204
func (m *impl) ToProtoV1() *pb.Message {
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
205
	pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
206 207
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Steven Allen's avatar
Steven Allen committed
208 209 210
			Block:    e.Cid.Bytes(),
			Priority: int32(e.Priority),
			Cancel:   e.Cancel,
211 212
		})
	}
Steven Allen's avatar
Steven Allen committed
213
	pbm.Wantlist.Full = m.full
214 215 216 217

	blocks := m.Blocks()
	pbm.Payload = make([]*pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
218 219 220 221 222 223 224 225 226 227 228 229
		blk := &pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
		}
		pbm.Payload = append(pbm.Payload, blk)
	}
	return pbm
}

func (m *impl) ToNetV0(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

230
	return pbw.WriteMsg(m.ToProtoV0())
231 232 233
}

func (m *impl) ToNetV1(w io.Writer) error {
234 235
	pbw := ggio.NewDelimitedWriter(w)

236
	return pbw.WriteMsg(m.ToProtoV1())
237
}
238 239

func (m *impl) Loggable() map[string]interface{} {
240
	blocks := make([]string, 0, len(m.blocks))
241
	for _, v := range m.blocks {
242
		blocks = append(blocks, v.Cid().String())
243
	}
244
	return map[string]interface{}{
245 246
		"blocks": blocks,
		"wants":  m.Wantlist(),
247 248
	}
}