package asyncloader import ( "context" "fmt" "math/rand" "testing" "time" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/testbridge" "github.com/ipfs/go-graphsync/testutil" ipld "github.com/ipld/go-ipld-prime" ) func TestAsyncLoadInitialLoadSucceeds(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() callCount := 0 loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) { callCount++ return testutil.RandomBytes(100), nil } asyncLoader := New(ctx, loadAttempter) asyncLoader.Startup() link := testbridge.NewMockLink() requestID := gsmsg.GraphSyncRequestID(rand.Int31()) resultChan := asyncLoader.AsyncLoad(requestID, link) select { case result := <-resultChan: if result.Data == nil { t.Fatal("should have sent a response") } if result.Err != nil { t.Fatal("should not have sent an error") } case <-ctx.Done(): t.Fatal("should have closed response channel") } if callCount == 0 { t.Fatal("should have attempted to load link but did not") } } func TestAsyncLoadInitialLoadFails(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() callCount := 0 loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) { callCount++ return nil, fmt.Errorf("something went wrong") } asyncLoader := New(ctx, loadAttempter) asyncLoader.Startup() link := testbridge.NewMockLink() requestID := gsmsg.GraphSyncRequestID(rand.Int31()) resultChan := asyncLoader.AsyncLoad(requestID, link) select { case result := <-resultChan: if result.Data != nil { t.Fatal("should not have sent responses") } if result.Err == nil { t.Fatal("should have sent an error") } case <-ctx.Done(): t.Fatal("should have closed response channel") } if callCount == 0 { t.Fatal("should have attempted to load link but did not") } } func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() callCount := 0 loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) { var result []byte if callCount > 0 { result = testutil.RandomBytes(100) } callCount++ return result, nil } asyncLoader := New(ctx, loadAttempter) asyncLoader.Startup() link := testbridge.NewMockLink() requestID := gsmsg.GraphSyncRequestID(rand.Int31()) resultChan := asyncLoader.AsyncLoad(requestID, link) select { case result := <-resultChan: if result.Data != nil { t.Fatal("should not have sent responses") } if result.Err == nil { t.Fatal("should have sent an error") } case <-ctx.Done(): t.Fatal("should have produced result") } if callCount > 1 { t.Fatal("should have failed after load with indeterminate result") } } func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() callCount := 0 called := make(chan struct{}, 2) loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) { var result []byte called <- struct{}{} if callCount > 0 { result = testutil.RandomBytes(100) } callCount++ return result, nil } asyncLoader := New(ctx, loadAttempter) asyncLoader.Startup() link := testbridge.NewMockLink() requestID := gsmsg.GraphSyncRequestID(rand.Int31()) asyncLoader.StartRequest(requestID) resultChan := asyncLoader.AsyncLoad(requestID, link) select { case <-called: case <-resultChan: t.Fatal("Should not have sent message on response chan") case <-ctx.Done(): t.Fatal("should have attempted load once") } asyncLoader.NewResponsesAvailable() select { case result := <-resultChan: if result.Data == nil { t.Fatal("should have sent a response") } if result.Err != nil { t.Fatal("should not have sent an error") } case <-ctx.Done(): t.Fatal("should have closed response channel") } if callCount < 2 { t.Fatal("should have attempted to load multiple times till success but did not") } } func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() callCount := 0 called := make(chan struct{}, 2) loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) { var result []byte called <- struct{}{} if callCount > 0 { result = testutil.RandomBytes(100) } callCount++ return result, nil } asyncLoader := New(ctx, loadAttempter) asyncLoader.Startup() link := testbridge.NewMockLink() requestID := gsmsg.GraphSyncRequestID(rand.Int31()) asyncLoader.StartRequest(requestID) resultChan := asyncLoader.AsyncLoad(requestID, link) select { case <-called: case <-resultChan: t.Fatal("Should not have sent message on response chan") case <-ctx.Done(): t.Fatal("should have attempted load once") } asyncLoader.FinishRequest(requestID) asyncLoader.NewResponsesAvailable() select { case result := <-resultChan: if result.Data != nil { t.Fatal("should not have sent responses") } if result.Err == nil { t.Fatal("should have sent an error") } case <-ctx.Done(): t.Fatal("should have closed response channel") } if callCount > 1 { t.Fatal("should only have attempted one call but attempted multiple") } }