Unverified Commit 331d3c70 authored by Aarsh Shah's avatar Aarsh Shah Committed by GitHub

Fix/log blockstore reads (#169)

* log gs traversal

* Apply suggestions from code review
Co-authored-by: default avatardirkmc <dirkmdev@gmail.com>

* add debug logs

* fixed logging

* Apply suggestions from code review
Co-authored-by: default avatardirkmc <dirkmdev@gmail.com>

* fixed error
Co-authored-by: default avatardirkmc <dirkmdev@gmail.com>
parent a9ec9554
This diff is collapsed.
...@@ -41,6 +41,8 @@ type Traverser interface { ...@@ -41,6 +41,8 @@ type Traverser interface {
Error(err error) Error(err error)
// Shutdown cancels the traversal // Shutdown cancels the traversal
Shutdown(ctx context.Context) Shutdown(ctx context.Context)
// NBlocksTraversed returns the number of blocks successfully traversed
NBlocksTraversed() int
} }
type state struct { type state struct {
...@@ -60,6 +62,7 @@ type nextResponse struct { ...@@ -60,6 +62,7 @@ type nextResponse struct {
func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser { func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
ctx, cancel := context.WithCancel(parentCtx) ctx, cancel := context.WithCancel(parentCtx)
t := &traverser{ t := &traverser{
blocksCount: 0,
parentCtx: parentCtx, parentCtx: parentCtx,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
...@@ -85,6 +88,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser { ...@@ -85,6 +88,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
// traverser is a class to perform a selector traversal that stops every time a new block is loaded // traverser is a class to perform a selector traversal that stops every time a new block is loaded
// and waits for manual input (in the form of advance or error) // and waits for manual input (in the form of advance or error)
type traverser struct { type traverser struct {
blocksCount int
parentCtx context.Context parentCtx context.Context
ctx context.Context ctx context.Context
cancel func() cancel func()
...@@ -102,6 +106,10 @@ type traverser struct { ...@@ -102,6 +106,10 @@ type traverser struct {
stopped chan struct{} stopped chan struct{}
} }
func (t *traverser) NBlocksTraversed() int {
return t.blocksCount
}
func (t *traverser) checkState() { func (t *traverser) checkState() {
select { select {
case <-t.awaitRequest: case <-t.awaitRequest:
...@@ -203,16 +211,20 @@ func (t *traverser) Advance(reader io.Reader) error { ...@@ -203,16 +211,20 @@ func (t *traverser) Advance(reader io.Reader) error {
if isComplete { if isComplete {
return errors.New("cannot advance when done") return errors.New("cannot advance when done")
} }
select { select {
case <-t.ctx.Done(): case <-t.ctx.Done():
return ContextCancelError{} return ContextCancelError{}
case t.awaitRequest <- struct{}{}: case t.awaitRequest <- struct{}{}:
} }
select { select {
case <-t.ctx.Done(): case <-t.ctx.Done():
return ContextCancelError{} return ContextCancelError{}
case t.responses <- nextResponse{reader, nil}: case t.responses <- nextResponse{reader, nil}:
} }
t.blocksCount++
return nil return nil
} }
......
...@@ -24,15 +24,14 @@ func RunTraversal( ...@@ -24,15 +24,14 @@ func RunTraversal(
loader ipld.Loader, loader ipld.Loader,
traverser ipldutil.Traverser, traverser ipldutil.Traverser,
sendResponse ResponseSender) error { sendResponse ResponseSender) error {
nBlocksRead := 0
for { for {
isComplete, err := traverser.IsComplete() isComplete, err := traverser.IsComplete()
if isComplete { if isComplete {
if err != nil { if err != nil {
logger.Infof("traversal completion check failed, nBlocksRead=%d, err=%s", nBlocksRead, err) logger.Errorf("traversal completion check failed, nBlocksRead=%d, err=%s", traverser.NBlocksTraversed(), err)
} else { } else {
logger.Infof("traversal completed successfully, nBlocksRead=%d", nBlocksRead) logger.Debugf("traversal completed successfully, nBlocksRead=%d", traverser.NBlocksTraversed())
} }
return err return err
} }
...@@ -41,7 +40,7 @@ func RunTraversal( ...@@ -41,7 +40,7 @@ func RunTraversal(
result, err := loader(lnk, lnkCtx) result, err := loader(lnk, lnkCtx)
var data []byte var data []byte
if err != nil { if err != nil {
logger.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, nBlocksRead, err) logger.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err)
traverser.Error(traversal.SkipMe{}) traverser.Error(traversal.SkipMe{})
} else { } else {
blockBuffer, ok := result.(*bytes.Buffer) blockBuffer, ok := result.(*bytes.Buffer)
...@@ -50,18 +49,17 @@ func RunTraversal( ...@@ -50,18 +49,17 @@ func RunTraversal(
_, err = io.Copy(blockBuffer, result) _, err = io.Copy(blockBuffer, result)
} }
if err != nil { if err != nil {
logger.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, nBlocksRead, err) logger.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err)
traverser.Error(err) traverser.Error(err)
} else { } else {
data = blockBuffer.Bytes() data = blockBuffer.Bytes()
err = traverser.Advance(blockBuffer) err = traverser.Advance(blockBuffer)
if err != nil { if err != nil {
logger.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, nBlocksRead, err) logger.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, traverser.NBlocksTraversed(), err)
return err return err
} }
} }
nBlocksRead++ logger.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, traverser.NBlocksTraversed())
logger.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, nBlocksRead)
} }
err = sendResponse(lnk, data) err = sendResponse(lnk, data)
if err != nil { if err != nil {
......
...@@ -69,6 +69,10 @@ type fakeTraverser struct { ...@@ -69,6 +69,10 @@ type fakeTraverser struct {
expectedOutcomes []traverseOutcome expectedOutcomes []traverseOutcome
} }
func (ft *fakeTraverser) NBlocksTraversed() int {
return 0
}
// IsComplete returns the completion state (boolean) and if so, the final error result from IPLD // IsComplete returns the completion state (boolean) and if so, the final error result from IPLD
func (ft *fakeTraverser) IsComplete() (bool, error) { func (ft *fakeTraverser) IsComplete() (bool, error) {
if ft.currentLink >= len(ft.loadedLinks) { if ft.currentLink >= len(ft.loadedLinks) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment