Commit 06821bb3 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

fix race bugs

parent 4ba17a0f
......@@ -128,6 +128,7 @@ type Bitswap struct {
provideKeys chan u.Key
counterLk sync.Mutex
blocksRecvd int
dupBlocksRecvd int
}
......@@ -281,10 +282,12 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
bs.wm.CancelWants(keys)
for _, block := range incoming.Blocks() {
bs.counterLk.Lock()
bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
bs.dupBlocksRecvd++
}
bs.counterLk.Unlock()
log.Debugf("got block %s from %s", block, p)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
......
......@@ -210,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Errorf("cancel %s", entry.Key)
log.Debugf("cancel %s", entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
log.Errorf("wants %s - %d", entry.Key, entry.Priority)
log.Debugf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p)
......
......@@ -162,7 +162,7 @@ func (m *impl) ToProto() *pb.Message {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
Block: proto.String(string(e.Key)),
Priority: proto.Int32(int32(e.Priority)),
Cancel: &e.Cancel,
Cancel: proto.Bool(e.Cancel),
})
}
for _, b := range m.Blocks() {
......
......@@ -62,12 +62,10 @@ type msgQueue struct {
}
func (pm *WantManager) WantBlocks(ks []u.Key) {
log.Error("WANT: ", ks)
pm.addEntries(ks, false)
}
func (pm *WantManager) CancelWants(ks []u.Key) {
log.Error("CANCEL: ", ks)
pm.addEntries(ks, true)
}
......@@ -147,18 +145,12 @@ func (pm *WantManager) runQueue(ctx context.Context, mq *msgQueue) {
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
mq.out = nil
mq.outlk.Unlock()
// no message or empty message, continue
if wlm == nil {
log.Error("nil wantlist")
continue
}
if wlm.Empty() {
log.Error("empty wantlist")
if wlm == nil || wlm.Empty() {
mq.outlk.Unlock()
continue
}
mq.out = nil
mq.outlk.Unlock()
// send wantlist updates
err = pm.network.SendMessage(ctx, mq.p, wlm)
......@@ -186,22 +178,18 @@ func (pm *WantManager) Run(ctx context.Context) {
select {
case entries := <-pm.incoming:
msg := bsmsg.New()
msg.SetFull(false)
// add changes to our wantlist
for _, e := range entries {
if e.Cancel {
pm.wl.Remove(e.Key)
msg.Cancel(e.Key)
} else {
pm.wl.Add(e.Key, e.Priority)
msg.AddEntry(e.Key, e.Priority)
}
}
// broadcast those wantlist changes
for _, p := range pm.peers {
p.addMessage(msg)
p.addMessage(entries)
}
case p := <-pm.connect:
......@@ -223,7 +211,7 @@ func newMsgQueue(p peer.ID) *msgQueue {
return mq
}
func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
mq.outlk.Lock()
defer func() {
mq.outlk.Unlock()
......@@ -233,26 +221,19 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
}
}()
if msg.Full() {
log.Error("GOt FULL MESSAGE")
}
// if we have no message held, or the one we are given is full
// overwrite the one we are holding
if mq.out == nil || msg.Full() {
mq.out = msg
return
if mq.out == nil {
mq.out = bsmsg.New()
}
// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range msg.Wantlist() {
for _, e := range entries {
if e.Cancel {
log.Error("add message cancel: ", e.Key, mq.p)
mq.out.Cancel(e.Key)
} else {
log.Error("add message want: ", e.Key, mq.p)
mq.out.AddEntry(e.Key, e.Priority)
}
}
......
......@@ -17,8 +17,10 @@ func (bs *Bitswap) Stat() (*Stat, error) {
st := new(Stat)
st.ProvideBufLen = len(bs.newBlocks)
st.Wantlist = bs.GetWantlist()
bs.counterLk.Lock()
st.BlocksReceived = bs.blocksRecvd
st.DupBlksReceived = bs.dupBlocksRecvd
bs.counterLk.Unlock()
for _, p := range bs.engine.Peers() {
st.Peers = append(st.Peers, p.Pretty())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment