Commit ee8af715 authored by Jeromy's avatar Jeromy

fix issue with sessions not receiving locally added blocks

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent c8f38296
...@@ -317,6 +317,10 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { ...@@ -317,6 +317,10 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
// it now as it requires more thought and isnt causing immediate problems. // it now as it requires more thought and isnt causing immediate problems.
bs.notifications.Publish(blk) bs.notifications.Publish(blk)
for _, s := range bs.SessionsForBlock(blk.Cid()) {
s.receiveBlockFrom("", blk)
}
bs.engine.AddBlock(blk) bs.engine.AddBlock(blk)
select { select {
...@@ -370,7 +374,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -370,7 +374,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, block := range iblocks { for _, block := range iblocks {
wg.Add(1) wg.Add(1)
go func(b blocks.Block) { go func(b blocks.Block) { // TODO: this probably doesnt need to be a goroutine...
defer wg.Done() defer wg.Done()
bs.updateReceiveCounters(b) bs.updateReceiveCounters(b)
...@@ -382,7 +386,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -382,7 +386,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
ses.receiveBlockFrom(p, b) ses.receiveBlockFrom(p, b)
bs.CancelWants([]*cid.Cid{k}, ses.id) bs.CancelWants([]*cid.Cid{k}, ses.id)
} }
log.Debugf("got block %s from %s", b, p) log.Debugf("got block %s from %s", b, p)
// TODO: rework this to not call 'HasBlock'. 'HasBlock' is really
// designed to be called when blocks are coming in from non-bitswap
// places (like the user manually adding data)
if err := bs.HasBlock(b); err != nil { if err := bs.HasBlock(b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err) log.Warningf("ReceiveMessage HasBlock error: %s", err)
} }
......
...@@ -21,7 +21,7 @@ const activeWantsLimit = 16 ...@@ -21,7 +21,7 @@ const activeWantsLimit = 16
// info to, and who to request blocks from // info to, and who to request blocks from
type Session struct { type Session struct {
ctx context.Context ctx context.Context
tofetch []*cid.Cid tofetch *cidQueue
activePeers map[peer.ID]struct{} activePeers map[peer.ID]struct{}
activePeersArr []peer.ID activePeersArr []peer.ID
...@@ -55,6 +55,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session { ...@@ -55,6 +55,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session {
liveWants: make(map[string]time.Time), liveWants: make(map[string]time.Time),
newReqs: make(chan []*cid.Cid), newReqs: make(chan []*cid.Cid),
cancelKeys: make(chan []*cid.Cid), cancelKeys: make(chan []*cid.Cid),
tofetch: newCidQueue(),
interestReqs: make(chan interestReq), interestReqs: make(chan interestReq),
ctx: ctx, ctx: ctx,
bs: bs, bs: bs,
...@@ -157,7 +158,9 @@ func (s *Session) run(ctx context.Context) { ...@@ -157,7 +158,9 @@ func (s *Session) run(ctx context.Context) {
s.wantBlocks(ctx, now) s.wantBlocks(ctx, now)
} }
s.tofetch = append(s.tofetch, keys...) for _, k := range keys {
s.tofetch.Push(k)
}
case keys := <-s.cancelKeys: case keys := <-s.cancelKeys:
s.cancel(keys) s.cancel(keys)
...@@ -188,8 +191,7 @@ func (s *Session) run(ctx context.Context) { ...@@ -188,8 +191,7 @@ func (s *Session) run(ctx context.Context) {
case p := <-newpeers: case p := <-newpeers:
s.addActivePeer(p) s.addActivePeer(p)
case lwchk := <-s.interestReqs: case lwchk := <-s.interestReqs:
_, ok := s.liveWants[lwchk.c.KeyString()] lwchk.resp <- s.cidIsWanted(lwchk.c)
lwchk.resp <- ok
case <-ctx.Done(): case <-ctx.Done():
s.tick.Stop() s.tick.Stop()
return return
...@@ -197,19 +199,31 @@ func (s *Session) run(ctx context.Context) { ...@@ -197,19 +199,31 @@ func (s *Session) run(ctx context.Context) {
} }
} }
func (s *Session) cidIsWanted(c *cid.Cid) bool {
_, ok := s.liveWants[c.KeyString()]
if !ok {
ok = s.tofetch.Has(c)
}
return ok
}
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
ks := blk.Cid().KeyString() c := blk.Cid()
if _, ok := s.liveWants[ks]; ok { if s.cidIsWanted(c) {
tval := s.liveWants[ks] ks := c.KeyString()
s.latTotal += time.Since(tval) tval, ok := s.liveWants[ks]
if ok {
s.latTotal += time.Since(tval)
delete(s.liveWants, ks)
} else {
s.tofetch.Remove(c)
}
s.fetchcnt++ s.fetchcnt++
delete(s.liveWants, ks)
s.notif.Publish(blk) s.notif.Publish(blk)
if len(s.tofetch) > 0 { if next := s.tofetch.Pop(); next != nil {
next := s.tofetch[0:1] s.wantBlocks(ctx, []*cid.Cid{next})
s.tofetch = s.tofetch[1:]
s.wantBlocks(ctx, next)
} }
} }
} }
...@@ -222,19 +236,9 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) { ...@@ -222,19 +236,9 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
} }
func (s *Session) cancel(keys []*cid.Cid) { func (s *Session) cancel(keys []*cid.Cid) {
sset := cid.NewSet()
for _, c := range keys { for _, c := range keys {
sset.Add(c) s.tofetch.Remove(c)
} }
var i, j int
for ; j < len(s.tofetch); j++ {
if sset.Has(s.tofetch[j]) {
continue
}
s.tofetch[i] = s.tofetch[j]
i++
}
s.tofetch = s.tofetch[:i]
} }
func (s *Session) cancelWants(keys []*cid.Cid) { func (s *Session) cancelWants(keys []*cid.Cid) {
...@@ -260,3 +264,46 @@ func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks ...@@ -260,3 +264,46 @@ func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks
func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { func (s *Session) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) {
return getBlock(parent, k, s.GetBlocks) return getBlock(parent, k, s.GetBlocks)
} }
type cidQueue struct {
elems []*cid.Cid
eset *cid.Set
}
func newCidQueue() *cidQueue {
return &cidQueue{eset: cid.NewSet()}
}
func (cq *cidQueue) Pop() *cid.Cid {
for {
if len(cq.elems) == 0 {
return nil
}
out := cq.elems[0]
cq.elems = cq.elems[1:]
if cq.eset.Has(out) {
cq.eset.Remove(out)
return out
}
}
}
func (cq *cidQueue) Push(c *cid.Cid) {
if cq.eset.Visit(c) {
cq.elems = append(cq.elems, c)
}
}
func (cq *cidQueue) Remove(c *cid.Cid) {
cq.eset.Remove(c)
}
func (cq *cidQueue) Has(c *cid.Cid) bool {
return cq.eset.Has(c)
}
func (cq *cidQueue) Len() int {
return cq.eset.Len()
}
...@@ -202,3 +202,43 @@ func TestInterestCacheOverflow(t *testing.T) { ...@@ -202,3 +202,43 @@ func TestInterestCacheOverflow(t *testing.T) {
t.Fatal("timed out waiting for block") t.Fatal("timed out waiting for block")
} }
} }
func TestPutAfterSessionCacheEvict(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
blks := bgen.Blocks(2500)
inst := sesgen.Instances(1)
a := inst[0]
ses := a.Exchange.NewSession(ctx)
var allcids []*cid.Cid
for _, blk := range blks[1:] {
allcids = append(allcids, blk.Cid())
}
blkch, err := ses.GetBlocks(ctx, allcids)
if err != nil {
t.Fatal(err)
}
// wait to ensure that all the above cids were added to the sessions cache
time.Sleep(time.Millisecond * 50)
if err := a.Exchange.HasBlock(blks[17]); err != nil {
t.Fatal(err)
}
select {
case <-blkch:
case <-time.After(time.Millisecond * 50):
t.Fatal("timed out waiting for block")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment