Commit 6315475e authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #1166 from ipfs/fix/bitswap-multisend

try harder to not send duplicate blocks
parents 6e4bb2aa 110eef1d
......@@ -127,6 +127,9 @@ type Bitswap struct {
newBlocks chan *blocks.Block
provideKeys chan u.Key
blocksRecvd int
dupBlocksRecvd int
}
type blockRequest struct {
......@@ -219,6 +222,7 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return errors.New("bitswap is closed")
default:
}
if err := bs.blockstore.Put(blk); err != nil {
return err
}
......@@ -342,6 +346,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// Should only track *useful* messages in ledger
for _, block := range incoming.Blocks() {
bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
bs.dupBlocksRecvd++
}
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
log.Debug(err)
......
......@@ -69,9 +69,6 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
hasBlock := g.Next()
defer hasBlock.Exchange.Close()
if err := hasBlock.Blockstore().Put(block); err != nil {
t.Fatal(err)
}
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
......@@ -136,7 +133,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
var blkeys []u.Key
first := instances[0]
for _, b := range blocks {
first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block
blkeys = append(blkeys, b.Key())
first.Exchange.HasBlock(context.Background(), b)
}
......@@ -144,7 +140,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Log("Distribute!")
wg := sync.WaitGroup{}
for _, inst := range instances {
for _, inst := range instances[1:] {
wg.Add(1)
go func(inst Instance) {
defer wg.Done()
......
......@@ -46,7 +46,7 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
if !ok {
partner = &activePartner{taskQueue: pq.New(wrapCmp(V1))}
partner = newActivePartner()
tl.pQueue.Push(partner)
tl.partners[to] = partner
}
......@@ -57,12 +57,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
return
}
partner.activelk.Lock()
defer partner.activelk.Unlock()
_, ok = partner.activeBlocks[entry.Key]
if ok {
return
}
task := &peerRequestTask{
Entry: entry,
Target: to,
created: time.Now(),
Done: func() {
partner.TaskDone()
partner.TaskDone(entry.Key)
tl.lock.Lock()
tl.pQueue.Update(partner.Index())
tl.lock.Unlock()
......@@ -93,7 +100,7 @@ func (tl *prq) Pop() *peerRequestTask {
continue // discarding tasks that have been removed
}
partner.StartTask()
partner.StartTask(out.Entry.Key)
partner.requests--
break // and return |out|
}
......@@ -179,6 +186,8 @@ type activePartner struct {
activelk sync.Mutex
active int
activeBlocks map[u.Key]struct{}
// requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under
// the peerRequestQueue's locks
......@@ -191,6 +200,13 @@ type activePartner struct {
taskQueue pq.PQ
}
func newActivePartner() *activePartner {
return &activePartner{
taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: make(map[u.Key]struct{}),
}
}
// partnerCompare implements pq.ElemComparator
func partnerCompare(a, b pq.Elem) bool {
pa := a.(*activePartner)
......@@ -208,15 +224,17 @@ func partnerCompare(a, b pq.Elem) bool {
}
// StartTask signals that a task was started for this partner
func (p *activePartner) StartTask() {
func (p *activePartner) StartTask(k u.Key) {
p.activelk.Lock()
p.activeBlocks[k] = struct{}{}
p.active++
p.activelk.Unlock()
}
// TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone() {
func (p *activePartner) TaskDone(k u.Key) {
p.activelk.Lock()
delete(p.activeBlocks, k)
p.active--
if p.active < 0 {
panic("more tasks finished than started!")
......
......@@ -6,15 +6,19 @@ import (
)
type Stat struct {
ProvideBufLen int
Wantlist []u.Key
Peers []string
ProvideBufLen int
Wantlist []u.Key
Peers []string
BlocksReceived int
DupBlksReceived int
}
func (bs *Bitswap) Stat() (*Stat, error) {
st := new(Stat)
st.ProvideBufLen = len(bs.newBlocks)
st.Wantlist = bs.GetWantlist()
st.BlocksReceived = bs.blocksRecvd
st.DupBlksReceived = bs.dupBlocksRecvd
for _, p := range bs.engine.Peers() {
st.Peers = append(st.Peers, p.Pretty())
......
......@@ -11,7 +11,7 @@ import (
u "github.com/ipfs/go-ipfs/util"
)
var TaskWorkerCount = 16
var TaskWorkerCount = 8
func init() {
twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment