message.go 5.23 KB
Newer Older
1 2 3
package message

import (
4
	"fmt"
5 6
	"io"

7
	blocks "github.com/ipfs/go-ipfs/blocks"
8
	pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
9
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
10

11
	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
12 13
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
14
	inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net"
15 16
)

17 18 19
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

20
type BitSwapMessage interface {
21 22
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
23
	Wantlist() []Entry
24 25

	// Blocks returns a slice of unique blocks
26
	Blocks() []blocks.Block
27

Jeromy's avatar
Jeromy committed
28
	// AddEntry adds an entry to the Wantlist.
29
	AddEntry(key *cid.Cid, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30

31
	Cancel(key *cid.Cid)
Jeromy's avatar
Jeromy committed
32

33 34
	Empty() bool

35
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
36
	Full() bool
37

38
	AddBlock(blocks.Block)
39
	Exportable
40 41

	Loggable() map[string]interface{}
42 43 44
}

type Exportable interface {
45 46 47 48
	ToProtoV0() *pb.Message
	ToProtoV1() *pb.Message
	ToNetV0(w io.Writer) error
	ToNetV1(w io.Writer) error
49 50
}

51
type impl struct {
Jeromy's avatar
Jeromy committed
52
	full     bool
53 54
	wantlist map[string]Entry
	blocks   map[string]blocks.Block
55 56
}

57 58
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
59 60
}

61
func newMsg(full bool) *impl {
62
	return &impl{
63 64
		blocks:   make(map[string]blocks.Block),
		wantlist: make(map[string]Entry),
65
		full:     full,
66
	}
67 68
}

Jeromy's avatar
Jeromy committed
69
type Entry struct {
70
	*wantlist.Entry
71
	Cancel bool
Jeromy's avatar
Jeromy committed
72 73
}

74
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
75
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
76
	for _, e := range pbm.GetWantlist().GetEntries() {
77 78 79 80 81
		c, err := cid.Cast([]byte(e.GetBlock()))
		if err != nil {
			return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
		}
		m.addEntry(c, int(e.GetPriority()), e.GetCancel())
82
	}
83 84

	// deprecated
85
	for _, d := range pbm.GetBlocks() {
86
		// CIDv0, sha256, protobuf only
87
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
88
		m.AddBlock(b)
89
	}
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
	//

	for _, b := range pbm.GetPayload() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
		if err != nil {
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
		}

		m.AddBlock(blk)
	}

111
	return m, nil
112 113
}

Jeromy's avatar
Jeromy committed
114 115 116 117
// Full returns the message's full flag: true when the wantlist is an
// authoritative copy, false when it is a patch-set.
func (m *impl) Full() bool {
	return m.full
}

118 119 120 121
// Empty reports whether the message has no blocks and no wantlist entries.
func (m *impl) Empty() bool {
	hasBlocks := len(m.blocks) > 0
	hasWants := len(m.wantlist) > 0
	return !hasBlocks && !hasWants
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
122 123
func (m *impl) Wantlist() []Entry {
	var out []Entry
Jeromy's avatar
Jeromy committed
124 125 126 127
	for _, e := range m.wantlist {
		out = append(out, e)
	}
	return out
128 129
}

130 131
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
132 133 134 135
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
136 137
}

138 139
func (m *impl) Cancel(k *cid.Cid) {
	delete(m.wantlist, k.KeyString())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141 142
	m.addEntry(k, 0, true)
}

143
func (m *impl) AddEntry(k *cid.Cid, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144 145 146
	m.addEntry(k, priority, false)
}

147 148
func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
	k := c.KeyString()
Jeromy's avatar
Jeromy committed
149
	e, exists := m.wantlist[k]
150
	if exists {
Jeromy's avatar
Jeromy committed
151 152 153
		e.Priority = priority
		e.Cancel = cancel
	} else {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
154
		m.wantlist[k] = Entry{
155
			Entry: &wantlist.Entry{
156
				Cid:      c,
157 158 159
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
160
		}
161
	}
162 163
}

164
func (m *impl) AddBlock(b blocks.Block) {
165
	m.blocks[b.Cid().KeyString()] = b
166 167
}

168 169
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
Jeromy's avatar
Jeromy committed
170 171
	return FromPBReader(pbr)
}
172

Jeromy's avatar
Jeromy committed
173
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
174
	pb := new(pb.Message)
175
	if err := pbr.ReadMsg(pb); err != nil {
176 177
		return nil, err
	}
178

179
	return newMessageFromProto(*pb)
180 181
}

182
func (m *impl) ToProtoV0() *pb.Message {
Jeromy's avatar
Jeromy committed
183 184 185 186
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
187
			Block:    proto.String(e.Cid.KeyString()),
Jeromy's avatar
Jeromy committed
188
			Priority: proto.Int32(int32(e.Priority)),
Jeromy's avatar
Jeromy committed
189
			Cancel:   proto.Bool(e.Cancel),
Jeromy's avatar
Jeromy committed
190
		})
191 192
	}
	for _, b := range m.Blocks() {
Jeromy's avatar
Jeromy committed
193
		pbm.Blocks = append(pbm.Blocks, b.RawData())
194
	}
Jeromy's avatar
Jeromy committed
195
	return pbm
196 197
}

198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
// ToProtoV1 builds the V1 protobuf form of the message: every wantlist
// entry, plus each block in the Payload field as its raw data together with
// its CID prefix bytes. Entry and block order follows map iteration and is
// unspecified.
func (m *impl) ToProtoV1() *pb.Message {
	wl := new(pb.Message_Wantlist)
	for _, want := range m.wantlist {
		entry := &pb.Message_Wantlist_Entry{
			Block:    proto.String(want.Cid.KeyString()),
			Priority: proto.Int32(int32(want.Priority)),
			Cancel:   proto.Bool(want.Cancel),
		}
		wl.Entries = append(wl.Entries, entry)
	}

	out := &pb.Message{Wantlist: wl}
	for _, blk := range m.blocks {
		out.Payload = append(out.Payload, &pb.Message_Block{
			Data:   blk.RawData(),
			Prefix: blk.Cid().Prefix().Bytes(),
		})
	}
	return out
}

// ToNetV0 writes the V0 protobuf form of the message to w as a delimited
// message and returns any write error.
func (m *impl) ToNetV0(w io.Writer) error {
	return ggio.NewDelimitedWriter(w).WriteMsg(m.ToProtoV0())
}

func (m *impl) ToNetV1(w io.Writer) error {
228 229
	pbw := ggio.NewDelimitedWriter(w)

230
	if err := pbw.WriteMsg(m.ToProtoV1()); err != nil {
231 232 233
		return err
	}
	return nil
234
}
235 236

func (m *impl) Loggable() map[string]interface{} {
237 238
	var blocks []string
	for _, v := range m.blocks {
239
		blocks = append(blocks, v.Cid().String())
240
	}
241
	return map[string]interface{}{
242 243
		"blocks": blocks,
		"wants":  m.Wantlist(),
244 245
	}
}