Commit d6092eb5 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #3182 from ipfs/fix/bitswap/want-cancel

Remove entries from wantlists when their related requests are cancelled
parents 1f387af2 1548c8aa
......@@ -22,7 +22,6 @@ import (
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"
loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
......@@ -88,7 +87,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize),
......@@ -131,7 +130,7 @@ type Bitswap struct {
notifications notifications.PubSub
// send keys to a worker to find and connect to providers for them
findKeys chan *wantlist.Entry
findKeys chan *blockRequest
engine *decision.Engine
......@@ -148,8 +147,8 @@ type Bitswap struct {
}
type blockRequest struct {
key key.Key
ctx context.Context
Key key.Key
Ctx context.Context
}
// GetBlock attempts to retrieve a particular block from peers within the
......@@ -235,13 +234,50 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
req := &wantlist.Entry{
req := &blockRequest{
Key: keys[0],
Ctx: ctx,
}
remaining := make(map[key.Key]struct{})
for _, k := range keys {
remaining[k] = struct{}{}
}
out := make(chan blocks.Block)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(out)
defer func() {
var toCancel []key.Key
for k, _ := range remaining {
toCancel = append(toCancel, k)
}
bs.CancelWants(toCancel)
}()
for {
select {
case blk, ok := <-promise:
if !ok {
return
}
delete(remaining, blk.Key())
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
select {
case bs.findKeys <- req:
return promise, nil
return out, nil
case <-ctx.Done():
return nil, ctx.Err()
}
......
......@@ -334,7 +334,6 @@ func TestDoubleGet(t *testing.T) {
blocks := bg.Blocks(1)
ctx1, cancel1 := context.WithCancel(context.Background())
blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
if err != nil {
t.Fatal(err)
......@@ -362,11 +361,15 @@ func TestDoubleGet(t *testing.T) {
t.Fatal(err)
}
blk, ok := <-blkch2
if !ok {
t.Fatal("expected to get the block here")
select {
case blk, ok := <-blkch2:
if !ok {
t.Fatal("expected to get the block here")
}
t.Log(blk)
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting on block")
}
t.Log(blk)
for _, inst := range instances {
err := inst.Exchange.Close()
......@@ -375,3 +378,68 @@ func TestDoubleGet(t *testing.T) {
}
}
}
func TestWantlistCleanup(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
instances := sg.Instances(1)[0]
bswap := instances.Exchange
blocks := bg.Blocks(20)
var keys []key.Key
for _, b := range blocks {
keys = append(keys, b.Key())
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := bswap.GetBlock(ctx, keys[0])
if err != context.DeadlineExceeded {
t.Fatal("shouldnt have fetched any blocks")
}
time.Sleep(time.Millisecond * 50)
if len(bswap.GetWantlist()) > 0 {
t.Fatal("should not have anyting in wantlist")
}
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err = bswap.GetBlocks(ctx, keys[:10])
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
time.Sleep(time.Millisecond * 50)
if len(bswap.GetWantlist()) > 0 {
t.Fatal("should not have anyting in wantlist")
}
_, err = bswap.GetBlocks(context.Background(), keys[:1])
if err != nil {
t.Fatal(err)
}
ctx, cancel = context.WithCancel(context.Background())
_, err = bswap.GetBlocks(ctx, keys[10:])
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
if len(bswap.GetWantlist()) != 11 {
t.Fatal("should have 11 keys in wantlist")
}
cancel()
time.Sleep(time.Millisecond * 50)
if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
t.Fatal("should only have keys[0] in wantlist")
}
}
......@@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Push(wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
q.Push(&wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
}
}
......@@ -104,7 +104,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
return e
}
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) {
e.lock.Lock()
partner, ok := e.ledgerMap[p]
if ok {
......@@ -218,7 +218,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("cancel %s", entry.Key)
log.Debugf("%s cancel %s", p, entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
......
......@@ -79,7 +79,7 @@ func (l *ledger) CancelWant(k key.Key) {
l.wantList.Remove(k)
}
func (l *ledger) WantListContains(k key.Key) (wl.Entry, bool) {
func (l *ledger) WantListContains(k key.Key) (*wl.Entry, bool) {
return l.wantList.Contains(k)
}
......
......@@ -13,7 +13,7 @@ import (
type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID)
Push(entry *wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID)
// NB: cannot expose simply expose taskQueue.Len because trashed elements
......@@ -45,7 +45,7 @@ type prq struct {
}
// Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
tl.lock.Lock()
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
......@@ -166,7 +166,7 @@ func (tl *prq) thawRound() {
}
type peerRequestTask struct {
Entry wantlist.Entry
Entry *wantlist.Entry
Target peer.ID
// A callback to signal that this task has been completed
......
......@@ -41,7 +41,7 @@ func TestPushPop(t *testing.T) {
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index]
t.Log(partner.String())
prq.Push(wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
prq.Push(&wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
}
for _, consonant := range consonants {
prq.Remove(key.Key(consonant), partner)
......@@ -78,10 +78,10 @@ func TestPeerRepeats(t *testing.T) {
// Have each push some blocks
for i := 0; i < 5; i++ {
prq.Push(wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(wantlist.Entry{Key: key.Key(i)}, d)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, d)
}
// now, pop off four entries, there should be one from each
......
......@@ -64,7 +64,7 @@ func newMsg(full bool) *impl {
}
type Entry struct {
wantlist.Entry
*wantlist.Entry
Cancel bool
}
......@@ -120,7 +120,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
e.Cancel = cancel
} else {
m.wantlist[k] = Entry{
Entry: wantlist.Entry{
Entry: &wantlist.Entry{
Key: k,
Priority: priority,
},
......
......@@ -7,8 +7,6 @@ import (
"sync"
key "github.com/ipfs/go-ipfs/blocks/key"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
type ThreadSafe struct {
......@@ -18,19 +16,17 @@ type ThreadSafe struct {
// not threadsafe
type Wantlist struct {
set map[key.Key]Entry
set map[key.Key]*Entry
}
type Entry struct {
Key key.Key
Priority int
Ctx context.Context
cancel func()
RefCnt int
}
type entrySlice []Entry
type entrySlice []*Entry
func (es entrySlice) Len() int { return len(es) }
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
......@@ -44,41 +40,41 @@ func NewThreadSafe() *ThreadSafe {
func New() *Wantlist {
return &Wantlist{
set: make(map[key.Key]Entry),
set: make(map[key.Key]*Entry),
}
}
func (w *ThreadSafe) Add(k key.Key, priority int) {
func (w *ThreadSafe) Add(k key.Key, priority int) bool {
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.Add(k, priority)
return w.Wantlist.Add(k, priority)
}
func (w *ThreadSafe) AddEntry(e Entry) {
func (w *ThreadSafe) AddEntry(e *Entry) bool {
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.AddEntry(e)
return w.Wantlist.AddEntry(e)
}
func (w *ThreadSafe) Remove(k key.Key) {
func (w *ThreadSafe) Remove(k key.Key) bool {
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.Remove(k)
return w.Wantlist.Remove(k)
}
func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) {
func (w *ThreadSafe) Contains(k key.Key) (*Entry, bool) {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Contains(k)
}
func (w *ThreadSafe) Entries() []Entry {
func (w *ThreadSafe) Entries() []*Entry {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Entries()
}
func (w *ThreadSafe) SortedEntries() []Entry {
func (w *ThreadSafe) SortedEntries() []*Entry {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.SortedEntries()
......@@ -94,50 +90,50 @@ func (w *Wantlist) Len() int {
return len(w.set)
}
func (w *Wantlist) Add(k key.Key, priority int) {
func (w *Wantlist) Add(k key.Key, priority int) bool {
if e, ok := w.set[k]; ok {
e.RefCnt++
return
return false
}
ctx, cancel := context.WithCancel(context.Background())
w.set[k] = Entry{
w.set[k] = &Entry{
Key: k,
Priority: priority,
Ctx: ctx,
cancel: cancel,
RefCnt: 1,
}
return true
}
func (w *Wantlist) AddEntry(e Entry) {
if _, ok := w.set[e.Key]; ok {
return
func (w *Wantlist) AddEntry(e *Entry) bool {
if ex, ok := w.set[e.Key]; ok {
ex.RefCnt++
return false
}
w.set[e.Key] = e
return true
}
func (w *Wantlist) Remove(k key.Key) {
func (w *Wantlist) Remove(k key.Key) bool {
e, ok := w.set[k]
if !ok {
return
return false
}
e.RefCnt--
if e.RefCnt <= 0 {
delete(w.set, k)
if e.cancel != nil {
e.cancel()
}
return true
}
return false
}
func (w *Wantlist) Contains(k key.Key) (Entry, bool) {
func (w *Wantlist) Contains(k key.Key) (*Entry, bool) {
e, ok := w.set[k]
return e, ok
}
func (w *Wantlist) Entries() []Entry {
func (w *Wantlist) Entries() []*Entry {
var es entrySlice
for _, e := range w.set {
es = append(es, e)
......@@ -145,7 +141,7 @@ func (w *Wantlist) Entries() []Entry {
return es
}
func (w *Wantlist) SortedEntries() []Entry {
func (w *Wantlist) SortedEntries() []*Entry {
var es entrySlice
for _, e := range w.set {
es = append(es, e)
......
......@@ -75,6 +75,7 @@ func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
}
func (pm *WantManager) CancelWants(ks []key.Key) {
log.Infof("cancel wants: %s", ks)
pm.addEntries(context.TODO(), ks, true)
}
......@@ -83,16 +84,17 @@ func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: wantlist.Entry{
Entry: &wantlist.Entry{
Key: k,
Priority: kMaxPriority - i,
Ctx: ctx,
RefCnt: 1,
},
})
}
select {
case pm.incoming <- entries:
case <-pm.ctx.Done():
case <-ctx.Done():
}
}
......@@ -241,33 +243,31 @@ func (pm *WantManager) Run() {
case entries := <-pm.incoming:
// add changes to our wantlist
var filtered []*bsmsg.Entry
for _, e := range entries {
if e.Cancel {
pm.wl.Remove(e.Key)
if pm.wl.Remove(e.Key) {
filtered = append(filtered, e)
}
} else {
pm.wl.AddEntry(e.Entry)
if pm.wl.AddEntry(e.Entry) {
filtered = append(filtered, e)
}
}
}
// broadcast those wantlist changes
for _, p := range pm.peers {
p.addMessage(entries)
p.addMessage(filtered)
}
case <-tock.C:
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() {
select {
case <-e.Ctx.Done():
// entry has been cancelled
// simply continue, the entry will be removed from the
// wantlist soon enough
continue
default:
}
es = append(es, &bsmsg.Entry{Entry: e})
}
for _, p := range pm.peers {
p.outlk.Lock()
p.out = bsmsg.New(true)
......
......@@ -9,7 +9,6 @@ import (
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
)
......@@ -172,10 +171,19 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
}
case <-broadcastSignal.C: // resend unfulfilled wantlist keys
log.Event(ctx, "Bitswap.Rebroadcast.active")
entries := bs.wm.wl.Entries()
if len(entries) == 0 {
continue
}
tctx, cancel := context.WithTimeout(ctx, providerRequestTimeout)
for _, e := range bs.wm.wl.Entries() {
e := e
bs.findKeys <- &e
bs.findKeys <- &blockRequest{
Key: e.Key,
Ctx: tctx,
}
}
cancel()
case <-parent.Done():
return
}
......@@ -184,20 +192,20 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
func (bs *Bitswap) providerQueryManager(ctx context.Context) {
var activeLk sync.Mutex
active := make(map[key.Key]*wantlist.Entry)
kset := key.NewKeySet()
for {
select {
case e := <-bs.findKeys:
activeLk.Lock()
if _, ok := active[e.Key]; ok {
if kset.Has(e.Key) {
activeLk.Unlock()
continue
}
active[e.Key] = e
kset.Add(e.Key)
activeLk.Unlock()
go func(e *wantlist.Entry) {
go func(e *blockRequest) {
child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
defer cancel()
providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)
......@@ -210,7 +218,7 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) {
}(p)
}
activeLk.Lock()
delete(active, e.Key)
kset.Remove(e.Key)
activeLk.Unlock()
}(e)
......
......@@ -11,13 +11,6 @@ test_description="test bitswap commands"
test_init_ipfs
test_launch_ipfs_daemon
test_expect_success "'ipfs block get' adds hash to wantlist" '
export NONEXIST=QmeXxaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa &&
test_expect_code 1 ipfs block get $NONEXIST --timeout=10ms &&
ipfs bitswap wantlist >wantlist_out &&
grep $NONEXIST wantlist_out
'
test_expect_success "'ipfs bitswap stat' succeeds" '
ipfs bitswap stat >stat_out
'
......@@ -29,8 +22,7 @@ bitswap status
blocks received: 0
dup blocks received: 0
dup data received: 0 B
wantlist [1 keys]
$NONEXIST
wantlist [0 keys]
partners [0]
EOF
test_cmp expected stat_out
......@@ -45,12 +37,8 @@ test_expect_success "'ipfs bitswap wantlist -p' works" '
ipfs bitswap wantlist -p "$PEERID" >wantlist_p_out
'
test_expect_failure "'ipfs bitswap wantlist -p' output looks good" '
test_cmp wantlist_out wantlist_p_out
'
test_expect_success "'ipfs bitswap unwant' succeeds" '
ipfs bitswap unwant $NONEXIST
test_expect_success "'ipfs bitswap wantlist -p' output looks good" '
test_must_be_empty wantlist_p_out
'
test_expect_success "hash was removed from wantlist" '
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment