message.go 3.94 KB
Newer Older
1 2 3
package message

import (
4 5
	"io"

6
	blocks "github.com/ipfs/go-ipfs/blocks"
7
	pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb"
8
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
9 10
	key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
	inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net"
11

12 13
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
14 15
)

16 17 18
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

19
type BitSwapMessage interface {
20 21
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22
	Wantlist() []Entry
23 24

	// Blocks returns a slice of unique blocks
25
	Blocks() []blocks.Block
26

Jeromy's avatar
Jeromy committed
27
	// AddEntry adds an entry to the Wantlist.
28
	AddEntry(key key.Key, priority int)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29

30
	Cancel(key key.Key)
Jeromy's avatar
Jeromy committed
31

32 33
	Empty() bool

34
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
35
	Full() bool
36

37
	AddBlock(blocks.Block)
38
	Exportable
39 40

	Loggable() map[string]interface{}
41 42 43
}

type Exportable interface {
44
	ToProto() *pb.Message
45
	ToNet(w io.Writer) error
46 47
}

48
type impl struct {
Jeromy's avatar
Jeromy committed
49
	full     bool
50
	wantlist map[key.Key]Entry
51
	blocks   map[key.Key]blocks.Block
52 53
}

54 55
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
56 57
}

58
func newMsg(full bool) *impl {
59
	return &impl{
60
		blocks:   make(map[key.Key]blocks.Block),
61
		wantlist: make(map[key.Key]Entry),
62
		full:     full,
63
	}
64 65
}

Jeromy's avatar
Jeromy committed
66
type Entry struct {
67
	*wantlist.Entry
68
	Cancel bool
Jeromy's avatar
Jeromy committed
69 70
}

71
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
72
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
73
	for _, e := range pbm.GetWantlist().GetEntries() {
74
		m.addEntry(key.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
75 76
	}
	for _, d := range pbm.GetBlocks() {
77
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
78
		m.AddBlock(b)
79
	}
80
	return m
81 82
}

Jeromy's avatar
Jeromy committed
83 84 85 86
func (m *impl) Full() bool {
	return m.full
}

87 88 89 90
func (m *impl) Empty() bool {
	return len(m.blocks) == 0 && len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
91 92
func (m *impl) Wantlist() []Entry {
	var out []Entry
Jeromy's avatar
Jeromy committed
93 94 95 96
	for _, e := range m.wantlist {
		out = append(out, e)
	}
	return out
97 98
}

99 100
func (m *impl) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(m.blocks))
101 102 103 104
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
105 106
}

107
func (m *impl) Cancel(k key.Key) {
108
	delete(m.wantlist, k)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109 110 111
	m.addEntry(k, 0, true)
}

112
func (m *impl) AddEntry(k key.Key, priority int) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
113 114 115
	m.addEntry(k, priority, false)
}

116
func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
Jeromy's avatar
Jeromy committed
117
	e, exists := m.wantlist[k]
118
	if exists {
Jeromy's avatar
Jeromy committed
119 120 121
		e.Priority = priority
		e.Cancel = cancel
	} else {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
122
		m.wantlist[k] = Entry{
123
			Entry: &wantlist.Entry{
124 125 126 127
				Key:      k,
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
128
		}
129
	}
130 131
}

132
func (m *impl) AddBlock(b blocks.Block) {
133
	m.blocks[b.Key()] = b
134 135
}

136 137
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
Jeromy's avatar
Jeromy committed
138 139
	return FromPBReader(pbr)
}
140

Jeromy's avatar
Jeromy committed
141
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
142
	pb := new(pb.Message)
143
	if err := pbr.ReadMsg(pb); err != nil {
144 145
		return nil, err
	}
146

147
	m := newMessageFromProto(*pb)
148
	return m, nil
149 150
}

151
func (m *impl) ToProto() *pb.Message {
Jeromy's avatar
Jeromy committed
152 153 154 155 156 157
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
			Block:    proto.String(string(e.Key)),
			Priority: proto.Int32(int32(e.Priority)),
Jeromy's avatar
Jeromy committed
158
			Cancel:   proto.Bool(e.Cancel),
Jeromy's avatar
Jeromy committed
159
		})
160 161
	}
	for _, b := range m.Blocks() {
Jeromy's avatar
Jeromy committed
162
		pbm.Blocks = append(pbm.Blocks, b.RawData())
163
	}
Jeromy's avatar
Jeromy committed
164
	return pbm
165 166
}

167 168 169 170 171 172 173
func (m *impl) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

	if err := pbw.WriteMsg(m.ToProto()); err != nil {
		return err
	}
	return nil
174
}
175 176

func (m *impl) Loggable() map[string]interface{} {
177 178
	var blocks []string
	for _, v := range m.blocks {
Michael Muré's avatar
Michael Muré committed
179
		blocks = append(blocks, v.Key().B58String())
180
	}
181
	return map[string]interface{}{
182 183
		"blocks": blocks,
		"wants":  m.Wantlist(),
184 185
	}
}