Commit 693e97d0 authored by Dirk McCormick's avatar Dirk McCormick

refactor: change naming to reflect blocks -> keys

parent 38c6f533
...@@ -330,7 +330,7 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error { ...@@ -330,7 +330,7 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
// Send all block keys (including duplicates) to any sessions that want them. // Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes) // (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveBlocksFrom(from, allKs) bs.sm.ReceiveFrom(from, allKs)
// Send wanted block keys to decision engine // Send wanted block keys to decision engine
bs.engine.AddBlocks(wantedKs) bs.engine.AddBlocks(wantedKs)
......
...@@ -52,9 +52,9 @@ type interestReq struct { ...@@ -52,9 +52,9 @@ type interestReq struct {
resp chan bool resp chan bool
} }
type blksRecv struct { type rcvFrom struct {
from peer.ID from peer.ID
ks []cid.Cid ks []cid.Cid
} }
// Session holds state for an individual bitswap transfer operation. // Session holds state for an individual bitswap transfer operation.
...@@ -68,7 +68,7 @@ type Session struct { ...@@ -68,7 +68,7 @@ type Session struct {
srs RequestSplitter srs RequestSplitter
// channels // channels
incoming chan blksRecv incoming chan rcvFrom
newReqs chan []cid.Cid newReqs chan []cid.Cid
cancelKeys chan []cid.Cid cancelKeys chan []cid.Cid
interestReqs chan interestReq interestReqs chan interestReq
...@@ -117,7 +117,7 @@ func New(ctx context.Context, ...@@ -117,7 +117,7 @@ func New(ctx context.Context,
wm: wm, wm: wm,
pm: pm, pm: pm,
srs: srs, srs: srs,
incoming: make(chan blksRecv), incoming: make(chan rcvFrom),
notif: notif, notif: notif,
uuid: loggables.Uuid("GetBlockRequest"), uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500, baseTickDelay: time.Millisecond * 500,
...@@ -134,10 +134,10 @@ func New(ctx context.Context, ...@@ -134,10 +134,10 @@ func New(ctx context.Context,
return s return s
} }
// ReceiveBlocksFrom receives incoming blocks from the given peer. // ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveBlocksFrom(from peer.ID, ks []cid.Cid) { func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
select { select {
case s.incoming <- blksRecv{from: from, ks: ks}: case s.incoming <- rcvFrom{from: from, ks: ks}:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
} }
...@@ -232,13 +232,13 @@ func (s *Session) run(ctx context.Context) { ...@@ -232,13 +232,13 @@ func (s *Session) run(ctx context.Context) {
for { for {
select { select {
case rcv := <-s.incoming: case rcv := <-s.incoming:
s.cancelIncomingBlocks(ctx, rcv) s.cancelIncoming(ctx, rcv)
// Record statistics only if the blocks came from the network // Record statistics only if the blocks came from the network
// (blocks can also be received from the local node) // (blocks can also be received from the local node)
if rcv.from != "" { if rcv.from != "" {
s.updateReceiveCounters(ctx, rcv) s.updateReceiveCounters(ctx, rcv)
} }
s.handleIncomingBlocks(ctx, rcv) s.handleIncoming(ctx, rcv)
case keys := <-s.newReqs: case keys := <-s.newReqs:
s.handleNewRequest(ctx, keys) s.handleNewRequest(ctx, keys)
case keys := <-s.cancelKeys: case keys := <-s.cancelKeys:
...@@ -260,7 +260,7 @@ func (s *Session) run(ctx context.Context) { ...@@ -260,7 +260,7 @@ func (s *Session) run(ctx context.Context) {
} }
} }
func (s *Session) cancelIncomingBlocks(ctx context.Context, rcv blksRecv) { func (s *Session) cancelIncoming(ctx context.Context, rcv rcvFrom) {
// We've received the blocks so we can cancel any outstanding wants for them // We've received the blocks so we can cancel any outstanding wants for them
wanted := make([]cid.Cid, 0, len(rcv.ks)) wanted := make([]cid.Cid, 0, len(rcv.ks))
for _, k := range rcv.ks { for _, k := range rcv.ks {
...@@ -272,11 +272,11 @@ func (s *Session) cancelIncomingBlocks(ctx context.Context, rcv blksRecv) { ...@@ -272,11 +272,11 @@ func (s *Session) cancelIncomingBlocks(ctx context.Context, rcv blksRecv) {
s.wm.CancelWants(s.ctx, wanted, nil, s.id) s.wm.CancelWants(s.ctx, wanted, nil, s.id)
} }
func (s *Session) handleIncomingBlocks(ctx context.Context, rcv blksRecv) { func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
s.idleTick.Stop() s.idleTick.Stop()
// Process the received blocks // Process the received blocks
s.receiveBlocks(ctx, rcv.ks) s.processIncoming(ctx, rcv.ks)
s.resetIdleTick() s.resetIdleTick()
} }
...@@ -376,7 +376,7 @@ func (s *Session) cidIsWanted(c cid.Cid) bool { ...@@ -376,7 +376,7 @@ func (s *Session) cidIsWanted(c cid.Cid) bool {
return ok return ok
} }
func (s *Session) receiveBlocks(ctx context.Context, ks []cid.Cid) { func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid) {
for _, c := range ks { for _, c := range ks {
if s.cidIsWanted(c) { if s.cidIsWanted(c) {
// If the block CID was in the live wants queue, remove it // If the block CID was in the live wants queue, remove it
...@@ -414,7 +414,7 @@ func (s *Session) receiveBlocks(ctx context.Context, ks []cid.Cid) { ...@@ -414,7 +414,7 @@ func (s *Session) receiveBlocks(ctx context.Context, ks []cid.Cid) {
} }
} }
func (s *Session) updateReceiveCounters(ctx context.Context, rcv blksRecv) { func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
for _, k := range rcv.ks { for _, k := range rcv.ks {
// Inform the request splitter of unique / duplicate blocks // Inform the request splitter of unique / duplicate blocks
if s.cidIsWanted(k) { if s.cidIsWanted(k) {
......
...@@ -126,10 +126,10 @@ func TestSessionGetBlocks(t *testing.T) { ...@@ -126,10 +126,10 @@ func TestSessionGetBlocks(t *testing.T) {
var receivedBlocks []blocks.Block var receivedBlocks []blocks.Block
for i, p := range peers { for i, p := range peers {
// simulate what bitswap does on receiving a message: // simulate what bitswap does on receiving a message:
// - calls ReceiveBlocksFrom() on session // - calls ReceiveFrom() on session
// - publishes block to pubsub channel // - publishes block to pubsub channel
blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])] blk := blks[testutil.IndexOf(blks, receivedWantReq.cids[i])]
session.ReceiveBlocksFrom(p, []cid.Cid{blk.Cid()}) session.ReceiveFrom(p, []cid.Cid{blk.Cid()})
notif.Publish(blk) notif.Publish(blk)
select { select {
...@@ -188,10 +188,10 @@ func TestSessionGetBlocks(t *testing.T) { ...@@ -188,10 +188,10 @@ func TestSessionGetBlocks(t *testing.T) {
// receive remaining blocks // receive remaining blocks
for i, p := range peers { for i, p := range peers {
// simulate what bitswap does on receiving a message: // simulate what bitswap does on receiving a message:
// - calls ReceiveBlocksFrom() on session // - calls ReceiveFrom() on session
// - publishes block to pubsub channel // - publishes block to pubsub channel
blk := blks[testutil.IndexOf(blks, newCidsRequested[i])] blk := blks[testutil.IndexOf(blks, newCidsRequested[i])]
session.ReceiveBlocksFrom(p, []cid.Cid{blk.Cid()}) session.ReceiveFrom(p, []cid.Cid{blk.Cid()})
notif.Publish(blk) notif.Publish(blk)
receivedBlock := <-getBlocksCh receivedBlock := <-getBlocksCh
...@@ -252,10 +252,10 @@ func TestSessionFindMorePeers(t *testing.T) { ...@@ -252,10 +252,10 @@ func TestSessionFindMorePeers(t *testing.T) {
p := testutil.GeneratePeers(1)[0] p := testutil.GeneratePeers(1)[0]
// simulate what bitswap does on receiving a message: // simulate what bitswap does on receiving a message:
// - calls ReceiveBlocksFrom() on session // - calls ReceiveFrom() on session
// - publishes block to pubsub channel // - publishes block to pubsub channel
blk := blks[0] blk := blks[0]
session.ReceiveBlocksFrom(p, []cid.Cid{blk.Cid()}) session.ReceiveFrom(p, []cid.Cid{blk.Cid()})
notif.Publish(blk) notif.Publish(blk)
select { select {
case <-cancelReqs: case <-cancelReqs:
......
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
type Session interface { type Session interface {
exchange.Fetcher exchange.Fetcher
InterestedIn(cid.Cid) bool InterestedIn(cid.Cid) bool
ReceiveBlocksFrom(peer.ID, []cid.Cid) ReceiveFrom(peer.ID, []cid.Cid)
} }
type sesTrk struct { type sesTrk struct {
...@@ -114,9 +114,9 @@ func (sm *SessionManager) GetNextSessionID() uint64 { ...@@ -114,9 +114,9 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
return sm.sessID return sm.sessID
} }
// ReceiveBlocksFrom receives blocks from a peer and dispatches to interested // ReceiveFrom receives blocks from a peer and dispatches to interested
// sessions. // sessions.
func (sm *SessionManager) ReceiveBlocksFrom(from peer.ID, ks []cid.Cid) { func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
sm.sessLk.Lock() sm.sessLk.Lock()
defer sm.sessLk.Unlock() defer sm.sessLk.Unlock()
...@@ -128,6 +128,6 @@ func (sm *SessionManager) ReceiveBlocksFrom(from peer.ID, ks []cid.Cid) { ...@@ -128,6 +128,6 @@ func (sm *SessionManager) ReceiveBlocksFrom(from peer.ID, ks []cid.Cid) {
sessKs = append(sessKs, k) sessKs = append(sessKs, k)
} }
} }
s.session.ReceiveBlocksFrom(from, sessKs) s.session.ReceiveFrom(from, sessKs)
} }
} }
...@@ -40,7 +40,7 @@ func (fs *fakeSession) InterestedIn(c cid.Cid) bool { ...@@ -40,7 +40,7 @@ func (fs *fakeSession) InterestedIn(c cid.Cid) bool {
} }
return false return false
} }
func (fs *fakeSession) ReceiveBlocksFrom(p peer.ID, ks []cid.Cid) { func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid) {
fs.ks = append(fs.ks, ks...) fs.ks = append(fs.ks, ks...)
} }
...@@ -137,7 +137,7 @@ func TestAddingSessions(t *testing.T) { ...@@ -137,7 +137,7 @@ func TestAddingSessions(t *testing.T) {
thirdSession.id != secondSession.id+2 { thirdSession.id != secondSession.id+2 {
t.Fatal("session does not have correct id set") t.Fatal("session does not have correct id set")
} }
sm.ReceiveBlocksFrom(p, []cid.Cid{block.Cid()}) sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
if len(firstSession.ks) == 0 || if len(firstSession.ks) == 0 ||
len(secondSession.ks) == 0 || len(secondSession.ks) == 0 ||
len(thirdSession.ks) == 0 { len(thirdSession.ks) == 0 {
...@@ -167,7 +167,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) { ...@@ -167,7 +167,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
nextInterestedIn = []cid.Cid{} nextInterestedIn = []cid.Cid{}
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
sm.ReceiveBlocksFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()}) sm.ReceiveFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()})
if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) || if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) ||
!cmpSessionCids(secondSession, []cid.Cid{cids[0]}) || !cmpSessionCids(secondSession, []cid.Cid{cids[0]}) ||
...@@ -194,7 +194,7 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) { ...@@ -194,7 +194,7 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
cancel() cancel()
// wait for sessions to get removed // wait for sessions to get removed
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
sm.ReceiveBlocksFrom(p, []cid.Cid{block.Cid()}) sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
if len(firstSession.ks) > 0 || if len(firstSession.ks) > 0 ||
len(secondSession.ks) > 0 || len(secondSession.ks) > 0 ||
len(thirdSession.ks) > 0 { len(thirdSession.ks) > 0 {
...@@ -222,7 +222,7 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) { ...@@ -222,7 +222,7 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
sessionCancel() sessionCancel()
// wait for sessions to get removed // wait for sessions to get removed
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
sm.ReceiveBlocksFrom(p, []cid.Cid{block.Cid()}) sm.ReceiveFrom(p, []cid.Cid{block.Cid()})
if len(firstSession.ks) == 0 || if len(firstSession.ks) == 0 ||
len(secondSession.ks) > 0 || len(secondSession.ks) > 0 ||
len(thirdSession.ks) == 0 { len(thirdSession.ks) == 0 {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment