Commit a53c0055 authored by Jeromy's avatar Jeromy

bitswap: clear wantlists when GetBlocks calls are cancelled

License: MIT
Signed-off-by: default avatarJeromy <why@ipfs.io>
parent ff209853
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
flags "github.com/ipfs/go-ipfs/flags" flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay" "github.com/ipfs/go-ipfs/thirdparty/delay"
loggables "github.com/ipfs/go-ipfs/thirdparty/loggables" loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
...@@ -88,7 +87,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, ...@@ -88,7 +87,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif, notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network, network: network,
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan), findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan blocks.Block, HasBlockBufferSize), newBlocks: make(chan blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize), provideKeys: make(chan key.Key, provideKeysBufferSize),
...@@ -131,7 +130,7 @@ type Bitswap struct { ...@@ -131,7 +130,7 @@ type Bitswap struct {
notifications notifications.PubSub notifications notifications.PubSub
// send keys to a worker to find and connect to providers for them // send keys to a worker to find and connect to providers for them
findKeys chan *wantlist.Entry findKeys chan *blockRequest
engine *decision.Engine engine *decision.Engine
...@@ -148,8 +147,8 @@ type Bitswap struct { ...@@ -148,8 +147,8 @@ type Bitswap struct {
} }
type blockRequest struct { type blockRequest struct {
key key.Key Key key.Key
ctx context.Context Ctx context.Context
} }
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
...@@ -235,13 +234,50 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks ...@@ -235,13 +234,50 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
// NB: Optimization. Assumes that providers of key[0] are likely to // NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most // be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true. // every situation. Later, this assumption may not hold as true.
req := &wantlist.Entry{ req := &blockRequest{
Key: keys[0], Key: keys[0],
Ctx: ctx, Ctx: ctx,
} }
remaining := make(map[key.Key]struct{})
for _, k := range keys {
remaining[k] = struct{}{}
}
out := make(chan blocks.Block)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(out)
defer func() {
var toCancel []key.Key
for k, _ := range remaining {
toCancel = append(toCancel, k)
}
bs.CancelWants(toCancel)
}()
for {
select {
case blk, ok := <-promise:
if !ok {
return
}
delete(remaining, blk.Key())
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
select { select {
case bs.findKeys <- req: case bs.findKeys <- req:
return promise, nil return out, nil
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
} }
......
...@@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) { ...@@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
} }
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
q.Push(wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)]) q.Push(&wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
} }
} }
...@@ -104,7 +104,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ...@@ -104,7 +104,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
return e return e
} }
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) { func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) {
e.lock.Lock() e.lock.Lock()
partner, ok := e.ledgerMap[p] partner, ok := e.ledgerMap[p]
if ok { if ok {
...@@ -218,7 +218,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { ...@@ -218,7 +218,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() { for _, entry := range m.Wantlist() {
if entry.Cancel { if entry.Cancel {
log.Debugf("cancel %s", entry.Key) log.Debugf("%s cancel %s", p, entry.Key)
l.CancelWant(entry.Key) l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p) e.peerRequestQueue.Remove(entry.Key, p)
} else { } else {
......
...@@ -79,7 +79,7 @@ func (l *ledger) CancelWant(k key.Key) { ...@@ -79,7 +79,7 @@ func (l *ledger) CancelWant(k key.Key) {
l.wantList.Remove(k) l.wantList.Remove(k)
} }
func (l *ledger) WantListContains(k key.Key) (wl.Entry, bool) { func (l *ledger) WantListContains(k key.Key) (*wl.Entry, bool) {
return l.wantList.Contains(k) return l.wantList.Contains(k)
} }
......
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
type peerRequestQueue interface { type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty. // Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID) Push(entry *wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID) Remove(k key.Key, p peer.ID)
// NB: cannot expose simply expose taskQueue.Len because trashed elements // NB: cannot expose simply expose taskQueue.Len because trashed elements
...@@ -45,7 +45,7 @@ type prq struct { ...@@ -45,7 +45,7 @@ type prq struct {
} }
// Push currently adds a new peerRequestTask to the end of the list // Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
tl.lock.Lock() tl.lock.Lock()
defer tl.lock.Unlock() defer tl.lock.Unlock()
partner, ok := tl.partners[to] partner, ok := tl.partners[to]
...@@ -166,7 +166,7 @@ func (tl *prq) thawRound() { ...@@ -166,7 +166,7 @@ func (tl *prq) thawRound() {
} }
type peerRequestTask struct { type peerRequestTask struct {
Entry wantlist.Entry Entry *wantlist.Entry
Target peer.ID Target peer.ID
// A callback to signal that this task has been completed // A callback to signal that this task has been completed
......
...@@ -41,7 +41,7 @@ func TestPushPop(t *testing.T) { ...@@ -41,7 +41,7 @@ func TestPushPop(t *testing.T) {
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index] letter := alphabet[index]
t.Log(partner.String()) t.Log(partner.String())
prq.Push(wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner) prq.Push(&wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
} }
for _, consonant := range consonants { for _, consonant := range consonants {
prq.Remove(key.Key(consonant), partner) prq.Remove(key.Key(consonant), partner)
...@@ -78,10 +78,10 @@ func TestPeerRepeats(t *testing.T) { ...@@ -78,10 +78,10 @@ func TestPeerRepeats(t *testing.T) {
// Have each push some blocks // Have each push some blocks
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
prq.Push(wantlist.Entry{Key: key.Key(i)}, a) prq.Push(&wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(wantlist.Entry{Key: key.Key(i)}, b) prq.Push(&wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(wantlist.Entry{Key: key.Key(i)}, c) prq.Push(&wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(wantlist.Entry{Key: key.Key(i)}, d) prq.Push(&wantlist.Entry{Key: key.Key(i)}, d)
} }
// now, pop off four entries, there should be one from each // now, pop off four entries, there should be one from each
......
...@@ -64,7 +64,7 @@ func newMsg(full bool) *impl { ...@@ -64,7 +64,7 @@ func newMsg(full bool) *impl {
} }
type Entry struct { type Entry struct {
wantlist.Entry *wantlist.Entry
Cancel bool Cancel bool
} }
...@@ -120,7 +120,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) { ...@@ -120,7 +120,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
e.Cancel = cancel e.Cancel = cancel
} else { } else {
m.wantlist[k] = Entry{ m.wantlist[k] = Entry{
Entry: wantlist.Entry{ Entry: &wantlist.Entry{
Key: k, Key: k,
Priority: priority, Priority: priority,
}, },
......
...@@ -7,8 +7,6 @@ import ( ...@@ -7,8 +7,6 @@ import (
"sync" "sync"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )
type ThreadSafe struct { type ThreadSafe struct {
...@@ -18,19 +16,17 @@ type ThreadSafe struct { ...@@ -18,19 +16,17 @@ type ThreadSafe struct {
// not threadsafe // not threadsafe
type Wantlist struct { type Wantlist struct {
set map[key.Key]Entry set map[key.Key]*Entry
} }
type Entry struct { type Entry struct {
Key key.Key Key key.Key
Priority int Priority int
Ctx context.Context
cancel func()
RefCnt int RefCnt int
} }
type entrySlice []Entry type entrySlice []*Entry
func (es entrySlice) Len() int { return len(es) } func (es entrySlice) Len() int { return len(es) }
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
...@@ -44,41 +40,41 @@ func NewThreadSafe() *ThreadSafe { ...@@ -44,41 +40,41 @@ func NewThreadSafe() *ThreadSafe {
func New() *Wantlist { func New() *Wantlist {
return &Wantlist{ return &Wantlist{
set: make(map[key.Key]Entry), set: make(map[key.Key]*Entry),
} }
} }
func (w *ThreadSafe) Add(k key.Key, priority int) { func (w *ThreadSafe) Add(k key.Key, priority int) bool {
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
w.Wantlist.Add(k, priority) return w.Wantlist.Add(k, priority)
} }
func (w *ThreadSafe) AddEntry(e Entry) { func (w *ThreadSafe) AddEntry(e *Entry) bool {
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
w.Wantlist.AddEntry(e) return w.Wantlist.AddEntry(e)
} }
func (w *ThreadSafe) Remove(k key.Key) { func (w *ThreadSafe) Remove(k key.Key) bool {
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
w.Wantlist.Remove(k) return w.Wantlist.Remove(k)
} }
func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) { func (w *ThreadSafe) Contains(k key.Key) (*Entry, bool) {
w.lk.RLock() w.lk.RLock()
defer w.lk.RUnlock() defer w.lk.RUnlock()
return w.Wantlist.Contains(k) return w.Wantlist.Contains(k)
} }
func (w *ThreadSafe) Entries() []Entry { func (w *ThreadSafe) Entries() []*Entry {
w.lk.RLock() w.lk.RLock()
defer w.lk.RUnlock() defer w.lk.RUnlock()
return w.Wantlist.Entries() return w.Wantlist.Entries()
} }
func (w *ThreadSafe) SortedEntries() []Entry { func (w *ThreadSafe) SortedEntries() []*Entry {
w.lk.RLock() w.lk.RLock()
defer w.lk.RUnlock() defer w.lk.RUnlock()
return w.Wantlist.SortedEntries() return w.Wantlist.SortedEntries()
...@@ -94,50 +90,50 @@ func (w *Wantlist) Len() int { ...@@ -94,50 +90,50 @@ func (w *Wantlist) Len() int {
return len(w.set) return len(w.set)
} }
func (w *Wantlist) Add(k key.Key, priority int) { func (w *Wantlist) Add(k key.Key, priority int) bool {
if e, ok := w.set[k]; ok { if e, ok := w.set[k]; ok {
e.RefCnt++ e.RefCnt++
return return false
} }
ctx, cancel := context.WithCancel(context.Background()) w.set[k] = &Entry{
w.set[k] = Entry{
Key: k, Key: k,
Priority: priority, Priority: priority,
Ctx: ctx,
cancel: cancel,
RefCnt: 1, RefCnt: 1,
} }
return true
} }
func (w *Wantlist) AddEntry(e Entry) { func (w *Wantlist) AddEntry(e *Entry) bool {
if _, ok := w.set[e.Key]; ok { if ex, ok := w.set[e.Key]; ok {
return ex.RefCnt++
return false
} }
w.set[e.Key] = e w.set[e.Key] = e
return true
} }
func (w *Wantlist) Remove(k key.Key) { func (w *Wantlist) Remove(k key.Key) bool {
e, ok := w.set[k] e, ok := w.set[k]
if !ok { if !ok {
return return false
} }
e.RefCnt-- e.RefCnt--
if e.RefCnt <= 0 { if e.RefCnt <= 0 {
delete(w.set, k) delete(w.set, k)
if e.cancel != nil { return true
e.cancel()
}
} }
return false
} }
func (w *Wantlist) Contains(k key.Key) (Entry, bool) { func (w *Wantlist) Contains(k key.Key) (*Entry, bool) {
e, ok := w.set[k] e, ok := w.set[k]
return e, ok return e, ok
} }
func (w *Wantlist) Entries() []Entry { func (w *Wantlist) Entries() []*Entry {
var es entrySlice var es entrySlice
for _, e := range w.set { for _, e := range w.set {
es = append(es, e) es = append(es, e)
...@@ -145,7 +141,7 @@ func (w *Wantlist) Entries() []Entry { ...@@ -145,7 +141,7 @@ func (w *Wantlist) Entries() []Entry {
return es return es
} }
func (w *Wantlist) SortedEntries() []Entry { func (w *Wantlist) SortedEntries() []*Entry {
var es entrySlice var es entrySlice
for _, e := range w.set { for _, e := range w.set {
es = append(es, e) es = append(es, e)
......
...@@ -75,6 +75,7 @@ func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) { ...@@ -75,6 +75,7 @@ func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
} }
func (pm *WantManager) CancelWants(ks []key.Key) { func (pm *WantManager) CancelWants(ks []key.Key) {
log.Infof("cancel wants: %s", ks)
pm.addEntries(context.TODO(), ks, true) pm.addEntries(context.TODO(), ks, true)
} }
...@@ -83,16 +84,17 @@ func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool ...@@ -83,16 +84,17 @@ func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool
for i, k := range ks { for i, k := range ks {
entries = append(entries, &bsmsg.Entry{ entries = append(entries, &bsmsg.Entry{
Cancel: cancel, Cancel: cancel,
Entry: wantlist.Entry{ Entry: &wantlist.Entry{
Key: k, Key: k,
Priority: kMaxPriority - i, Priority: kMaxPriority - i,
Ctx: ctx, RefCnt: 1,
}, },
}) })
} }
select { select {
case pm.incoming <- entries: case pm.incoming <- entries:
case <-pm.ctx.Done(): case <-pm.ctx.Done():
case <-ctx.Done():
} }
} }
...@@ -241,33 +243,31 @@ func (pm *WantManager) Run() { ...@@ -241,33 +243,31 @@ func (pm *WantManager) Run() {
case entries := <-pm.incoming: case entries := <-pm.incoming:
// add changes to our wantlist // add changes to our wantlist
var filtered []*bsmsg.Entry
for _, e := range entries { for _, e := range entries {
if e.Cancel { if e.Cancel {
pm.wl.Remove(e.Key) if pm.wl.Remove(e.Key) {
filtered = append(filtered, e)
}
} else { } else {
pm.wl.AddEntry(e.Entry) if pm.wl.AddEntry(e.Entry) {
filtered = append(filtered, e)
}
} }
} }
// broadcast those wantlist changes // broadcast those wantlist changes
for _, p := range pm.peers { for _, p := range pm.peers {
p.addMessage(entries) p.addMessage(filtered)
} }
case <-tock.C: case <-tock.C:
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() { for _, e := range pm.wl.Entries() {
select {
case <-e.Ctx.Done():
// entry has been cancelled
// simply continue, the entry will be removed from the
// wantlist soon enough
continue
default:
}
es = append(es, &bsmsg.Entry{Entry: e}) es = append(es, &bsmsg.Entry{Entry: e})
} }
for _, p := range pm.peers { for _, p := range pm.peers {
p.outlk.Lock() p.outlk.Lock()
p.out = bsmsg.New(true) p.out = bsmsg.New(true)
......
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log" logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer" peer "gx/ipfs/QmRBqJF7hb8ZSpRcMwUt8hNhydWcxGEhtk81HKq6oUwKvs/go-libp2p-peer"
) )
...@@ -172,10 +171,19 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ...@@ -172,10 +171,19 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
} }
case <-broadcastSignal.C: // resend unfulfilled wantlist keys case <-broadcastSignal.C: // resend unfulfilled wantlist keys
log.Event(ctx, "Bitswap.Rebroadcast.active") log.Event(ctx, "Bitswap.Rebroadcast.active")
entries := bs.wm.wl.Entries()
if len(entries) == 0 {
continue
}
tctx, cancel := context.WithTimeout(ctx, providerRequestTimeout)
for _, e := range bs.wm.wl.Entries() { for _, e := range bs.wm.wl.Entries() {
e := e e := e
bs.findKeys <- &e bs.findKeys <- &blockRequest{
Key: e.Key,
Ctx: tctx,
}
} }
cancel()
case <-parent.Done(): case <-parent.Done():
return return
} }
...@@ -184,20 +192,20 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ...@@ -184,20 +192,20 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
func (bs *Bitswap) providerQueryManager(ctx context.Context) { func (bs *Bitswap) providerQueryManager(ctx context.Context) {
var activeLk sync.Mutex var activeLk sync.Mutex
active := make(map[key.Key]*wantlist.Entry) kset := key.NewKeySet()
for { for {
select { select {
case e := <-bs.findKeys: case e := <-bs.findKeys:
activeLk.Lock() activeLk.Lock()
if _, ok := active[e.Key]; ok { if kset.Has(e.Key) {
activeLk.Unlock() activeLk.Unlock()
continue continue
} }
active[e.Key] = e kset.Add(e.Key)
activeLk.Unlock() activeLk.Unlock()
go func(e *wantlist.Entry) { go func(e *blockRequest) {
child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout) child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
defer cancel() defer cancel()
providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest) providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)
...@@ -210,7 +218,7 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) { ...@@ -210,7 +218,7 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) {
}(p) }(p)
} }
activeLk.Lock() activeLk.Lock()
delete(active, e.Key) kset.Remove(e.Key)
activeLk.Unlock() activeLk.Unlock()
}(e) }(e)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment