Commit 9ec351de authored by Jeromy's avatar Jeromy

rework how refcounted wantlists work

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 3538a30e
......@@ -169,6 +169,9 @@ type Bitswap struct {
// Sessions
sessions []*Session
sessLk sync.Mutex
sessID uint64
sessIDLk sync.Mutex
}
type blockRequest struct {
......@@ -219,7 +222,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}
bs.wm.WantBlocks(ctx, keys, nil)
mses := bs.getNextSessionID()
bs.wm.WantBlocks(ctx, keys, nil, mses)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
......@@ -241,7 +246,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
defer close(out)
defer func() {
// can't just defer this call on its own, arguments are resolved *when* the defer is created
bs.CancelWants(remaining.Keys())
bs.CancelWants(remaining.Keys(), mses)
}()
for {
select {
......@@ -250,6 +255,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
return
}
bs.CancelWants([]*cid.Cid{blk.Cid()}, mses)
remaining.Remove(blk.Cid())
select {
case out <- blk:
......@@ -270,9 +276,16 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block
}
}
func (bs *Bitswap) getNextSessionID() uint64 {
bs.sessIDLk.Lock()
defer bs.sessIDLk.Unlock()
bs.sessID++
return bs.sessID
}
// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []*cid.Cid) {
bs.wm.CancelWants(context.Background(), cids, nil)
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) {
bs.wm.CancelWants(context.Background(), cids, nil, ses)
}
// HasBlock announces the existance of a block to this bitswap service. The
......@@ -314,7 +327,7 @@ func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session {
var out []*Session
for _, s := range bs.sessions {
if s.InterestedIn(c) {
if s.interestedIn(c) {
out = append(out, s)
}
}
......@@ -346,8 +359,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
keys = append(keys, block.Cid())
}
bs.wm.CancelWants(context.Background(), keys, nil)
wg := sync.WaitGroup{}
for _, block := range iblocks {
wg.Add(1)
......@@ -360,7 +371,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
log.Event(ctx, "Bitswap.GetBlockRequest.End", k)
for _, ses := range bs.SessionsForBlock(k) {
ses.ReceiveBlock(p, b)
ses.receiveBlockFrom(p, b)
bs.CancelWants([]*cid.Cid{k}, ses.id)
}
log.Debugf("got block %s from %s", b, p)
if err := bs.HasBlock(b); err != nil {
......
......@@ -332,6 +332,11 @@ func TestBasicBitswap(t *testing.T) {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 20)
if len(instances[1].Exchange.GetWantlist()) != 0 {
t.Fatal("shouldnt have anything in wantlist")
}
st0, err := instances[0].Exchange.Stat()
if err != nil {
t.Fatal(err)
......
......@@ -16,6 +16,9 @@ import (
const activeWantsLimit = 16
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from
type Session struct {
ctx context.Context
tofetch []*cid.Cid
......@@ -40,8 +43,12 @@ type Session struct {
notif notifications.PubSub
uuid logging.Loggable
id uint64
}
// NewSession creates a new bitswap session whose lifetime is bounded by the
// given context
func (bs *Bitswap) NewSession(ctx context.Context) *Session {
s := &Session{
activePeers: make(map[peer.ID]struct{}),
......@@ -54,6 +61,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session {
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: bs.getNextSessionID(),
}
cache, _ := lru.New(2048)
......@@ -73,11 +81,11 @@ type blkRecv struct {
blk blocks.Block
}
func (s *Session) ReceiveBlock(from peer.ID, blk blocks.Block) {
func (s *Session) receiveBlockFrom(from peer.ID, blk blocks.Block) {
s.incoming <- blkRecv{from: from, blk: blk}
}
func (s *Session) InterestedIn(c *cid.Cid) bool {
func (s *Session) interestedIn(c *cid.Cid) bool {
return s.interest.Contains(c.KeyString())
}
......@@ -134,14 +142,14 @@ func (s *Session) run(ctx context.Context) {
case <-s.tick.C:
var live []*cid.Cid
for c, _ := range s.liveWants {
for c := range s.liveWants {
cs, _ := cid.Cast([]byte(c))
live = append(live, cs)
s.liveWants[c] = time.Now()
}
// Broadcast these keys to everyone we're connected to
s.bs.wm.WantBlocks(ctx, live, nil)
s.bs.wm.WantBlocks(ctx, live, nil, s.id)
if len(live) > 0 {
go func() {
......@@ -181,7 +189,7 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
for _, c := range ks {
s.liveWants[c.KeyString()] = time.Now()
}
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr)
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
}
func (s *Session) cancel(keys []*cid.Cid) {
......@@ -211,11 +219,15 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) {
}
}
// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid)
return getBlocksImpl(ctx, keys, s.notif, s.fetch, s.cancelWants)
}
// GetBlock fetches a single block
func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, s.GetBlocks)
}
......@@ -10,8 +10,8 @@ import (
)
type ThreadSafe struct {
lk sync.RWMutex
Wantlist Wantlist
lk sync.RWMutex
set map[string]*Entry
}
// not threadsafe
......@@ -23,7 +23,16 @@ type Entry struct {
Cid *cid.Cid
Priority int
RefCnt int
SesTrk map[uint64]struct{}
}
// NewRefEntry creates a new reference tracked wantlist entry
func NewRefEntry(c *cid.Cid, p int) *Entry {
return &Entry{
Cid: c,
Priority: p,
SesTrk: make(map[uint64]struct{}),
}
}
type entrySlice []*Entry
......@@ -34,7 +43,7 @@ func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priorit
func NewThreadSafe() *ThreadSafe {
return &ThreadSafe{
Wantlist: *New(),
set: make(map[string]*Entry),
}
}
......@@ -44,46 +53,86 @@ func New() *Wantlist {
}
}
func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool {
func (w *ThreadSafe) Add(c *cid.Cid, priority int, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Add(k, priority)
k := c.KeyString()
if e, ok := w.set[k]; ok {
e.SesTrk[ses] = struct{}{}
return false
}
w.set[k] = &Entry{
Cid: c,
Priority: priority,
SesTrk: map[uint64]struct{}{ses: struct{}{}},
}
return true
}
func (w *ThreadSafe) AddEntry(e *Entry) bool {
func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.AddEntry(e)
k := e.Cid.KeyString()
if ex, ok := w.set[k]; ok {
ex.SesTrk[ses] = struct{}{}
return false
}
w.set[k] = e
e.SesTrk[ses] = struct{}{}
return true
}
func (w *ThreadSafe) Remove(k *cid.Cid) bool {
func (w *ThreadSafe) Remove(c *cid.Cid, ses uint64) bool {
w.lk.Lock()
defer w.lk.Unlock()
return w.Wantlist.Remove(k)
k := c.KeyString()
e, ok := w.set[k]
if !ok {
return false
}
delete(e.SesTrk, ses)
if len(e.SesTrk) == 0 {
delete(w.set, k)
return true
}
return false
}
func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Contains(k)
e, ok := w.set[k.KeyString()]
return e, ok
}
func (w *ThreadSafe) Entries() []*Entry {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Entries()
var es entrySlice
for _, e := range w.set {
es = append(es, e)
}
return es
}
func (w *ThreadSafe) SortedEntries() []*Entry {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.SortedEntries()
var es entrySlice
for _, e := range w.set {
es = append(es, e)
}
sort.Sort(es)
return es
}
func (w *ThreadSafe) Len() int {
w.lk.RLock()
defer w.lk.RUnlock()
return w.Wantlist.Len()
return len(w.set)
}
func (w *Wantlist) Len() int {
......@@ -92,15 +141,13 @@ func (w *Wantlist) Len() int {
func (w *Wantlist) Add(c *cid.Cid, priority int) bool {
k := c.KeyString()
if e, ok := w.set[k]; ok {
e.RefCnt++
if _, ok := w.set[k]; ok {
return false
}
w.set[k] = &Entry{
Cid: c,
Priority: priority,
RefCnt: 1,
}
return true
......@@ -108,8 +155,7 @@ func (w *Wantlist) Add(c *cid.Cid, priority int) bool {
func (w *Wantlist) AddEntry(e *Entry) bool {
k := e.Cid.KeyString()
if ex, ok := w.set[k]; ok {
ex.RefCnt++
if _, ok := w.set[k]; ok {
return false
}
w.set[k] = e
......@@ -118,16 +164,12 @@ func (w *Wantlist) AddEntry(e *Entry) bool {
func (w *Wantlist) Remove(c *cid.Cid) bool {
k := c.KeyString()
e, ok := w.set[k]
_, ok := w.set[k]
if !ok {
return false
}
e.RefCnt--
if e.RefCnt <= 0 {
delete(w.set, k)
return true
}
delete(w.set, k)
return false
}
......
package wantlist
import (
"testing"
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
)
var testcids []*cid.Cid
func init() {
strs := []string{
"QmQL8LqkEgYXaDHdNYCG2mmpow7Sp8Z8Kt3QS688vyBeC7",
"QmcBDsdjgSXU7BP4A4V8LJCXENE5xVwnhrhRGVTJr9YCVj",
"QmQakgd2wDxc3uUF4orGdEm28zUT9Mmimp5pyPG2SFS9Gj",
}
for _, s := range strs {
c, err := cid.Decode(s)
if err != nil {
panic(err)
}
testcids = append(testcids, c)
}
}
type wli interface {
Contains(*cid.Cid) (*Entry, bool)
}
func assertHasCid(t *testing.T, w wli, c *cid.Cid) {
e, ok := w.Contains(c)
if !ok {
t.Fatal("expected to have ", c)
}
if !e.Cid.Equals(c) {
t.Fatal("returned entry had wrong cid value")
}
}
func assertNotHasCid(t *testing.T, w wli, c *cid.Cid) {
_, ok := w.Contains(c)
if ok {
t.Fatal("expected not to have ", c)
}
}
func TestBasicWantlist(t *testing.T) {
wl := New()
wl.Add(testcids[0], 5)
assertHasCid(t, wl, testcids[0])
wl.Add(testcids[1], 4)
assertHasCid(t, wl, testcids[0])
assertHasCid(t, wl, testcids[1])
if wl.Len() != 2 {
t.Fatal("should have had two items")
}
wl.Add(testcids[1], 4)
assertHasCid(t, wl, testcids[0])
assertHasCid(t, wl, testcids[1])
if wl.Len() != 2 {
t.Fatal("should have had two items")
}
wl.Remove(testcids[0])
assertHasCid(t, wl, testcids[1])
if _, has := wl.Contains(testcids[0]); has {
t.Fatal("shouldnt have this cid")
}
}
func TestSesRefWantlist(t *testing.T) {
wl := NewThreadSafe()
wl.Add(testcids[0], 5, 1)
assertHasCid(t, wl, testcids[0])
wl.Remove(testcids[0], 2)
assertHasCid(t, wl, testcids[0])
wl.Add(testcids[0], 5, 1)
assertHasCid(t, wl, testcids[0])
wl.Remove(testcids[0], 1)
assertNotHasCid(t, wl, testcids[0])
}
......@@ -71,34 +71,31 @@ type msgQueue struct {
done chan struct{}
}
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID) {
func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
log.Infof("want blocks: %s", ks)
pm.addEntries(ctx, ks, peers, false)
pm.addEntries(ctx, ks, peers, false, ses)
}
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID) {
pm.addEntries(context.Background(), ks, peers, true)
func (pm *WantManager) CancelWants(ctx context.Context, ks []*cid.Cid, peers []peer.ID, ses uint64) {
pm.addEntries(context.Background(), ks, peers, true, ses)
}
type wantSet struct {
entries []*bsmsg.Entry
targets []peer.ID
from uint64
}
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool) {
func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, targets []peer.ID, cancel bool, ses uint64) {
var entries []*bsmsg.Entry
for i, k := range ks {
entries = append(entries, &bsmsg.Entry{
Cancel: cancel,
Entry: &wantlist.Entry{
Cid: k,
Priority: kMaxPriority - i,
RefCnt: 1,
},
Entry: wantlist.NewRefEntry(k, kMaxPriority-i),
})
}
select {
case pm.incoming <- &wantSet{entries: entries, targets: targets}:
case pm.incoming <- &wantSet{entries: entries, targets: targets, from: ses}:
case <-pm.ctx.Done():
case <-ctx.Done():
}
......@@ -290,11 +287,11 @@ func (pm *WantManager) Run() {
// add changes to our wantlist
for _, e := range ws.entries {
if e.Cancel {
if pm.wl.Remove(e.Cid) {
if pm.wl.Remove(e.Cid, ws.from) {
pm.wantlistGauge.Dec()
}
} else {
if pm.wl.AddEntry(e.Entry) {
if pm.wl.AddEntry(e.Entry, ws.from) {
pm.wantlistGauge.Inc()
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment