Commit 2dce5f9d authored by hannahhoward's avatar hannahhoward

fix(asyncloader): more flexible cancellation

Continue to load links synchronously after network request completes
parent 77a15c1b
...@@ -179,12 +179,6 @@ func (abl *AsyncLoader) messageQueueWorker() { ...@@ -179,12 +179,6 @@ func (abl *AsyncLoader) messageQueueWorker() {
} }
func (lr *loadRequest) handle(abl *AsyncLoader) { func (lr *loadRequest) handle(abl *AsyncLoader) {
_, ok := abl.activeRequests[lr.requestID]
if !ok {
abl.terminateWithError("No active request", lr.resultChan)
returnLoadRequest(lr)
return
}
response, err := abl.loadAttempter(lr.requestID, lr.link) response, err := abl.loadAttempter(lr.requestID, lr.link)
if err != nil { if err != nil {
lr.resultChan <- AsyncLoadResult{nil, err} lr.resultChan <- AsyncLoadResult{nil, err}
...@@ -198,6 +192,12 @@ func (lr *loadRequest) handle(abl *AsyncLoader) { ...@@ -198,6 +192,12 @@ func (lr *loadRequest) handle(abl *AsyncLoader) {
returnLoadRequest(lr) returnLoadRequest(lr)
return return
} }
_, ok := abl.activeRequests[lr.requestID]
if !ok {
abl.terminateWithError("No active request", lr.resultChan)
returnLoadRequest(lr)
return
}
abl.pausedRequests = append(abl.pausedRequests, lr) abl.pausedRequests = append(abl.pausedRequests, lr)
} }
......
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
ipld "github.com/ipld/go-ipld-prime" ipld "github.com/ipld/go-ipld-prime"
) )
func TestAsyncLoadWhenRequestNotInProgress(t *testing.T) { func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel() defer cancel()
...@@ -31,46 +31,44 @@ func TestAsyncLoadWhenRequestNotInProgress(t *testing.T) { ...@@ -31,46 +31,44 @@ func TestAsyncLoadWhenRequestNotInProgress(t *testing.T) {
select { select {
case result := <-resultChan: case result := <-resultChan:
if result.Data != nil { if result.Data == nil {
t.Fatal("should not have sent responses") t.Fatal("should have sent a response")
} }
if result.Err == nil { if result.Err != nil {
t.Fatal("should have sent an error") t.Fatal("should not have sent an error")
} }
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("should have produced result") t.Fatal("should have closed response channel")
} }
if callCount > 0 { if callCount == 0 {
t.Fatal("should not have attempted to load link but did") t.Fatal("should have attempted to load link but did not")
} }
} }
func TestAsyncLoadWhenInitialLoadSucceeds(t *testing.T) { func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel() defer cancel()
callCount := 0 callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) { loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
callCount++ callCount++
return testutil.RandomBytes(100), nil return nil, fmt.Errorf("something went wrong")
} }
asyncLoader := New(ctx, loadAttempter) asyncLoader := New(ctx, loadAttempter)
asyncLoader.Startup() asyncLoader.Startup()
link := testbridge.NewMockLink() link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31()) requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
resultChan := asyncLoader.AsyncLoad(requestID, link) resultChan := asyncLoader.AsyncLoad(requestID, link)
select { select {
case result := <-resultChan: case result := <-resultChan:
if result.Data == nil { if result.Data != nil {
t.Fatal("should have sent a response") t.Fatal("should not have sent responses")
} }
if result.Err != nil { if result.Err == nil {
t.Fatal("should not have sent an error") t.Fatal("should have sent an error")
} }
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("should have closed response channel") t.Fatal("should have closed response channel")
...@@ -79,23 +77,26 @@ func TestAsyncLoadWhenInitialLoadSucceeds(t *testing.T) { ...@@ -79,23 +77,26 @@ func TestAsyncLoadWhenInitialLoadSucceeds(t *testing.T) {
if callCount == 0 { if callCount == 0 {
t.Fatal("should have attempted to load link but did not") t.Fatal("should have attempted to load link but did not")
} }
}
func TestAsyncLoadInitialLoadFails(t *testing.T) { }
func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel() defer cancel()
callCount := 0 callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) { loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++ callCount++
return nil, fmt.Errorf("something went wrong") return result, nil
} }
asyncLoader := New(ctx, loadAttempter) asyncLoader := New(ctx, loadAttempter)
asyncLoader.Startup() asyncLoader.Startup()
link := testbridge.NewMockLink() link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31()) requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
resultChan := asyncLoader.AsyncLoad(requestID, link) resultChan := asyncLoader.AsyncLoad(requestID, link)
select { select {
...@@ -105,13 +106,14 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { ...@@ -105,13 +106,14 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
} }
if result.Err == nil { if result.Err == nil {
t.Fatal("should have sent an error") t.Fatal("should have sent an error")
} }
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("should have closed response channel") t.Fatal("should have produced result")
} }
if callCount == 0 { if callCount > 1 {
t.Fatal("should have attempted to load link but did not") t.Fatal("should have failed after load with indeterminate result")
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment