message.go 3.83 KB
Newer Older
1 2 3
package message

import (
4 5
	"io"

6
	blocks "github.com/ipfs/go-ipfs/blocks"
7
	key "github.com/ipfs/go-ipfs/blocks/key"
8
	pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
9
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
10
	inet "gx/ipfs/QmcQTVCQWCN2MYgBHpFXE5S56rcg2mRsxaRgMYmA1UWgA8/go-libp2p/p2p/net"
11

12 13
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
14 15
)

16 17 18
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

19
type BitSwapMessage interface {
20 21
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22
	Wantlist() []Entry
23 24

	// Blocks returns a slice of unique blocks
25
	Blocks() []blocks.Block
26

Jeromy's avatar
Jeromy committed
27
	// AddEntry adds an entry to the Wantlist.
28
	AddEntry(key key.Key, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29

30
	Cancel(key key.Key)
Jeromy's avatar
Jeromy committed
31

32 33
	Empty() bool

34
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
35
	Full() bool
36

37
	AddBlock(blocks.Block)
38
	Exportable
39 40

	Loggable() map[string]interface{}
41 42 43
}

type Exportable interface {
44
	ToProto() *pb.Message
45
	ToNet(w io.Writer) error
46 47
}

48
type impl struct {
Jeromy's avatar
Jeromy committed
49
	full     bool
50
	wantlist map[key.Key]Entry
51
	blocks   map[key.Key]blocks.Block
52 53
}

54 55
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
56 57
}

58
func newMsg(full bool) *impl {
59
	return &impl{
60
		blocks:   make(map[key.Key]blocks.Block),
61
		wantlist: make(map[key.Key]Entry),
62
		full:     full,
63
	}
64 65
}

Jeromy's avatar
Jeromy committed
66
type Entry struct {
67 68
	wantlist.Entry
	Cancel bool
Jeromy's avatar
Jeromy committed
69 70
}

71
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
72
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
73
	for _, e := range pbm.GetWantlist().GetEntries() {
74
		m.addEntry(key.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
75 76
	}
	for _, d := range pbm.GetBlocks() {
77
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
78
		m.AddBlock(b)
79
	}
80
	return m
81 82
}

Jeromy's avatar
Jeromy committed
83 84 85 86
// Full returns the full flag the message was created with. A full wantlist is
// an authoritative copy; a non-full wantlist is a patch-set.
func (m *impl) Full() bool {
	return m.full
}

87 88 89 90
// Empty reports whether the message carries neither blocks nor wantlist
// entries.
func (m *impl) Empty() bool {
	if len(m.blocks) > 0 {
		return false
	}
	return len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
91 92
func (m *impl) Wantlist() []Entry {
	var out []Entry
Jeromy's avatar
Jeromy committed
93 94 95 96
	for _, e := range m.wantlist {
		out = append(out, e)
	}
	return out
97 98
}

99 100
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
101 102 103 104
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
105 106
}

107
func (m *impl) Cancel(k key.Key) {
108
	delete(m.wantlist, k)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109 110 111
	m.addEntry(k, 0, true)
}

112
func (m *impl) AddEntry(k key.Key, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
113 114 115
	m.addEntry(k, priority, false)
}

116
func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
Jeromy's avatar
Jeromy committed
117
	e, exists := m.wantlist[k]
118
	if exists {
Jeromy's avatar
Jeromy committed
119 120 121
		e.Priority = priority
		e.Cancel = cancel
	} else {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
122
		m.wantlist[k] = Entry{
123 124 125 126 127
			Entry: wantlist.Entry{
				Key:      k,
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
128
		}
129
	}
130 131
}

132
func (m *impl) AddBlock(b blocks.Block) {
133
	m.blocks[b.Key()] = b
134 135
}

136 137 138
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)

139
	pb := new(pb.Message)
140
	if err := pbr.ReadMsg(pb); err != nil {
141 142
		return nil, err
	}
143

144
	m := newMessageFromProto(*pb)
145
	return m, nil
146 147
}

148
func (m *impl) ToProto() *pb.Message {
Jeromy's avatar
Jeromy committed
149 150 151 152 153 154
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
			Block:    proto.String(string(e.Key)),
			Priority: proto.Int32(int32(e.Priority)),
Jeromy's avatar
Jeromy committed
155
			Cancel:   proto.Bool(e.Cancel),
Jeromy's avatar
Jeromy committed
156
		})
157 158
	}
	for _, b := range m.Blocks() {
159
		pbm.Blocks = append(pbm.Blocks, b.Data())
160
	}
Jeromy's avatar
Jeromy committed
161
	return pbm
162 163
}

164 165 166 167 168 169 170
// ToNet writes the message's protobuf form to w through a delimited writer and
// returns the writer's error, if any.
func (m *impl) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)
	return pbw.WriteMsg(m.ToProto())
}
172 173

func (m *impl) Loggable() map[string]interface{} {
174 175
	var blocks []string
	for _, v := range m.blocks {
Michael Muré's avatar
Michael Muré committed
176
		blocks = append(blocks, v.Key().B58String())
177
	}
178
	return map[string]interface{}{
179 180
		"blocks": blocks,
		"wants":  m.Wantlist(),
181 182
	}
}