Commit 38c6f533 authored by Dirk McCormick, committed by Steven Allen

refactor: pass around keys instead of blocks

parent 0bd2ede0
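This change threads CIDs (keys) through go-bitswap's internal interfaces wherever the raw block data is not actually needed: the session manager, the sessions, and the decision engine now receive []cid.Cid instead of []blocks.Block. A minimal sketch of the conversion pattern the commit applies at that boundary, assuming the go-block-format and go-cid packages; the helper name blocksToKeys is illustrative and not part of the commit, which inlines this loop in receiveBlocksFrom instead:

package example

import (
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
)

// blocksToKeys extracts each received block's CID so that downstream
// components can be handed keys instead of whole blocks.
func blocksToKeys(blks []blocks.Block) []cid.Cid {
	ks := make([]cid.Cid, 0, len(blks))
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}
	return ks
}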
@@ -315,12 +315,25 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
// to the same node. We should address this soon, but i'm not going to do
// it now as it requires more thought and isnt causing immediate problems.
// Send all blocks (including duplicates) to any sessions that want them.
allKs := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
allKs = append(allKs, b.Cid())
}
wantedKs := allKs
if len(blks) != len(wanted) {
wantedKs = make([]cid.Cid, 0, len(wanted))
for _, b := range wanted {
wantedKs = append(wantedKs, b.Cid())
}
}
// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveBlocksFrom(from, blks)
bs.sm.ReceiveBlocksFrom(from, allKs)
// Send wanted blocks to decision engine
bs.engine.AddBlocks(wanted)
// Send wanted block keys to decision engine
bs.engine.AddBlocks(wantedKs)
// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of received
@@ -331,9 +344,9 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, b := range wanted {
for _, k := range wantedKs {
select {
case bs.newBlocks <- b.Cid():
case bs.newBlocks <- k:
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
......
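In the receiveBlocksFrom hunks above, all received keys (allKs) go to the session manager, while only the wanted keys (wantedKs) go to the decision engine and the reprovider channel; wantedKs reuses allKs when every received block was wanted, so the second slice is only built when some blocks were unwanted. A standalone sketch of that split, under the same assumptions as before and with an illustrative helper name (keysForWanted is not part of the commit):

package example

import (
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
)

// keysForWanted mirrors the pattern above: allKs always holds every received
// key; wantedKs aliases allKs when nothing was filtered out, and is rebuilt
// from the wanted subset otherwise.
func keysForWanted(received, wanted []blocks.Block) (allKs, wantedKs []cid.Cid) {
	allKs = make([]cid.Cid, 0, len(received))
	for _, b := range received {
		allKs = append(allKs, b.Cid())
	}
	wantedKs = allKs
	if len(received) != len(wanted) {
		wantedKs = make([]cid.Cid, 0, len(wanted))
		for _, b := range wanted {
			wantedKs = append(wantedKs, b.Cid())
		}
	}
	return allKs, wantedKs
}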
@@ -10,7 +10,6 @@ import (
"github.com/google/uuid"
bsmsg "github.com/ipfs/go-bitswap/message"
wl "github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
@@ -312,13 +311,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
}
}
func (e *Engine) addBlocks(blocks []blocks.Block) {
func (e *Engine) addBlocks(ks []cid.Cid) {
work := false
for _, l := range e.ledgerMap {
l.lk.Lock()
for _, block := range blocks {
if entry, ok := l.WantListContains(block.Cid()); ok {
for _, k := range ks {
if entry, ok := l.WantListContains(k); ok {
e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
Identifier: entry.Cid,
Priority: entry.Priority,
@@ -337,11 +336,11 @@ func (e *Engine) addBlocks(blocks []blocks.Block) {
// AddBlocks is called when new blocks are received and added to a block store,
// meaning there may be peers who want those blocks, so we should send the blocks
// to them.
func (e *Engine) AddBlocks(blocks []blocks.Block) {
func (e *Engine) AddBlocks(ks []cid.Cid) {
e.lock.Lock()
defer e.lock.Unlock()
e.addBlocks(blocks)
e.addBlocks(ks)
}
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
......
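The engine-side hunks above only need keys because addBlocks matches each incoming CID against the per-peer ledgers' wantlists and queues a send task by CID; per its doc comment, the block data has already been added to the blockstore by the time AddBlocks is called, so it can be looked up by key when the task is served. A simplified sketch of that matching step, using a plain map and string peer IDs as stand-ins rather than the engine's real ledger and peer types:

package example

import (
	cid "github.com/ipfs/go-cid"
)

// peerID stands in for the libp2p peer ID type used by the real engine.
type peerID string

// wantlists maps each peer to the set of CIDs it has asked for; it is a
// simplified substitute for the engine's per-peer ledgers.
type wantlists map[peerID]map[cid.Cid]bool

// peersWanting returns, for each received key, the peers whose wantlist
// contains it -- the same matching addBlocks performs before pushing a task
// onto the peer request queue. Only keys are needed at this stage.
func peersWanting(w wantlists, ks []cid.Cid) map[cid.Cid][]peerID {
	out := make(map[cid.Cid][]peerID)
	for _, k := range ks {
		for p, wants := range w {
			if wants[k] {
				out[k] = append(out[k], p)
			}
		}
	}
	return out
}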
@@ -54,7 +54,7 @@ type interestReq struct {
type blksRecv struct {
from peer.ID
blks []blocks.Block
ks []cid.Cid
}
// Session holds state for an individual bitswap transfer operation.
@@ -135,9 +135,9 @@ func New(ctx context.Context,
}
// ReceiveBlocksFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveBlocksFrom(from peer.ID, blocks []blocks.Block) {
func (s *Session) ReceiveBlocksFrom(from peer.ID, ks []cid.Cid) {
select {
case s.incoming <- blksRecv{from: from, blks: blocks}:
case s.incoming <- blksRecv{from: from, ks: ks}:
case <-s.ctx.Done():
}
}
@@ -262,21 +262,21 @@ func (s *Session) run(ctx context.Context) {
func (s *Session) cancelIncomingBlocks(ctx context.Context, rcv blksRecv) {
// We've received the blocks so we can cancel any outstanding wants for them
ks := make([]cid.Cid, 0, len(rcv.blks))
for _, b := range rcv.blks {
if s.cidIsWanted(b.Cid()) {
ks = append(ks, b.Cid())
wanted := make([]cid.Cid, 0, len(rcv.ks))
for _, k := range rcv.ks {
if s.cidIsWanted(k) {
wanted = append(wanted, k)
}
}
s.pm.RecordCancels(ks)
s.wm.CancelWants(s.ctx, ks, nil, s.id)
s.pm.RecordCancels(wanted)
s.wm.CancelWants(s.ctx, wanted, nil, s.id)
}
func (s *Session) handleIncomingBlocks(ctx context.Context, rcv blksRecv) {
s.idleTick.Stop()
// Process the received blocks
s.receiveBlocks(ctx, rcv.blks)
s.receiveBlocks(ctx, rcv.ks)
s.resetIdleTick()
}
@@ -376,9 +376,8 @@ func (s *Session) cidIsWanted(c cid.Cid) bool {
return ok
}
func (s *Session) receiveBlocks(ctx context.Context, blocks []blocks.Block) {
for _, blk := range blocks {
c := blk.Cid()
func (s *Session) receiveBlocks(ctx context.Context, ks []cid.Cid) {
for _, c := range ks {
if s.cidIsWanted(c) {
// If the block CID was in the live wants queue, remove it
tval, ok := s.liveWants[c]
@@ -416,22 +415,18 @@ func (s *Session) receiveBlocks(ctx context.Context, blocks []blocks.Block) {
}
func (s *Session) updateReceiveCounters(ctx context.Context, rcv blksRecv) {
ks := make([]cid.Cid, len(rcv.blks))
for _, blk := range rcv.blks {
for _, k := range rcv.ks {
// Inform the request splitter of unique / duplicate blocks
if s.cidIsWanted(blk.Cid()) {
if s.cidIsWanted(k) {
s.srs.RecordUniqueBlock()
} else if s.pastWants.Has(blk.Cid()) {
} else if s.pastWants.Has(k) {
s.srs.RecordDuplicateBlock()
}
ks = append(ks, blk.Cid())
}
// Record response (to be able to time latency)
if len(ks) > 0 {
s.pm.RecordPeerResponse(rcv.from, ks)
if len(rcv.ks) > 0 {
s.pm.RecordPeerResponse(rcv.from, rcv.ks)
}
}
......
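In the session hunks above, receiving a key has two accounting outcomes, which is why the session manager forwards duplicate keys as well (per the comment in receiveBlocksFrom): a key the session still wants is recorded as a unique block, while a key found only in pastWants is recorded as a duplicate. A loose sketch of that decision, with map stand-ins for the session's live-want and past-want sets and illustrative names not taken from the commit:

package example

import (
	cid "github.com/ipfs/go-cid"
)

// classifyKey mirrors the branch in updateReceiveCounters above: still-wanted
// keys count as unique blocks, previously wanted keys count as duplicates,
// and anything else is ignored.
func classifyKey(k cid.Cid, liveWants, pastWants map[cid.Cid]bool) (unique, duplicate bool) {
	switch {
	case liveWants[k]:
		return true, false
	case pastWants[k]:
		return false, true
	default:
		return false, false
	}
}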
@@ -129,7 +129,7 @@ func TestSessionGetBlocks(t *testing.T) {
// - calls ReceiveBlocksFrom() on session
// - publishes block to pubsub channel
blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]
session.ReceiveBlocksFrom(p, []blocks.Block{blk})
session.ReceiveBlocksFrom(p, []cid.Cid{blk.Cid()})
notif.Publish(blk)
select {
@@ -191,7 +191,7 @@ func TestSessionGetBlocks(t *testing.T) {
// - calls ReceiveBlocksFrom() on session
// - publishes block to pubsub channel
blk := blks[testutil.IndexOf(blks, newCidsRequested[i])]
session.ReceiveBlocksFrom(p, []blocks.Block{blk})
session.ReceiveBlocksFrom(p, []cid.Cid{blk.Cid()})
notif.Publish(blk)
receivedBlock := <-getBlocksCh
@@ -255,7 +255,7 @@ func TestSessionFindMorePeers(t *testing.T) {
// - calls ReceiveBlocksFrom() on session
// - publishes block to pubsub channel
blk := blks[0]
session.ReceiveBlocksFrom(p, []blocks.Block{blk})
session.ReceiveBlocksFrom(p, []cid.Cid{blk.Cid()})
notif.Publish(blk)
select {
case <-cancelReqs:
......
@@ -5,7 +5,6 @@ import (
"sync"
"time"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
@@ -19,7 +18,7 @@ import (
type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveBlocksFrom(peer.ID, []blocks.Block)
ReceiveBlocksFrom(peer.ID, []cid.Cid)
}
type sesTrk struct {
@@ -117,18 +116,18 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
// ReceiveBlocksFrom receives blocks from a peer and dispatches to interested
// sessions.
func (sm *SessionManager) ReceiveBlocksFrom(from peer.ID, blks []blocks.Block) {
func (sm *SessionManager) ReceiveBlocksFrom(from peer.ID, ks []cid.Cid) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
// Only give each session the blocks / dups that it is interested in
for _, s := range sm.sessions {
sessBlks := make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if s.session.InterestedIn(b.Cid()) {
sessBlks = append(sessBlks, b)
sessKs := make([]cid.Cid, 0, len(ks))
for _, k := range ks {
if s.session.InterestedIn(k) {
sessKs = append(sessKs, k)
}
}
s.session.ReceiveBlocksFrom(from, sessBlks)
s.session.ReceiveBlocksFrom(from, sessKs)
}
}
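The dispatch loop above gives each session only the received keys it reports interest in, which, per the duplicate-accounting comment in receiveBlocksFrom, can include keys the session has already seen. The same filter, pulled out as a generic helper for illustration (filterKeys is not part of the commit):

package example

import (
	cid "github.com/ipfs/go-cid"
)

// filterKeys returns the subset of ks accepted by the interested predicate,
// generalizing the per-session loop in ReceiveBlocksFrom above.
func filterKeys(ks []cid.Cid, interested func(cid.Cid) bool) []cid.Cid {
	out := make([]cid.Cid, 0, len(ks))
	for _, k := range ks {
		if interested(k) {
			out = append(out, k)
		}
	}
	return out
}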
@@ -19,7 +19,7 @@ import (
type fakeSession struct {
interested []cid.Cid
blks []blocks.Block
ks []cid.Cid
id uint64
pm *fakePeerManager
srs *fakeRequestSplitter
@@ -40,8 +40,8 @@ func (fs *fakeSession) InterestedIn(c cid.Cid) bool {
}
return false
}
func (fs *fakeSession) ReceiveBlocksFrom(p peer.ID, blks []blocks.Block) {
fs.blks = append(fs.blks, blks...)
func (fs *fakeSession) ReceiveBlocksFrom(p peer.ID, ks []cid.Cid) {
fs.ks = append(fs.ks, ks...)
}
type fakePeerManager struct {
@@ -90,17 +90,13 @@ func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
}
func cmpSessionCids(s *fakeSession, cids []cid.Cid) bool {
return cmpBlockCids(s.blks, cids)
}
func cmpBlockCids(blks []blocks.Block, cids []cid.Cid) bool {
if len(blks) != len(cids) {
if len(s.ks) != len(cids) {
return false
}
for _, b := range blks {
for _, bk := range s.ks {
has := false
for _, c := range cids {
if c == b.Cid() {
if c == bk {
has = true
}
}
@@ -141,10 +137,10 @@ func TestAddingSessions(t *testing.T) {
thirdSession.id != secondSession.id+2 {
t.Fatal("session does not have correct id set")
}
sm.ReceiveBlocksFrom(p, []blocks.Block{block})
if len(firstSession.blks) == 0 ||
len(secondSession.blks) == 0 ||
len(thirdSession.blks) == 0 {
sm.ReceiveBlocksFrom(p, []cid.Cid{block.Cid()})
if len(firstSession.ks) == 0 ||
len(secondSession.ks) == 0 ||
len(thirdSession.ks) == 0 {
t.Fatal("should have received blocks but didn't")
}
}
@@ -171,7 +167,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
nextInterestedIn = []cid.Cid{}
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
sm.ReceiveBlocksFrom(p, []blocks.Block{blks[0], blks[1]})
sm.ReceiveBlocksFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()})
if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) ||
!cmpSessionCids(secondSession, []cid.Cid{cids[0]}) ||
@@ -198,10 +194,10 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
cancel()
// wait for sessions to get removed
time.Sleep(10 * time.Millisecond)
sm.ReceiveBlocksFrom(p, []blocks.Block{block})
if len(firstSession.blks) > 0 ||
len(secondSession.blks) > 0 ||
len(thirdSession.blks) > 0 {
sm.ReceiveBlocksFrom(p, []cid.Cid{block.Cid()})
if len(firstSession.ks) > 0 ||
len(secondSession.ks) > 0 ||
len(thirdSession.ks) > 0 {
t.Fatal("received blocks for sessions after manager is shutdown")
}
}
@@ -226,10 +222,10 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
sessionCancel()
// wait for sessions to get removed
time.Sleep(10 * time.Millisecond)
sm.ReceiveBlocksFrom(p, []blocks.Block{block})
if len(firstSession.blks) == 0 ||
len(secondSession.blks) > 0 ||
len(thirdSession.blks) == 0 {
sm.ReceiveBlocksFrom(p, []cid.Cid{block.Cid()})
if len(firstSession.ks) == 0 ||
len(secondSession.ks) > 0 ||
len(thirdSession.ks) == 0 {
t.Fatal("received blocks for sessions that are canceled")
}
}