Unverified Commit 5754d5e3 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #12 from ipfs/feat/allocations

reduce allocations
parents b38d1d96 9093b83c
......@@ -222,7 +222,7 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
if m.Empty() {
log.Debugf("received empty message from %s", p)
}
......@@ -259,7 +259,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
} else {
// we have the block
newWorkExists = true
if msgSize + blockSize > maxMessageSize {
if msgSize+blockSize > maxMessageSize {
e.peerRequestQueue.Push(p, activeEntries...)
activeEntries = []*wl.Entry{}
msgSize = 0
......
......@@ -23,7 +23,7 @@ type peerRequestQueue interface {
func newPRQ() *prq {
return &prq{
taskMap: make(map[string]*peerRequestTask),
taskMap: make(map[taskEntryKey]*peerRequestTask),
partners: make(map[peer.ID]*activePartner),
frozen: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare),
......@@ -39,7 +39,7 @@ var _ peerRequestQueue = &prq{}
type prq struct {
lock sync.Mutex
pQueue pq.PQ
taskMap map[string]*peerRequestTask
taskMap map[taskEntryKey]*peerRequestTask
partners map[peer.ID]*activePartner
frozen map[peer.ID]*activePartner
......@@ -65,7 +65,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
if partner.activeBlocks.Has(entry.Cid) {
continue
}
if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok {
if task, ok := tl.taskMap[taskEntryKey{to, entry.Cid}]; ok {
if entry.Priority > task.Priority {
task.Priority = entry.Priority
partner.taskQueue.Update(task.index)
......@@ -98,7 +98,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
task.Priority = priority
partner.taskQueue.Push(task)
for _, entry := range newEntries {
tl.taskMap[taskEntryKey(to, entry.Cid)] = task
tl.taskMap[taskEntryKey{to, entry.Cid}] = task
}
partner.requests += len(newEntries)
tl.pQueue.Update(partner.Index())
......@@ -119,7 +119,7 @@ func (tl *prq) Pop() *peerRequestTask {
newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid))
delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
if entry.Trash {
continue
}
......@@ -143,7 +143,7 @@ func (tl *prq) Pop() *peerRequestTask {
// Remove removes a task from the queue
func (tl *prq) Remove(k cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskEntryKey(p, k)]
t, ok := tl.taskMap[taskEntryKey{p, k}]
if ok {
for _, entry := range t.Entries {
if entry.Cid.Equals(k) {
......@@ -220,9 +220,10 @@ func (t *peerRequestTask) SetIndex(i int) {
t.index = i
}
// taskEntryKey returns a key that uniquely identifies a task.
func taskEntryKey(p peer.ID, k cid.Cid) string {
return string(p) + k.KeyString()
// taskEntryKey is a key identifying a task.
type taskEntryKey struct {
p peer.ID
k cid.Cid
}
// FIFO is a basic task comparator that returns tasks in the order created.
......
......@@ -49,8 +49,8 @@ type Exportable interface {
type impl struct {
full bool
wantlist map[string]*Entry
blocks map[string]blocks.Block
wantlist map[cid.Cid]*Entry
blocks map[cid.Cid]blocks.Block
}
func New(full bool) BitSwapMessage {
......@@ -59,8 +59,8 @@ func New(full bool) BitSwapMessage {
func newMsg(full bool) *impl {
return &impl{
blocks: make(map[string]blocks.Block),
wantlist: make(map[string]*Entry),
blocks: make(map[cid.Cid]blocks.Block),
wantlist: make(map[cid.Cid]*Entry),
full: full,
}
}
......@@ -71,17 +71,17 @@ type Entry struct {
}
func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
m := newMsg(pbm.GetWantlist().GetFull())
for _, e := range pbm.GetWantlist().GetEntries() {
c, err := cid.Cast([]byte(e.GetBlock()))
m := newMsg(pbm.Wantlist.Full)
for _, e := range pbm.Wantlist.Entries {
c, err := cid.Cast([]byte(e.Block))
if err != nil {
return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
}
m.addEntry(c, int(e.GetPriority()), e.GetCancel())
m.addEntry(c, int(e.Priority), e.Cancel)
}
// deprecated
for _, d := range pbm.GetBlocks() {
for _, d := range pbm.Blocks {
// CIDv0, sha256, protobuf only
b := blocks.NewBlock(d)
m.AddBlock(b)
......@@ -135,7 +135,7 @@ func (m *impl) Blocks() []blocks.Block {
}
func (m *impl) Cancel(k cid.Cid) {
delete(m.wantlist, k.KeyString())
delete(m.wantlist, k)
m.addEntry(k, 0, true)
}
......@@ -144,13 +144,12 @@ func (m *impl) AddEntry(k cid.Cid, priority int) {
}
func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
k := c.KeyString()
e, exists := m.wantlist[k]
e, exists := m.wantlist[c]
if exists {
e.Priority = priority
e.Cancel = cancel
} else {
m.wantlist[k] = &Entry{
m.wantlist[c] = &Entry{
Entry: &wantlist.Entry{
Cid: c,
Priority: priority,
......@@ -161,7 +160,7 @@ func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
}
func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Cid().KeyString()] = b
m.blocks[b.Cid()] = b
}
func FromNet(r io.Reader) (BitSwapMessage, error) {
......@@ -180,10 +179,9 @@ func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
func (m *impl) ToProtoV0() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist = new(pb.Message_Wantlist)
pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
Block: e.Cid.Bytes(),
Priority: int32(e.Priority),
Cancel: e.Cancel,
......@@ -201,10 +199,9 @@ func (m *impl) ToProtoV0() *pb.Message {
func (m *impl) ToProtoV1() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist = new(pb.Message_Wantlist)
pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
Block: e.Cid.Bytes(),
Priority: int32(e.Priority),
Cancel: e.Cancel,
......@@ -213,13 +210,12 @@ func (m *impl) ToProtoV1() *pb.Message {
pbm.Wantlist.Full = m.full
blocks := m.Blocks()
pbm.Payload = make([]*pb.Message_Block, 0, len(blocks))
pbm.Payload = make([]pb.Message_Block, 0, len(blocks))
for _, b := range blocks {
blk := &pb.Message_Block{
pbm.Payload = append(pbm.Payload, pb.Message_Block{
Data: b.RawData(),
Prefix: b.Cid().Prefix().Bytes(),
}
pbm.Payload = append(pbm.Payload, blk)
})
}
return pbm
}
......
......@@ -20,7 +20,7 @@ func TestAppendWanted(t *testing.T) {
m := New(true)
m.AddEntry(str, 1)
if !wantlistContains(m.ToProtoV0().GetWantlist(), str) {
if !wantlistContains(&m.ToProtoV0().Wantlist, str) {
t.Fail()
}
}
......@@ -28,11 +28,10 @@ func TestAppendWanted(t *testing.T) {
func TestNewMessageFromProto(t *testing.T) {
str := mkFakeCid("a_key")
protoMessage := new(pb.Message)
protoMessage.Wantlist = new(pb.Message_Wantlist)
protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{
protoMessage.Wantlist.Entries = []pb.Message_Wantlist_Entry{
{Block: str.Bytes()},
}
if !wantlistContains(protoMessage.Wantlist, str) {
if !wantlistContains(&protoMessage.Wantlist, str) {
t.Fail()
}
m, err := newMessageFromProto(*protoMessage)
......@@ -40,7 +39,7 @@ func TestNewMessageFromProto(t *testing.T) {
t.Fatal(err)
}
if !wantlistContains(m.ToProtoV0().GetWantlist(), str) {
if !wantlistContains(&m.ToProtoV0().Wantlist, str) {
t.Fail()
}
}
......@@ -94,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) {
m := New(true)
protoBeforeAppend := m.ToProtoV0()
m.AddEntry(str, 1)
if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
if wantlistContains(&protoBeforeAppend.Wantlist, str) {
t.Fail()
}
}
......@@ -121,13 +120,13 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
t.Fatal("fullness attribute got dropped on marshal")
}
keys := make(map[string]bool)
keys := make(map[cid.Cid]bool)
for _, k := range copied.Wantlist() {
keys[k.Cid.KeyString()] = true
keys[k.Cid] = true
}
for _, k := range original.Wantlist() {
if _, ok := keys[k.Cid.KeyString()]; !ok {
if _, ok := keys[k.Cid]; !ok {
t.Fatalf("Key Missing: \"%v\"", k)
}
}
......@@ -151,13 +150,13 @@ func TestToAndFromNetMessage(t *testing.T) {
t.Fatal(err)
}
keys := make(map[string]bool)
keys := make(map[cid.Cid]bool)
for _, b := range m2.Blocks() {
keys[b.Cid().KeyString()] = true
keys[b.Cid()] = true
}
for _, b := range original.Blocks() {
if _, ok := keys[b.Cid().KeyString()]; !ok {
if _, ok := keys[b.Cid()]; !ok {
t.Fail()
}
}
......
......@@ -4,7 +4,7 @@ GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<
protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $<
clean:
rm -f *.pb.go
......
......@@ -6,6 +6,7 @@ package bitswap_message_pb
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import io "io"
......@@ -21,11 +22,10 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Message struct {
Wantlist *Message_Wantlist `protobuf:"bytes,1,opt,name=wantlist" json:"wantlist,omitempty"`
Wantlist Message_Wantlist `protobuf:"bytes,1,opt,name=wantlist" json:"wantlist"`
Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"`
Payload []*Message_Block `protobuf:"bytes,3,rep,name=payload" json:"payload,omitempty"`
Payload []Message_Block `protobuf:"bytes,3,rep,name=payload" json:"payload"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
......@@ -33,7 +33,7 @@ func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_message_1e228ff77b8fb7b4, []int{0}
return fileDescriptor_message_c28309e4affd853b, []int{0}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -62,11 +62,11 @@ func (m *Message) XXX_DiscardUnknown() {
var xxx_messageInfo_Message proto.InternalMessageInfo
func (m *Message) GetWantlist() *Message_Wantlist {
func (m *Message) GetWantlist() Message_Wantlist {
if m != nil {
return m.Wantlist
}
return nil
return Message_Wantlist{}
}
func (m *Message) GetBlocks() [][]byte {
......@@ -76,7 +76,7 @@ func (m *Message) GetBlocks() [][]byte {
return nil
}
func (m *Message) GetPayload() []*Message_Block {
func (m *Message) GetPayload() []Message_Block {
if m != nil {
return m.Payload
}
......@@ -84,10 +84,9 @@ func (m *Message) GetPayload() []*Message_Block {
}
type Message_Wantlist struct {
Entries []*Message_Wantlist_Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"`
Entries []Message_Wantlist_Entry `protobuf:"bytes,1,rep,name=entries" json:"entries"`
Full bool `protobuf:"varint,2,opt,name=full,proto3" json:"full,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
......@@ -95,7 +94,7 @@ func (m *Message_Wantlist) Reset() { *m = Message_Wantlist{} }
func (m *Message_Wantlist) String() string { return proto.CompactTextString(m) }
func (*Message_Wantlist) ProtoMessage() {}
func (*Message_Wantlist) Descriptor() ([]byte, []int) {
return fileDescriptor_message_1e228ff77b8fb7b4, []int{0, 0}
return fileDescriptor_message_c28309e4affd853b, []int{0, 0}
}
func (m *Message_Wantlist) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -124,7 +123,7 @@ func (m *Message_Wantlist) XXX_DiscardUnknown() {
var xxx_messageInfo_Message_Wantlist proto.InternalMessageInfo
func (m *Message_Wantlist) GetEntries() []*Message_Wantlist_Entry {
func (m *Message_Wantlist) GetEntries() []Message_Wantlist_Entry {
if m != nil {
return m.Entries
}
......@@ -143,7 +142,6 @@ type Message_Wantlist_Entry struct {
Priority int32 `protobuf:"varint,2,opt,name=priority,proto3" json:"priority,omitempty"`
Cancel bool `protobuf:"varint,3,opt,name=cancel,proto3" json:"cancel,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
......@@ -151,7 +149,7 @@ func (m *Message_Wantlist_Entry) Reset() { *m = Message_Wantlist_Entry{}
func (m *Message_Wantlist_Entry) String() string { return proto.CompactTextString(m) }
func (*Message_Wantlist_Entry) ProtoMessage() {}
func (*Message_Wantlist_Entry) Descriptor() ([]byte, []int) {
return fileDescriptor_message_1e228ff77b8fb7b4, []int{0, 0, 0}
return fileDescriptor_message_c28309e4affd853b, []int{0, 0, 0}
}
func (m *Message_Wantlist_Entry) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -205,7 +203,6 @@ type Message_Block struct {
Prefix []byte `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
......@@ -213,7 +210,7 @@ func (m *Message_Block) Reset() { *m = Message_Block{} }
func (m *Message_Block) String() string { return proto.CompactTextString(m) }
func (*Message_Block) ProtoMessage() {}
func (*Message_Block) Descriptor() ([]byte, []int) {
return fileDescriptor_message_1e228ff77b8fb7b4, []int{0, 1}
return fileDescriptor_message_c28309e4affd853b, []int{0, 1}
}
func (m *Message_Block) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -277,7 +274,6 @@ func (m *Message) MarshalTo(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.Wantlist != nil {
dAtA[i] = 0xa
i++
i = encodeVarintMessage(dAtA, i, uint64(m.Wantlist.Size()))
......@@ -286,7 +282,6 @@ func (m *Message) MarshalTo(dAtA []byte) (int, error) {
return 0, err
}
i += n1
}
if len(m.Blocks) > 0 {
for _, b := range m.Blocks {
dAtA[i] = 0x12
......@@ -307,9 +302,6 @@ func (m *Message) MarshalTo(dAtA []byte) (int, error) {
i += n
}
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
......@@ -350,9 +342,6 @@ func (m *Message_Wantlist) MarshalTo(dAtA []byte) (int, error) {
}
i++
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
......@@ -392,9 +381,6 @@ func (m *Message_Wantlist_Entry) MarshalTo(dAtA []byte) (int, error) {
}
i++
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
......@@ -425,9 +411,6 @@ func (m *Message_Block) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintMessage(dAtA, i, uint64(len(m.Data)))
i += copy(dAtA[i:], m.Data)
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
......@@ -443,10 +426,8 @@ func encodeVarintMessage(dAtA []byte, offset int, v uint64) int {
func (m *Message) Size() (n int) {
var l int
_ = l
if m.Wantlist != nil {
l = m.Wantlist.Size()
n += 1 + l + sovMessage(uint64(l))
}
if len(m.Blocks) > 0 {
for _, b := range m.Blocks {
l = len(b)
......@@ -459,9 +440,6 @@ func (m *Message) Size() (n int) {
n += 1 + l + sovMessage(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
......@@ -477,9 +455,6 @@ func (m *Message_Wantlist) Size() (n int) {
if m.Full {
n += 2
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
......@@ -496,9 +471,6 @@ func (m *Message_Wantlist_Entry) Size() (n int) {
if m.Cancel {
n += 2
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
......@@ -513,9 +485,6 @@ func (m *Message_Block) Size() (n int) {
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
......@@ -587,9 +556,6 @@ func (m *Message) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Wantlist == nil {
m.Wantlist = &Message_Wantlist{}
}
if err := m.Wantlist.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
......@@ -649,7 +615,7 @@ func (m *Message) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Payload = append(m.Payload, &Message_Block{})
m.Payload = append(m.Payload, Message_Block{})
if err := m.Payload[len(m.Payload)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
......@@ -666,7 +632,6 @@ func (m *Message) Unmarshal(dAtA []byte) error {
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
......@@ -731,7 +696,7 @@ func (m *Message_Wantlist) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Entries = append(m.Entries, &Message_Wantlist_Entry{})
m.Entries = append(m.Entries, Message_Wantlist_Entry{})
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
......@@ -768,7 +733,6 @@ func (m *Message_Wantlist) Unmarshal(dAtA []byte) error {
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
......@@ -889,7 +853,6 @@ func (m *Message_Wantlist_Entry) Unmarshal(dAtA []byte) error {
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
......@@ -1002,7 +965,6 @@ func (m *Message_Block) Unmarshal(dAtA []byte) error {
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
......@@ -1117,26 +1079,29 @@ var (
ErrIntOverflowMessage = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("message.proto", fileDescriptor_message_1e228ff77b8fb7b4) }
func init() { proto.RegisterFile("message.proto", fileDescriptor_message_c28309e4affd853b) }
var fileDescriptor_message_1e228ff77b8fb7b4 = []byte{
// 287 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xb1, 0x4e, 0xf3, 0x30,
0x14, 0x85, 0xe5, 0xe6, 0x4f, 0x1b, 0xdd, 0xe6, 0x5f, 0x2c, 0x84, 0xac, 0x0c, 0x55, 0x40, 0x0c,
0x11, 0x83, 0x87, 0x76, 0x64, 0x41, 0x15, 0x8c, 0x0c, 0x78, 0x61, 0x76, 0x52, 0x17, 0x59, 0x98,
0x24, 0xb2, 0x8d, 0x4a, 0x9e, 0x82, 0xc7, 0xe1, 0x15, 0x18, 0x79, 0x04, 0x94, 0x27, 0x41, 0xb9,
0x75, 0xb2, 0x20, 0x21, 0xb6, 0x7b, 0xac, 0xf3, 0x1d, 0x9f, 0x6b, 0xc3, 0xff, 0x67, 0xe5, 0x9c,
0x7c, 0x54, 0xbc, 0xb5, 0x8d, 0x6f, 0x28, 0x2d, 0xb5, 0x77, 0x07, 0xd9, 0xf2, 0xe9, 0xb8, 0x3c,
0x7f, 0x8b, 0x60, 0x71, 0x77, 0x94, 0xf4, 0x1a, 0x92, 0x83, 0xac, 0xbd, 0xd1, 0xce, 0x33, 0x92,
0x93, 0x62, 0xb9, 0xbe, 0xe0, 0x3f, 0x11, 0x1e, 0xec, 0xfc, 0x21, 0x78, 0xc5, 0x44, 0xd1, 0x53,
0x98, 0x97, 0xa6, 0xa9, 0x9e, 0x1c, 0x9b, 0xe5, 0x51, 0x91, 0x8a, 0xa0, 0xe8, 0x15, 0x2c, 0x5a,
0xd9, 0x99, 0x46, 0xee, 0x58, 0x94, 0x47, 0xc5, 0x72, 0x7d, 0xf6, 0x5b, 0xf0, 0x76, 0x80, 0xc4,
0x48, 0x64, 0xef, 0x04, 0x92, 0xf1, 0x2e, 0x7a, 0x03, 0x0b, 0x55, 0x7b, 0xab, 0x95, 0x63, 0x04,
0x93, 0x2e, 0xff, 0x52, 0x91, 0xdf, 0xd6, 0xde, 0x76, 0x62, 0x44, 0x29, 0x85, 0x7f, 0xfb, 0x17,
0x63, 0xd8, 0x2c, 0x27, 0x45, 0x22, 0x70, 0xce, 0xee, 0x21, 0x46, 0x17, 0x3d, 0x81, 0x18, 0x6b,
0xe3, 0x1b, 0xa4, 0xe2, 0x28, 0x68, 0x06, 0x49, 0x6b, 0x75, 0x63, 0xb5, 0xef, 0x10, 0x8b, 0xc5,
0xa4, 0x87, 0xb5, 0x2b, 0x59, 0x57, 0xca, 0xb0, 0x08, 0x03, 0x83, 0xca, 0x36, 0x10, 0xe3, 0x2e,
0x83, 0xa1, 0xb5, 0x6a, 0xaf, 0x5f, 0x43, 0x66, 0x50, 0x43, 0x8f, 0x9d, 0xf4, 0x12, 0x03, 0x53,
0x81, 0xf3, 0x36, 0xfd, 0xe8, 0x57, 0xe4, 0xb3, 0x5f, 0x91, 0xaf, 0x7e, 0x45, 0xca, 0x39, 0x7e,
0xdd, 0xe6, 0x3b, 0x00, 0x00, 0xff, 0xff, 0xd2, 0x95, 0x9b, 0xc1, 0xcb, 0x01, 0x00, 0x00,
var fileDescriptor_message_c28309e4affd853b = []byte{
// 328 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xcf, 0x4a, 0xf3, 0x40,
0x14, 0xc5, 0x3b, 0x4d, 0xd3, 0x86, 0xdb, 0x7e, 0xf0, 0x31, 0x88, 0x84, 0x2c, 0x62, 0x14, 0x17,
0x41, 0x70, 0x0a, 0xed, 0x13, 0x58, 0xd0, 0x85, 0xe0, 0xc2, 0x6c, 0x5c, 0x4f, 0xd2, 0x34, 0x0e,
0xa6, 0x99, 0x30, 0x33, 0xa5, 0xf6, 0x2d, 0x7c, 0x05, 0x1f, 0xc4, 0x7d, 0x97, 0x3e, 0x81, 0x48,
0x7d, 0x11, 0xc9, 0xed, 0x34, 0x1b, 0x41, 0xdc, 0xdd, 0x33, 0x9c, 0xf3, 0xbb, 0x7f, 0x06, 0xfe,
0x2d, 0x73, 0xad, 0x79, 0x91, 0xb3, 0x5a, 0x49, 0x23, 0x29, 0x4d, 0x85, 0xd1, 0x6b, 0x5e, 0xb3,
0xf6, 0x39, 0x0d, 0x2e, 0x0b, 0x61, 0x1e, 0x57, 0x29, 0xcb, 0xe4, 0x72, 0x5c, 0xc8, 0x42, 0x8e,
0xd1, 0x9a, 0xae, 0x16, 0xa8, 0x50, 0x60, 0xb5, 0x47, 0x9c, 0xbd, 0x3a, 0x30, 0xb8, 0xdb, 0xa7,
0xe9, 0x0d, 0x78, 0x6b, 0x5e, 0x99, 0x52, 0x68, 0xe3, 0x93, 0x88, 0xc4, 0xc3, 0xc9, 0x39, 0xfb,
0xd9, 0x81, 0x59, 0x3b, 0x7b, 0xb0, 0xde, 0x59, 0x6f, 0xfb, 0x71, 0xd2, 0x49, 0xda, 0x2c, 0x3d,
0x86, 0x7e, 0x5a, 0xca, 0xec, 0x49, 0xfb, 0xdd, 0xc8, 0x89, 0x47, 0x89, 0x55, 0xf4, 0x0a, 0x06,
0x35, 0xdf, 0x94, 0x92, 0xcf, 0x7d, 0x27, 0x72, 0xe2, 0xe1, 0xe4, 0xf4, 0x37, 0xfc, 0xac, 0x09,
0x59, 0xf6, 0x21, 0x17, 0xbc, 0x11, 0xf0, 0x0e, 0x7d, 0xe9, 0x2d, 0x0c, 0xf2, 0xca, 0x28, 0x91,
0x6b, 0x9f, 0x20, 0xef, 0xe2, 0x2f, 0xe3, 0xb2, 0xeb, 0xca, 0xa8, 0xcd, 0x01, 0x6c, 0x01, 0x94,
0x42, 0x6f, 0xb1, 0x2a, 0x4b, 0xbf, 0x1b, 0x91, 0xd8, 0x4b, 0xb0, 0x0e, 0xee, 0xc1, 0x45, 0x2f,
0x3d, 0x02, 0x17, 0x57, 0xc0, 0xab, 0x8c, 0x92, 0xbd, 0xa0, 0x01, 0x78, 0xb5, 0x12, 0x52, 0x09,
0xb3, 0xc1, 0x98, 0x9b, 0xb4, 0xba, 0x39, 0x41, 0xc6, 0xab, 0x2c, 0x2f, 0x7d, 0x07, 0x81, 0x56,
0x05, 0x53, 0x70, 0x71, 0xaf, 0xc6, 0x50, 0xab, 0x7c, 0x21, 0x9e, 0x2d, 0xd3, 0xaa, 0x66, 0x8e,
0x39, 0x37, 0x1c, 0x81, 0xa3, 0x04, 0xeb, 0xd9, 0xff, 0xed, 0x2e, 0x24, 0xef, 0xbb, 0x90, 0x7c,
0xee, 0x42, 0xf2, 0xf2, 0x15, 0x76, 0xd2, 0x3e, 0x7e, 0xde, 0xf4, 0x3b, 0x00, 0x00, 0xff, 0xff,
0xd1, 0x6a, 0x3a, 0xa2, 0x10, 0x02, 0x00, 0x00,
}
......@@ -2,6 +2,8 @@ syntax = "proto3";
package bitswap.message.pb;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message Message {
message Wantlist {
......@@ -12,7 +14,7 @@ message Message {
bool cancel = 3; // whether this revokes an entry
}
repeated Entry entries = 1; // a list of wantlist entries
repeated Entry entries = 1 [(gogoproto.nullable) = false]; // a list of wantlist entries
bool full = 2; // whether this is the full wantlist. default to false
}
......@@ -21,7 +23,7 @@ message Message {
bytes data = 2;
}
Wantlist wantlist = 1;
Wantlist wantlist = 1 [(gogoproto.nullable) = false];
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
repeated Block payload = 3 [(gogoproto.nullable) = false]; // used to send Blocks in bitswap 1.1.0
}
......@@ -33,7 +33,7 @@ type Session struct {
interestReqs chan interestReq
interest *lru.Cache
liveWants map[string]time.Time
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
......@@ -54,7 +54,7 @@ type Session struct {
func (bs *Bitswap) NewSession(ctx context.Context) *Session {
s := &Session{
activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[string]time.Time),
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
......@@ -87,8 +87,7 @@ func (bs *Bitswap) removeSession(s *Session) {
live := make([]cid.Cid, 0, len(s.liveWants))
for c := range s.liveWants {
cs, _ := cid.Cast([]byte(c))
live = append(live, cs)
live = append(live, c)
}
bs.CancelWants(live, s.id)
......@@ -147,7 +146,7 @@ func (s *Session) isLiveWant(c cid.Cid) bool {
}
func (s *Session) interestedIn(c cid.Cid) bool {
return s.interest.Contains(c.KeyString()) || s.isLiveWant(c)
return s.interest.Contains(c) || s.isLiveWant(c)
}
const provSearchDelay = time.Second * 10
......@@ -188,7 +187,7 @@ func (s *Session) run(ctx context.Context) {
s.resetTick()
case keys := <-s.newReqs:
for _, k := range keys {
s.interest.Add(k.KeyString(), nil)
s.interest.Add(k, nil)
}
if len(s.liveWants) < activeWantsLimit {
toadd := activeWantsLimit - len(s.liveWants)
......@@ -211,8 +210,7 @@ func (s *Session) run(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
cs, _ := cid.Cast([]byte(c))
live = append(live, cs)
live = append(live, c)
s.liveWants[c] = now
}
......@@ -250,7 +248,7 @@ func (s *Session) run(ctx context.Context) {
}
func (s *Session) cidIsWanted(c cid.Cid) bool {
_, ok := s.liveWants[c.KeyString()]
_, ok := s.liveWants[c]
if !ok {
ok = s.tofetch.Has(c)
}
......@@ -261,11 +259,10 @@ func (s *Session) cidIsWanted(c cid.Cid) bool {
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
c := blk.Cid()
if s.cidIsWanted(c) {
ks := c.KeyString()
tval, ok := s.liveWants[ks]
tval, ok := s.liveWants[c]
if ok {
s.latTotal += time.Since(tval)
delete(s.liveWants, ks)
delete(s.liveWants, c)
} else {
s.tofetch.Remove(c)
}
......@@ -281,7 +278,7 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
now := time.Now()
for _, c := range ks {
s.liveWants[c.KeyString()] = now
s.liveWants[c] = now
}
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
}
......
......@@ -11,12 +11,12 @@ import (
type ThreadSafe struct {
lk sync.RWMutex
set map[string]*Entry
set map[cid.Cid]*Entry
}
// not threadsafe
type Wantlist struct {
set map[string]*Entry
set map[cid.Cid]*Entry
}
type Entry struct {
......@@ -45,13 +45,13 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit
func NewThreadSafe() *ThreadSafe {
return &ThreadSafe{
set: make(map[string]*Entry),
set: make(map[cid.Cid]*Entry),
}
}
func New() *Wantlist {
return &Wantlist{
set: make(map[string]*Entry),
set: make(map[cid.Cid]*Entry),
}
}
......@@ -66,13 +66,12 @@ func New() *Wantlist {
func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
k := c.KeyString()
if e, ok := w.set[k]; ok {
if e, ok := w.set[c]; ok {
e.SesTrk[ses] = struct{}{}
return false
}
w.set[k] = &Entry{
w.set[c] = &Entry{
Cid: c,
Priority: priority,
SesTrk: map[uint64]struct{}{ses: struct{}{}},
......@@ -85,12 +84,11 @@ func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool {
func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
k := e.Cid.KeyString()
if ex, ok := w.set[k]; ok {
if ex, ok := w.set[e.Cid]; ok {
ex.SesTrk[ses] = struct{}{}
return false
}
w.set[k] = e
w.set[e.Cid] = e
e.SesTrk[ses] = struct{}{}
return true
}
......@@ -102,15 +100,14 @@ func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
k := c.KeyString()
e, ok := w.set[k]
e, ok := w.set[c]
if !ok {
return false
}
delete(e.SesTrk, ses)
if len(e.SesTrk) == 0 {
delete(w.set, k)
delete(w.set, c)
return true
}
return false
......@@ -121,7 +118,7 @@ func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool {
func (w *ThreadSafe) Contains(k cid.Cid) (*Entry, bool) {
w.lk.RLock()
defer w.lk.RUnlock()
e, ok := w.set[k.KeyString()]
e, ok := w.set[k]
return e, ok
}
......@@ -152,12 +149,11 @@ func (w *Wantlist) Len() int {
}
func (w *Wantlist) Add(c cid.Cid, priority int) bool {
k := c.KeyString()
if _, ok := w.set[k]; ok {
if _, ok := w.set[c]; ok {
return false
}
w.set[k] = &Entry{
w.set[c] = &Entry{
Cid: c,
Priority: priority,
}
......@@ -166,27 +162,25 @@ func (w *Wantlist) Add(c cid.Cid, priority int) bool {
}
func (w *Wantlist) AddEntry(e *Entry) bool {
k := e.Cid.KeyString()
if _, ok := w.set[k]; ok {
if _, ok := w.set[e.Cid]; ok {
return false
}
w.set[k] = e
w.set[e.Cid] = e
return true
}
func (w *Wantlist) Remove(c cid.Cid) bool {
k := c.KeyString()
_, ok := w.set[k]
_, ok := w.set[c]
if !ok {
return false
}
delete(w.set, k)
delete(w.set, c)
return true
}
func (w *Wantlist) Contains(k cid.Cid) (*Entry, bool) {
e, ok := w.set[k.KeyString()]
func (w *Wantlist) Contains(c cid.Cid) (*Entry, bool) {
e, ok := w.set[c]
return e, ok
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment