Commit fe1cb2c9 authored by hannahhoward's avatar hannahhoward

refactor(linktracker): move to top level package

move link tracker out of response manager so it can be used for requests
parent 4e0fea7b
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
// Second, keep track of whether links are missing blocks so you can determine // Second, keep track of whether links are missing blocks so you can determine
// at the end if a complete response has been transmitted. // at the end if a complete response has been transmitted.
type LinkTracker struct { type LinkTracker struct {
isMissingBlocks map[gsmsg.GraphSyncRequestID]struct{} missingBlocks map[gsmsg.GraphSyncRequestID]map[ipld.Link]struct{}
linksWithBlocksTraversedByRequest map[gsmsg.GraphSyncRequestID][]ipld.Link linksWithBlocksTraversedByRequest map[gsmsg.GraphSyncRequestID][]ipld.Link
traversalsWithBlocksInProgress map[ipld.Link]int traversalsWithBlocksInProgress map[ipld.Link]int
} }
...@@ -19,17 +19,26 @@ type LinkTracker struct { ...@@ -19,17 +19,26 @@ type LinkTracker struct {
// New makes a new link tracker // New makes a new link tracker
func New() *LinkTracker { func New() *LinkTracker {
return &LinkTracker{ return &LinkTracker{
isMissingBlocks: make(map[gsmsg.GraphSyncRequestID]struct{}), missingBlocks: make(map[gsmsg.GraphSyncRequestID]map[ipld.Link]struct{}),
linksWithBlocksTraversedByRequest: make(map[gsmsg.GraphSyncRequestID][]ipld.Link), linksWithBlocksTraversedByRequest: make(map[gsmsg.GraphSyncRequestID][]ipld.Link),
traversalsWithBlocksInProgress: make(map[ipld.Link]int), traversalsWithBlocksInProgress: make(map[ipld.Link]int),
} }
} }
// ShouldSendBlockFor says whether we should send a block for a given link, based // BlockRefCount returns the number of times a present block has been traversed
// on whether we have traversed it already in one of the in progress requests and // by in progress requests
// sent a block already. func (lt *LinkTracker) BlockRefCount(link ipld.Link) int {
func (lt *LinkTracker) ShouldSendBlockFor(link ipld.Link) bool { return lt.traversalsWithBlocksInProgress[link]
return lt.traversalsWithBlocksInProgress[link] == 0 }
// IsKnownMissingLink returns whether the given request recorded the given link as missing
func (lt *LinkTracker) IsKnownMissingLink(requestID gsmsg.GraphSyncRequestID, link ipld.Link) bool {
missingBlocks, ok := lt.missingBlocks[requestID]
if !ok {
return false
}
_, ok = missingBlocks[link]
return ok
} }
// RecordLinkTraversal records that we traversed a link during a request, and // RecordLinkTraversal records that we traversed a link during a request, and
...@@ -39,15 +48,21 @@ func (lt *LinkTracker) RecordLinkTraversal(requestID gsmsg.GraphSyncRequestID, l ...@@ -39,15 +48,21 @@ func (lt *LinkTracker) RecordLinkTraversal(requestID gsmsg.GraphSyncRequestID, l
lt.linksWithBlocksTraversedByRequest[requestID] = append(lt.linksWithBlocksTraversedByRequest[requestID], link) lt.linksWithBlocksTraversedByRequest[requestID] = append(lt.linksWithBlocksTraversedByRequest[requestID], link)
lt.traversalsWithBlocksInProgress[link]++ lt.traversalsWithBlocksInProgress[link]++
} else { } else {
lt.isMissingBlocks[requestID] = struct{}{} missingBlocks, ok := lt.missingBlocks[requestID]
if !ok {
missingBlocks = make(map[ipld.Link]struct{})
lt.missingBlocks[requestID] = missingBlocks
}
missingBlocks[link] = struct{}{}
} }
} }
// FinishRequest records that we have completed the given request, and returns // FinishRequest records that we have completed the given request, and returns
// true if all links traversed had blocks present. // true if all links traversed had blocks present.
func (lt *LinkTracker) FinishRequest(requestID gsmsg.GraphSyncRequestID) (hasAllBlocks bool) { func (lt *LinkTracker) FinishRequest(requestID gsmsg.GraphSyncRequestID) (hasAllBlocks bool) {
_, ok := lt.isMissingBlocks[requestID] _, ok := lt.missingBlocks[requestID]
hasAllBlocks = !ok hasAllBlocks = !ok
delete(lt.missingBlocks, requestID)
links, ok := lt.linksWithBlocksTraversedByRequest[requestID] links, ok := lt.linksWithBlocksTraversedByRequest[requestID]
if !ok { if !ok {
return return
......
...@@ -8,12 +8,12 @@ import ( ...@@ -8,12 +8,12 @@ import (
"github.com/ipfs/go-graphsync/testbridge" "github.com/ipfs/go-graphsync/testbridge"
) )
func TestShouldSendBlocks(t *testing.T) { func TestBlockRefCount(t *testing.T) {
linkTracker := New() linkTracker := New()
link1 := testbridge.NewMockLink() link1 := testbridge.NewMockLink()
link2 := testbridge.NewMockLink() link2 := testbridge.NewMockLink()
if !linkTracker.ShouldSendBlockFor(link1) || !linkTracker.ShouldSendBlockFor(link2) { if linkTracker.BlockRefCount(link1) != 0 || linkTracker.BlockRefCount(link2) != 0 {
t.Fatal("Links not traversed should send blocks") t.Fatal("Links not traversed should have refcount 0")
} }
requestID1 := gsmsg.GraphSyncRequestID(rand.Int31()) requestID1 := gsmsg.GraphSyncRequestID(rand.Int31())
requestID2 := gsmsg.GraphSyncRequestID(rand.Int31()) requestID2 := gsmsg.GraphSyncRequestID(rand.Int31())
...@@ -22,13 +22,13 @@ func TestShouldSendBlocks(t *testing.T) { ...@@ -22,13 +22,13 @@ func TestShouldSendBlocks(t *testing.T) {
linkTracker.RecordLinkTraversal(requestID1, link2, true) linkTracker.RecordLinkTraversal(requestID1, link2, true)
linkTracker.RecordLinkTraversal(requestID2, link1, true) linkTracker.RecordLinkTraversal(requestID2, link1, true)
if linkTracker.ShouldSendBlockFor(link1) || linkTracker.ShouldSendBlockFor(link2) { if linkTracker.BlockRefCount(link1) == 0 || linkTracker.BlockRefCount(link2) == 0 {
t.Fatal("Links already traversed with blocks should not send blocks again") t.Fatal("Links already traversed with blocks should not have ref count 0")
} }
linkTracker.FinishRequest(requestID1) linkTracker.FinishRequest(requestID1)
if linkTracker.ShouldSendBlockFor(link1) || !linkTracker.ShouldSendBlockFor(link2) { if linkTracker.BlockRefCount(link1) == 0 || linkTracker.BlockRefCount(link2) != 0 {
t.Fatal("Finishing request should resend blocks only if there are no in progress requests for that block remain") t.Fatal("Finishing request decrement refcount for block traversed by request")
} }
} }
...@@ -53,7 +53,7 @@ func TestHasAllBlocks(t *testing.T) { ...@@ -53,7 +53,7 @@ func TestHasAllBlocks(t *testing.T) {
func TestBlockBecomesAvailable(t *testing.T) { func TestBlockBecomesAvailable(t *testing.T) {
linkTracker := New() linkTracker := New()
link1 := testbridge.NewMockLink() link1 := testbridge.NewMockLink()
if !linkTracker.ShouldSendBlockFor(link1) { if linkTracker.BlockRefCount(link1) != 0 {
t.Fatal("Links not traversed should send blocks") t.Fatal("Links not traversed should send blocks")
} }
requestID1 := gsmsg.GraphSyncRequestID(rand.Int31()) requestID1 := gsmsg.GraphSyncRequestID(rand.Int31())
...@@ -62,13 +62,12 @@ func TestBlockBecomesAvailable(t *testing.T) { ...@@ -62,13 +62,12 @@ func TestBlockBecomesAvailable(t *testing.T) {
linkTracker.RecordLinkTraversal(requestID1, link1, false) linkTracker.RecordLinkTraversal(requestID1, link1, false)
linkTracker.RecordLinkTraversal(requestID2, link1, false) linkTracker.RecordLinkTraversal(requestID2, link1, false)
if !linkTracker.ShouldSendBlockFor(link1) { if linkTracker.BlockRefCount(link1) != 0 {
t.Fatal("Links traversed without blocks should still send them if they become availabe") t.Fatal("Links traversed without blocks should still send them if they become availabe")
} }
linkTracker.RecordLinkTraversal(requestID1, link1, true) linkTracker.RecordLinkTraversal(requestID1, link1, true)
if linkTracker.BlockRefCount(link1) == 0 {
if linkTracker.ShouldSendBlockFor(link1) {
t.Fatal("Links traversed with blocks should no longer send") t.Fatal("Links traversed with blocks should no longer send")
} }
...@@ -77,7 +76,26 @@ func TestBlockBecomesAvailable(t *testing.T) { ...@@ -77,7 +76,26 @@ func TestBlockBecomesAvailable(t *testing.T) {
t.Fatal("Even if block becomes available, traversal may be incomplete, request still should not be considered to have all blocks") t.Fatal("Even if block becomes available, traversal may be incomplete, request still should not be considered to have all blocks")
} }
if !linkTracker.ShouldSendBlockFor(link1) { if linkTracker.BlockRefCount(link1) != 0 {
t.Fatal("Block traversals should resend for requests that never traversed while block was present") t.Fatal("Block traversals should resend for requests that never traversed while block was present")
} }
} }
func TestMissingLink(t *testing.T) {
linkTracker := New()
link1 := testbridge.NewMockLink()
link2 := testbridge.NewMockLink()
requestID1 := gsmsg.GraphSyncRequestID(rand.Int31())
requestID2 := gsmsg.GraphSyncRequestID(rand.Int31())
linkTracker.RecordLinkTraversal(requestID1, link1, true)
linkTracker.RecordLinkTraversal(requestID1, link2, false)
linkTracker.RecordLinkTraversal(requestID2, link1, true)
if linkTracker.IsKnownMissingLink(requestID1, link1) ||
!linkTracker.IsKnownMissingLink(requestID1, link2) ||
linkTracker.IsKnownMissingLink(requestID2, link1) ||
linkTracker.IsKnownMissingLink(requestID2, link2) {
t.Fatal("Did not record which links are known missing correctly")
}
}
...@@ -14,8 +14,8 @@ import ( ...@@ -14,8 +14,8 @@ import (
"github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime"
"github.com/ipfs/go-block-format" "github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync/linktracker"
gsmsg "github.com/ipfs/go-graphsync/message" gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/linktracker"
"github.com/ipfs/go-graphsync/responsemanager/responsebuilder" "github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
) )
...@@ -90,7 +90,7 @@ func (prm *peerResponseSender) SendResponse( ...@@ -90,7 +90,7 @@ func (prm *peerResponseSender) SendResponse(
) { ) {
hasBlock := data != nil hasBlock := data != nil
prm.linkTrackerLk.Lock() prm.linkTrackerLk.Lock()
sendBlock := hasBlock && prm.linkTracker.ShouldSendBlockFor(link) sendBlock := hasBlock && prm.linkTracker.BlockRefCount(link) == 0
prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock) prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prm.linkTrackerLk.Unlock() prm.linkTrackerLk.Unlock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment