message.go 3.77 KB
Newer Older
1 2 3
package message

import (
4 5
	"io"

6 7 8 9 10
	blocks "github.com/ipfs/go-ipfs/blocks"
	pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/internal/pb"
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
	inet "github.com/ipfs/go-ipfs/p2p/net"
	u "github.com/ipfs/go-ipfs/util"
11

12 13
	ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
	proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
14 15
)

16 17 18
// TODO move message.go into the bitswap package
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

19
type BitSwapMessage interface {
20 21
	// Wantlist returns a slice of unique keys that represent data wanted by
	// the sender.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
22
	Wantlist() []Entry
23 24

	// Blocks returns a slice of unique blocks
Jeromy's avatar
Jeromy committed
25
	Blocks() []*blocks.Block
26

Jeromy's avatar
Jeromy committed
27
	// AddEntry adds an entry to the Wantlist.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
28 29 30
	AddEntry(key u.Key, priority int)

	Cancel(key u.Key)
Jeromy's avatar
Jeromy committed
31

32 33
	Empty() bool

34
	// A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
Jeromy's avatar
Jeromy committed
35
	Full() bool
36

Jeromy's avatar
Jeromy committed
37
	AddBlock(*blocks.Block)
38
	Exportable
39 40

	Loggable() map[string]interface{}
41 42 43
}

type Exportable interface {
44
	ToProto() *pb.Message
45
	ToNet(w io.Writer) error
46 47
}

48
type impl struct {
Jeromy's avatar
Jeromy committed
49
	full     bool
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50
	wantlist map[u.Key]Entry
51
	blocks   map[u.Key]*blocks.Block
52 53
}

54 55
func New(full bool) BitSwapMessage {
	return newMsg(full)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
56 57
}

58
func newMsg(full bool) *impl {
59
	return &impl{
Jeromy's avatar
Jeromy committed
60
		blocks:   make(map[u.Key]*blocks.Block),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
61
		wantlist: make(map[u.Key]Entry),
62
		full:     full,
63
	}
64 65
}

Jeromy's avatar
Jeromy committed
66
type Entry struct {
67 68
	wantlist.Entry
	Cancel bool
Jeromy's avatar
Jeromy committed
69 70
}

71
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
72
	m := newMsg(pbm.GetWantlist().GetFull())
Jeromy's avatar
Jeromy committed
73
	for _, e := range pbm.GetWantlist().GetEntries() {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
74
		m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
75 76
	}
	for _, d := range pbm.GetBlocks() {
77
		b := blocks.NewBlock(d)
Jeromy's avatar
Jeromy committed
78
		m.AddBlock(b)
79
	}
80
	return m
81 82
}

Jeromy's avatar
Jeromy committed
83 84 85 86
func (m *impl) Full() bool {
	return m.full
}

87 88 89 90
func (m *impl) Empty() bool {
	return len(m.blocks) == 0 && len(m.wantlist) == 0
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
91 92
func (m *impl) Wantlist() []Entry {
	var out []Entry
Jeromy's avatar
Jeromy committed
93 94 95 96
	for _, e := range m.wantlist {
		out = append(out, e)
	}
	return out
97 98
}

Jeromy's avatar
Jeromy committed
99
func (m *impl) Blocks() []*blocks.Block {
100
	bs := make([]*blocks.Block, 0, len(m.blocks))
101 102 103 104
	for _, block := range m.blocks {
		bs = append(bs, block)
	}
	return bs
105 106
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
107
func (m *impl) Cancel(k u.Key) {
108
	delete(m.wantlist, k)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109 110 111 112 113 114 115 116
	m.addEntry(k, 0, true)
}

func (m *impl) AddEntry(k u.Key, priority int) {
	m.addEntry(k, priority, false)
}

func (m *impl) addEntry(k u.Key, priority int, cancel bool) {
Jeromy's avatar
Jeromy committed
117
	e, exists := m.wantlist[k]
118
	if exists {
Jeromy's avatar
Jeromy committed
119 120 121
		e.Priority = priority
		e.Cancel = cancel
	} else {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
122
		m.wantlist[k] = Entry{
123 124 125 126 127
			Entry: wantlist.Entry{
				Key:      k,
				Priority: priority,
			},
			Cancel: cancel,
Jeromy's avatar
Jeromy committed
128
		}
129
	}
130 131
}

Jeromy's avatar
Jeromy committed
132
func (m *impl) AddBlock(b *blocks.Block) {
133
	m.blocks[b.Key()] = b
134 135
}

136 137 138
func FromNet(r io.Reader) (BitSwapMessage, error) {
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)

139
	pb := new(pb.Message)
140
	if err := pbr.ReadMsg(pb); err != nil {
141 142
		return nil, err
	}
143

144
	m := newMessageFromProto(*pb)
145
	return m, nil
146 147
}

148
func (m *impl) ToProto() *pb.Message {
Jeromy's avatar
Jeromy committed
149 150 151 152 153 154
	pbm := new(pb.Message)
	pbm.Wantlist = new(pb.Message_Wantlist)
	for _, e := range m.wantlist {
		pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
			Block:    proto.String(string(e.Key)),
			Priority: proto.Int32(int32(e.Priority)),
Jeromy's avatar
Jeromy committed
155
			Cancel:   proto.Bool(e.Cancel),
Jeromy's avatar
Jeromy committed
156
		})
157 158
	}
	for _, b := range m.Blocks() {
Jeromy's avatar
Jeromy committed
159
		pbm.Blocks = append(pbm.Blocks, b.Data)
160
	}
Jeromy's avatar
Jeromy committed
161
	return pbm
162 163
}

164 165 166 167 168 169 170
func (m *impl) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

	if err := pbw.WriteMsg(m.ToProto()); err != nil {
		return err
	}
	return nil
171
}
172 173

func (m *impl) Loggable() map[string]interface{} {
174 175 176 177
	var blocks []string
	for _, v := range m.blocks {
		blocks = append(blocks, v.Key().Pretty())
	}
178
	return map[string]interface{}{
179 180
		"blocks": blocks,
		"wants":  m.Wantlist(),
181 182
	}
}