Unverified Commit 7944a99c authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #174 from ipfs/fix/dont-ignore-pending-blocks

Fix: don't ignore received blocks for pending wants
parents cbb8d35c 312b40ba
......@@ -273,14 +273,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom("", []blocks.Block{blk})
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk})
}
// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
......@@ -294,7 +294,7 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
// Split blocks into wanted blocks vs duplicates
wanted = make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if bs.wm.IsWanted(b.Cid()) {
if bs.sm.IsWanted(b.Cid()) {
wanted = append(wanted, b)
} else {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
......@@ -354,6 +354,12 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
}
}
if from != "" {
for _, b := range wanted {
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}
}
return nil
}
......@@ -382,17 +388,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
// Process blocks
err := bs.receiveBlocksFrom(p, iblocks)
err := bs.receiveBlocksFrom(ctx, p, iblocks)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
}
for _, b := range iblocks {
if bs.wm.IsWanted(b.Cid()) {
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
}
}
}
func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
......
......@@ -21,6 +21,7 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
peer "github.com/libp2p/go-libp2p-core/peer"
p2ptestutil "github.com/libp2p/go-libp2p-netutil"
travis "github.com/libp2p/go-libp2p-testing/ci/travis"
tu "github.com/libp2p/go-libp2p-testing/etc"
......@@ -138,6 +139,8 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
}
}
// Tests that a received block is not stored in the blockstore if the block was
// not requested by the client
func TestUnwantedBlockNotAdded(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
......@@ -170,6 +173,68 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
}
}
// Tests that a received block is returned to the client and stored in the
// blockstore in the following scenario:
// - the want for the block has been requested by the client
// - the want for the block has not yet been sent out to a peer
// (because the live request queue is full)
func TestPendingBlockAdded(t *testing.T) {
ctx := context.Background()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
bg := blocksutil.NewBlockGenerator()
sessionBroadcastWantCapacity := 4
ig := testinstance.NewTestInstanceGenerator(net)
defer ig.Close()
instance := ig.Instances(1)[0]
defer instance.Exchange.Close()
oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Request enough blocks to exceed the session's broadcast want list
// capacity (by one block). The session will put the remaining block
// into the "tofetch" queue
blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
ks := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
ks = append(ks, b.Cid())
}
outch, err := instance.Exchange.GetBlocks(ctx, ks)
if err != nil {
t.Fatal(err)
}
// Wait a little while to make sure the session has time to process the wants
time.Sleep(time.Millisecond * 20)
// Simulate receiving a message which contains the block in the "tofetch" queue
lastBlock := blks[len(blks)-1]
bsMessage := message.New(true)
bsMessage.AddBlock(lastBlock)
unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)
// Make sure Bitswap adds the block to the output channel
blkrecvd, ok := <-outch
if !ok {
t.Fatal("timed out waiting for block")
}
if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
t.Fatal("received wrong block")
}
// Make sure Bitswap adds the block to the blockstore
blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
if err != nil {
t.Fatal(err)
}
if !blockInStore {
t.Fatal("Block was not added to block store")
}
}
func TestLargeSwarm(t *testing.T) {
if testing.Short() {
t.SkipNow()
......
......@@ -2,10 +2,8 @@ package session
import (
"context"
"math/rand"
"time"
lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
notifications "github.com/ipfs/go-bitswap/notifications"
bssd "github.com/ipfs/go-bitswap/sessiondata"
......@@ -47,11 +45,6 @@ type RequestSplitter interface {
RecordUniqueBlock()
}
type interestReq struct {
c cid.Cid
resp chan bool
}
type rcvFrom struct {
from peer.ID
ks []cid.Cid
......@@ -67,19 +60,16 @@ type Session struct {
pm PeerManager
srs RequestSplitter
sw sessionWants
// channels
incoming chan rcvFrom
newReqs chan []cid.Cid
cancelKeys chan []cid.Cid
interestReqs chan interestReq
latencyReqs chan chan time.Duration
tickDelayReqs chan time.Duration
// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
idleTick *time.Timer
periodicSearchTimer *time.Timer
baseTickDelay time.Duration
......@@ -105,12 +95,13 @@ func New(ctx context.Context,
initialSearchDelay time.Duration,
periodicSearchDelay delay.D) *Session {
s := &Session{
sw: sessionWants{
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
pastWants: cid.NewSet(),
},
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
......@@ -126,9 +117,6 @@ func New(ctx context.Context,
periodicSearchDelay: periodicSearchDelay,
}
cache, _ := lru.New(2048)
s.interest = cache
go s.run(ctx)
return s
......@@ -136,40 +124,20 @@ func New(ctx context.Context,
// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
select {
case s.incoming <- rcvFrom{from: from, ks: ks}:
case <-s.ctx.Done():
interested := s.sw.FilterInteresting(ks)
if len(interested) == 0 {
return
}
}
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
if s.interest.Contains(c) {
return true
}
// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though its unclear
// if it will actually induce any noticeable slowness. This is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
resp := make(chan bool, 1)
select {
case s.interestReqs <- interestReq{
c: c,
resp: resp,
}:
case s.incoming <- rcvFrom{from: from, ks: interested}:
case <-s.ctx.Done():
return false
}
}
select {
case want := <-resp:
return want
case <-s.ctx.Done():
return false
}
// IsWanted returns true if this session is waiting to receive the given Cid.
func (s *Session) IsWanted(c cid.Cid) bool {
return s.sw.IsWanted(c)
}
// GetBlock fetches a single block.
......@@ -233,23 +201,15 @@ func (s *Session) run(ctx context.Context) {
for {
select {
case rcv := <-s.incoming:
s.cancelIncoming(ctx, rcv)
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
if rcv.from != "" {
s.updateReceiveCounters(ctx, rcv)
}
s.handleIncoming(ctx, rcv)
case keys := <-s.newReqs:
s.handleNewRequest(ctx, keys)
s.wantBlocks(ctx, keys)
case keys := <-s.cancelKeys:
s.handleCancel(keys)
s.sw.CancelPending(keys)
case <-s.idleTick.C:
s.handleIdleTick(ctx)
case <-s.periodicSearchTimer.C:
s.handlePeriodicSearch(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
resp <- s.averageLatency()
case baseTickDelay := <-s.tickDelayReqs:
......@@ -261,59 +221,8 @@ func (s *Session) run(ctx context.Context) {
}
}
func (s *Session) cancelIncoming(ctx context.Context, rcv rcvFrom) {
// We've received the blocks so we can cancel any outstanding wants for them
wanted := make([]cid.Cid, 0, len(rcv.ks))
for _, k := range rcv.ks {
if s.cidIsWanted(k) {
wanted = append(wanted, k)
}
}
s.pm.RecordCancels(wanted)
s.wm.CancelWants(s.ctx, wanted, nil, s.id)
}
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
s.idleTick.Stop()
// Process the received blocks
s.processIncoming(ctx, rcv.ks)
s.resetIdleTick()
}
func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
for _, k := range keys {
s.interest.Add(k, nil)
}
if toadd := s.wantBudget(); toadd > 0 {
if toadd > len(keys) {
toadd = len(keys)
}
now := keys[:toadd]
keys = keys[toadd:]
s.wantBlocks(ctx, now)
}
for _, k := range keys {
s.tofetch.Push(k)
}
}
func (s *Session) handleCancel(keys []cid.Cid) {
for _, c := range keys {
s.tofetch.Remove(c)
}
}
func (s *Session) handleIdleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
live = append(live, c)
s.liveWants[c] = now
}
live := s.sw.PrepareBroadcast()
// Broadcast these keys to everyone we're connected to
s.pm.RecordPeerRequests(nil, live)
......@@ -326,13 +235,13 @@ func (s *Session) handleIdleTick(ctx context.Context) {
}
s.resetIdleTick()
if len(s.liveWants) > 0 {
if s.sw.HasLiveWants() {
s.consecutiveTicks++
}
}
func (s *Session) handlePeriodicSearch(ctx context.Context) {
randomWant := s.randomLiveWant()
randomWant := s.sw.RandomLiveWant()
if !randomWant.Defined() {
return
}
......@@ -345,85 +254,40 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}
func (s *Session) randomLiveWant() cid.Cid {
if len(s.liveWants) == 0 {
return cid.Cid{}
}
i := rand.Intn(len(s.liveWants))
// picking a random live want
for k := range s.liveWants {
if i == 0 {
return k
}
i--
}
return cid.Cid{}
}
func (s *Session) handleShutdown() {
s.idleTick.Stop()
live := make([]cid.Cid, 0, len(s.liveWants))
for c := range s.liveWants {
live = append(live, c)
}
live := s.sw.LiveWants()
s.wm.CancelWants(s.ctx, live, nil, s.id)
}
func (s *Session) cidIsWanted(c cid.Cid) bool {
_, ok := s.liveWants[c]
if !ok {
ok = s.tofetch.Has(c)
func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
if rcv.from != "" {
s.updateReceiveCounters(ctx, rcv)
}
return ok
}
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid) {
for _, c := range ks {
if s.cidIsWanted(c) {
// If the block CID was in the live wants queue, remove it
tval, ok := s.liveWants[c]
if ok {
s.latTotal += time.Since(tval)
delete(s.liveWants, c)
} else {
// Otherwise remove it from the tofetch queue, if it was there
s.tofetch.Remove(c)
// Update the want list
wanted, totalLatency := s.sw.BlocksReceived(rcv.ks)
if len(wanted) == 0 {
return
}
s.fetchcnt++
// We've received new wanted blocks, so reset the number of ticks
// that have occurred since the last new block
s.consecutiveTicks = 0
// We've received the blocks so we can cancel any outstanding wants for them
s.cancelIncoming(ctx, wanted)
// Keep track of CIDs we've successfully fetched
s.pastWants.Push(c)
}
}
s.idleTick.Stop()
// Transfer as many CIDs as possible from the tofetch queue into the
// live wants queue
toAdd := s.wantBudget()
if toAdd > s.tofetch.Len() {
toAdd = s.tofetch.Len()
}
if toAdd > 0 {
var keys []cid.Cid
for i := 0; i < toAdd; i++ {
keys = append(keys, s.tofetch.Pop())
}
s.wantBlocks(ctx, keys)
}
// Process the received blocks
s.processIncoming(ctx, wanted, totalLatency)
s.resetIdleTick()
}
func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
for _, k := range rcv.ks {
// Inform the request splitter of unique / duplicate blocks
if s.cidIsWanted(k) {
s.srs.RecordUniqueBlock()
} else if s.pastWants.Has(k) {
s.srs.RecordDuplicateBlock()
}
}
// Record unique vs duplicate blocks
s.sw.ForEachUniqDup(rcv.ks, s.srs.RecordUniqueBlock, s.srs.RecordDuplicateBlock)
// Record response (to be able to time latency)
if len(rcv.ks) > 0 {
......@@ -431,11 +295,31 @@ func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
}
}
func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
now := time.Now()
for _, c := range ks {
s.liveWants[c] = now
func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
s.pm.RecordCancels(ks)
s.wm.CancelWants(s.ctx, ks, nil, s.id)
}
func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatency time.Duration) {
// Keep track of the total number of blocks received and total latency
s.fetchcnt += len(ks)
s.latTotal += totalLatency
// We've received new wanted blocks, so reset the number of ticks
// that have occurred since the last new block
s.consecutiveTicks = 0
s.wantBlocks(ctx, nil)
}
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
// Given the want limit and any newly received blocks, get as many wants as
// we can to send out
ks := s.sw.GetNextWants(s.wantLimit(), newks)
if len(ks) == 0 {
return
}
peers := s.pm.GetOptimizedPeers()
if len(peers) > 0 {
splitRequests := s.srs.SplitRequest(peers, ks)
......@@ -465,16 +349,9 @@ func (s *Session) resetIdleTick() {
s.idleTick.Reset(tickDelay)
}
func (s *Session) wantBudget() int {
live := len(s.liveWants)
var budget int
func (s *Session) wantLimit() int {
if len(s.pm.GetOptimizedPeers()) > 0 {
budget = targetedLiveWantsLimit - live
} else {
budget = broadcastLiveWantsLimit - live
}
if budget < 0 {
budget = 0
return targetedLiveWantsLimit
}
return budget
return broadcastLiveWantsLimit
}
......@@ -118,6 +118,11 @@ func TestSessionGetBlocks(t *testing.T) {
if receivedWantReq.peers != nil {
t.Fatal("first want request should be a broadcast")
}
for _, c := range cids {
if !session.IsWanted(c) {
t.Fatal("expected session to want cids")
}
}
// now receive the first set of blocks
peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
......@@ -211,6 +216,11 @@ func TestSessionGetBlocks(t *testing.T) {
t.Fatal("received incorrect block")
}
}
for _, c := range cids {
if session.IsWanted(c) {
t.Fatal("expected session NOT to want cids")
}
}
}
func TestSessionFindMorePeers(t *testing.T) {
......
package session
import (
"math/rand"
"sync"
"time"
cid "github.com/ipfs/go-cid"
)
type sessionWants struct {
sync.RWMutex
toFetch *cidQueue
liveWants map[cid.Cid]time.Time
pastWants *cid.Set
}
// BlocksReceived moves received block CIDs from live to past wants and
// measures latency. It returns the CIDs of blocks that were actually wanted
// (as opposed to duplicates) and the total latency for all incoming blocks.
func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) {
now := time.Now()
sw.Lock()
defer sw.Unlock()
totalLatency := time.Duration(0)
wanted := make([]cid.Cid, 0, len(cids))
for _, c := range cids {
if sw.unlockedIsWanted(c) {
wanted = append(wanted, c)
// If the block CID was in the live wants queue, remove it
tval, ok := sw.liveWants[c]
if ok {
totalLatency += now.Sub(tval)
delete(sw.liveWants, c)
} else {
// Otherwise remove it from the toFetch queue, if it was there
sw.toFetch.Remove(c)
}
// Keep track of CIDs we've successfully fetched
sw.pastWants.Add(c)
}
}
return wanted, totalLatency
}
// GetNextWants adds any new wants to the list of CIDs to fetch, then moves as
// many CIDs from the fetch queue to the live wants list as possible (given the
// limit). Returns the newly live wants.
func (sw *sessionWants) GetNextWants(limit int, newWants []cid.Cid) []cid.Cid {
now := time.Now()
sw.Lock()
defer sw.Unlock()
// Add new wants to the fetch queue
for _, k := range newWants {
sw.toFetch.Push(k)
}
// Move CIDs from fetch queue to the live wants queue (up to the limit)
currentLiveCount := len(sw.liveWants)
toAdd := limit - currentLiveCount
var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWants[c] = now
}
return live
}
// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
now := time.Now()
sw.Lock()
defer sw.Unlock()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
sw.liveWants[c] = now
}
return live
}
// CancelPending removes the given CIDs from the fetch queue.
func (sw *sessionWants) CancelPending(keys []cid.Cid) {
sw.Lock()
defer sw.Unlock()
for _, k := range keys {
sw.toFetch.Remove(k)
}
}
// ForEachUniqDup iterates over each of the given CIDs and calls isUniqFn
// if the session is expecting a block for the CID, or isDupFn if the session
// has already received the block.
func (sw *sessionWants) ForEachUniqDup(ks []cid.Cid, isUniqFn, isDupFn func()) {
sw.RLock()
for _, k := range ks {
if sw.unlockedIsWanted(k) {
isUniqFn()
} else if sw.pastWants.Has(k) {
isDupFn()
}
}
sw.RUnlock()
}
// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
sw.RLock()
defer sw.RUnlock()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
}
return live
}
// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
i := rand.Uint64()
sw.RLock()
defer sw.RUnlock()
if len(sw.liveWants) == 0 {
return cid.Cid{}
}
i %= uint64(len(sw.liveWants))
// picking a random live want
for k := range sw.liveWants {
if i == 0 {
return k
}
i--
}
return cid.Cid{}
}
// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
sw.RLock()
defer sw.RUnlock()
return len(sw.liveWants) > 0
}
// IsWanted indicates if the session is expecting to receive the block with the
// given CID
func (sw *sessionWants) IsWanted(c cid.Cid) bool {
sw.RLock()
defer sw.RUnlock()
return sw.unlockedIsWanted(c)
}
// FilterInteresting filters the list so that it only contains keys for
// blocks that the session is waiting to receive or has received in the past
func (sw *sessionWants) FilterInteresting(ks []cid.Cid) []cid.Cid {
sw.RLock()
defer sw.RUnlock()
var interested []cid.Cid
for _, k := range ks {
if sw.unlockedIsWanted(k) || sw.pastWants.Has(k) {
interested = append(interested, k)
}
}
return interested
}
func (sw *sessionWants) unlockedIsWanted(c cid.Cid) bool {
_, ok := sw.liveWants[c]
if !ok {
ok = sw.toFetch.Has(c)
}
return ok
}
package session
import (
"testing"
"time"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
)
func TestSessionWants(t *testing.T) {
sw := sessionWants{
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
pastWants: cid.NewSet(),
}
cids := testutil.GenerateCids(10)
others := testutil.GenerateCids(1)
// Expect these functions to return nothing on a new sessionWants
lws := sw.PrepareBroadcast()
if len(lws) > 0 {
t.Fatal("expected no broadcast wants")
}
lws = sw.LiveWants()
if len(lws) > 0 {
t.Fatal("expected no live wants")
}
if sw.HasLiveWants() {
t.Fatal("expected not to have live wants")
}
rw := sw.RandomLiveWant()
if rw.Defined() {
t.Fatal("expected no random want")
}
if sw.IsWanted(cids[0]) {
t.Fatal("expected cid to not be wanted")
}
if len(sw.FilterInteresting(cids)) > 0 {
t.Fatal("expected no interesting wants")
}
// Add 10 new wants with a limit of 5
// The first 5 cids should go into the toFetch queue
// The other 5 cids should go into the live want queue
// toFetch Live Past
// 98765 43210
nextw := sw.GetNextWants(5, cids)
if len(nextw) != 5 {
t.Fatal("expected 5 next wants")
}
lws = sw.PrepareBroadcast()
if len(lws) != 5 {
t.Fatal("expected 5 broadcast wants")
}
lws = sw.LiveWants()
if len(lws) != 5 {
t.Fatal("expected 5 live wants")
}
if !sw.HasLiveWants() {
t.Fatal("expected to have live wants")
}
rw = sw.RandomLiveWant()
if !rw.Defined() {
t.Fatal("expected random want")
}
if !sw.IsWanted(cids[0]) {
t.Fatal("expected cid to be wanted")
}
if !sw.IsWanted(cids[9]) {
t.Fatal("expected cid to be wanted")
}
if len(sw.FilterInteresting([]cid.Cid{cids[0], cids[9], others[0]})) != 2 {
t.Fatal("expected 2 interesting wants")
}
// Two wanted blocks and one other block are received.
// The wanted blocks should be moved from the live wants queue
// to the past wants set (the other block CID should be ignored)
// toFetch Live Past
// 98765 432__ 10
recvdCids := []cid.Cid{cids[0], cids[1], others[0]}
uniq := 0
dup := 0
sw.ForEachUniqDup(recvdCids, func() { uniq++ }, func() { dup++ })
if uniq != 2 || dup != 0 {
t.Fatal("expected 2 uniqs / 0 dups", uniq, dup)
}
sw.BlocksReceived(recvdCids)
lws = sw.LiveWants()
if len(lws) != 3 {
t.Fatal("expected 3 live wants")
}
if sw.IsWanted(cids[0]) {
t.Fatal("expected cid to no longer be wanted")
}
if !sw.IsWanted(cids[9]) {
t.Fatal("expected cid to be wanted")
}
if len(sw.FilterInteresting([]cid.Cid{cids[0], cids[9], others[0]})) != 2 {
t.Fatal("expected 2 interesting wants")
}
// Ask for next wants with a limit of 5
// Should move 2 wants from toFetch queue to live wants
// toFetch Live Past
// 987__ 65432 10
nextw = sw.GetNextWants(5, nil)
if len(nextw) != 2 {
t.Fatal("expected 2 next wants")
}
lws = sw.LiveWants()
if len(lws) != 5 {
t.Fatal("expected 5 live wants")
}
if !sw.IsWanted(cids[5]) {
t.Fatal("expected cid to be wanted")
}
// One wanted block and one dup block are received.
// The wanted block should be moved from the live wants queue
// to the past wants set
// toFetch Live Past
// 987 654_2 310
recvdCids = []cid.Cid{cids[0], cids[3]}
uniq = 0
dup = 0
sw.ForEachUniqDup(recvdCids, func() { uniq++ }, func() { dup++ })
if uniq != 1 || dup != 1 {
t.Fatal("expected 1 uniq / 1 dup", uniq, dup)
}
sw.BlocksReceived(recvdCids)
lws = sw.LiveWants()
if len(lws) != 4 {
t.Fatal("expected 4 live wants")
}
// One block in the toFetch queue should be cancelled
// toFetch Live Past
// 9_7 654_2 310
sw.CancelPending([]cid.Cid{cids[8]})
lws = sw.LiveWants()
if len(lws) != 4 {
t.Fatal("expected 4 live wants")
}
if sw.IsWanted(cids[8]) {
t.Fatal("expected cid to no longer be wanted")
}
if len(sw.FilterInteresting([]cid.Cid{cids[0], cids[8]})) != 1 {
t.Fatal("expected 1 interesting wants")
}
}
......@@ -17,8 +17,8 @@ import (
// Session is a session that is managed by the session manager
type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveFrom(peer.ID, []cid.Cid)
IsWanted(cid.Cid) bool
}
type sesTrk struct {
......@@ -114,20 +114,26 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
return sm.sessID
}
// ReceiveFrom receives blocks from a peer and dispatches to interested
// sessions.
// ReceiveFrom receives block CIDs from a peer and dispatches to sessions.
func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
// Only give each session the blocks / dups that it is interested in
for _, s := range sm.sessions {
sessKs := make([]cid.Cid, 0, len(ks))
for _, k := range ks {
if s.session.InterestedIn(k) {
sessKs = append(sessKs, k)
s.session.ReceiveFrom(from, ks)
}
}
// IsWanted indicates whether any of the sessions are waiting to receive
// the block with the given CID.
func (sm *SessionManager) IsWanted(cid cid.Cid) bool {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for _, s := range sm.sessions {
if s.session.IsWanted(cid) {
return true
}
s.session.ReceiveFrom(from, sessKs)
}
return false
}
......@@ -18,7 +18,7 @@ import (
)
type fakeSession struct {
interested []cid.Cid
wanted []cid.Cid
ks []cid.Cid
id uint64
pm *fakePeerManager
......@@ -32,8 +32,8 @@ func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
return nil, nil
}
func (fs *fakeSession) InterestedIn(c cid.Cid) bool {
for _, ic := range fs.interested {
func (fs *fakeSession) IsWanted(c cid.Cid) bool {
for _, ic := range fs.wanted {
if c == ic {
return true
}
......@@ -63,7 +63,7 @@ func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
var nextInterestedIn []cid.Cid
var nextWanted []cid.Cid
func sessionFactory(ctx context.Context,
id uint64,
......@@ -73,7 +73,7 @@ func sessionFactory(ctx context.Context,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) Session {
return &fakeSession{
interested: nextInterestedIn,
wanted: nextWanted,
id: id,
pm: pm.(*fakePeerManager),
srs: srs.(*fakeRequestSplitter),
......@@ -89,24 +89,6 @@ func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
return &fakeRequestSplitter{}
}
func cmpSessionCids(s *fakeSession, cids []cid.Cid) bool {
if len(s.ks) != len(cids) {
return false
}
for _, bk := range s.ks {
has := false
for _, c := range cids {
if c == bk {
has = true
}
}
if !has {
return false
}
}
return true
}
func TestAddingSessions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
......@@ -118,7 +100,7 @@ func TestAddingSessions(t *testing.T) {
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = []cid.Cid{block.Cid()}
nextWanted = []cid.Cid{block.Cid()}
currentID := sm.GetNextSessionID()
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
......@@ -145,7 +127,7 @@ func TestAddingSessions(t *testing.T) {
}
}
func TestReceivingBlocksWhenNotInterested(t *testing.T) {
func TestIsWanted(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
......@@ -153,26 +135,24 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
defer notif.Shutdown()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)
p := peer.ID(123)
blks := testutil.GenerateBlocksOfSize(3, 1024)
blks := testutil.GenerateBlocksOfSize(4, 1024)
var cids []cid.Cid
for _, b := range blks {
cids = append(cids, b.Cid())
}
nextInterestedIn = []cid.Cid{cids[0], cids[1]}
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextInterestedIn = []cid.Cid{cids[0]}
secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextInterestedIn = []cid.Cid{}
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
sm.ReceiveFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()})
nextWanted = []cid.Cid{cids[0], cids[1]}
_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextWanted = []cid.Cid{cids[0], cids[2]}
_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) ||
!cmpSessionCids(secondSession, []cid.Cid{cids[0]}) ||
!cmpSessionCids(thirdSession, []cid.Cid{}) {
t.Fatal("did not receive correct blocks for sessions")
if !sm.IsWanted(cids[0]) ||
!sm.IsWanted(cids[1]) ||
!sm.IsWanted(cids[2]) {
t.Fatal("expected unwanted but session manager did want cid")
}
if sm.IsWanted(cids[3]) {
t.Fatal("expected wanted but session manager did not want cid")
}
}
......@@ -186,7 +166,7 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = []cid.Cid{block.Cid()}
nextWanted = []cid.Cid{block.Cid()}
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
......@@ -213,7 +193,7 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = []cid.Cid{block.Cid()}
nextWanted = []cid.Cid{block.Cid()}
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
sessionCtx, sessionCancel := context.WithCancel(ctx)
secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
......
......@@ -80,22 +80,6 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe
wm.addEntries(context.Background(), ks, peers, true, ses)
}
// IsWanted returns whether a CID is currently wanted.
func (wm *WantManager) IsWanted(c cid.Cid) bool {
resp := make(chan bool, 1)
select {
case wm.wantMessages <- &isWantedMessage{c, resp}:
case <-wm.ctx.Done():
return false
}
select {
case wanted := <-resp:
return wanted
case <-wm.ctx.Done():
return false
}
}
// CurrentWants returns the list of current wants.
func (wm *WantManager) CurrentWants() []wantlist.Entry {
resp := make(chan []wantlist.Entry, 1)
......@@ -232,16 +216,6 @@ func (ws *wantSet) handle(wm *WantManager) {
wm.peerHandler.SendMessage(ws.entries, ws.targets, ws.from)
}
type isWantedMessage struct {
c cid.Cid
resp chan<- bool
}
func (iwm *isWantedMessage) handle(wm *WantManager) {
_, isWanted := wm.wl.Contains(iwm.c)
iwm.resp <- isWanted
}
type currentWantsMessage struct {
resp chan<- []wantlist.Entry
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment