Skip to content

Commit

Permalink
If GetData returns unauthorized, retry a couple of times. Needed beca…
Browse files Browse the repository at this point in the history
…use BCDA has recently has been returning unauthorized on the first retry. In the future we may need to add this multiple retry logic elsewhere as well.

PiperOrigin-RevId: 436353733
  • Loading branch information
suyashkumar authored and copybara-github committed Mar 22, 2022
1 parent c835d32 commit ceef4b0
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 6 deletions.
20 changes: 14 additions & 6 deletions cmd/bcda_fetch/bcda_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func mainWrapper(cfg mainWrapperConfig) error {
for r, urls := range jobStatus.ResultURLs {
for i, url := range urls {
filePrefix := fmt.Sprintf("%s_%s_%d", *outputPrefix, r, i)
r := getDataOrExit(cl, url, *clientID, *clientSecret)
r, err := getDataOrExit(cl, url, *clientID, *clientSecret)
if err != nil {
return err
}
defer r.Close()
if *rectify {
rectifyAndWrite(r, filePrefix, cfg)
Expand Down Expand Up @@ -196,18 +199,23 @@ func mainWrapper(cfg mainWrapperConfig) error {
return nil
}

func getDataOrExit(cl *bcda.Client, url, clientID, clientSecret string) io.ReadCloser {
func getDataOrExit(cl *bcda.Client, url, clientID, clientSecret string) (io.ReadCloser, error) {
r, err := cl.GetData(url)
if err == bcda.ErrorUnauthorized {
numRetries := 0
for errors.Is(err, bcda.ErrorUnauthorized) && numRetries < 5 {
time.Sleep(2 * time.Second)
log.Infof("Got Unauthorized from BCDA. Re-authenticating and trying again.")
if _, err := cl.Authenticate(clientID, clientSecret); err != nil {
log.Exitf("Error authenticating with API: %v", err)
return nil, fmt.Errorf("Error authenticating with API: %w", err)
}
r, err = cl.GetData(url)
numRetries++
}

if err != nil {
log.Exitf("Unable to GetData(%s): %v", url, err)
return nil, fmt.Errorf("Unable to GetData(%s) %w", url, err)
}
return r
return r, nil
}

func writeData(r io.Reader, filePrefix string) {
Expand Down
112 changes: 112 additions & 0 deletions cmd/bcda_fetch/bcda_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,118 @@ func TestMainWrapper_GetJobStatusAuthRetry(t *testing.T) {
}
}

func TestMainWrapper_GetDataAuthRetry(t *testing.T) {
// This tests that if GetData returns unauthorized, mainWrapper attempts to
// re-authorize and try again at least 5 times.
cases := []struct {
name string
apiVersion bcda.Version
numRetriesBeforeOK int
wantError error
}{
{
name: "BCDAV1",
apiVersion: bcda.V1,
numRetriesBeforeOK: 5,
},
{
name: "BCDAV2",
apiVersion: bcda.V2,
numRetriesBeforeOK: 5,
},
{
name: "BCDAV1TooManyRetries",
apiVersion: bcda.V1,
numRetriesBeforeOK: 6,
wantError: bcda.ErrorUnauthorized,
},
{
name: "BCDAV2TooManyRetries",
apiVersion: bcda.V2,
numRetriesBeforeOK: 6,
wantError: bcda.ErrorUnauthorized,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// Declare test data:
file1Data := []byte(`{"resourceType":"Patient","id":"PatientID"}`)
exportEndpoint := "/api/v1/Group/all/$export"
jobsEndpoint := "/api/v1/jobs/1234"
if tc.apiVersion == bcda.V2 {
exportEndpoint = "/api/v2/Group/all/$export"
jobsEndpoint = "/api/v2/jobs/1234"
}
serverTransactionTime := "2020-12-09T11:00:00.123+00:00"

var authCalled mutexCounter
var getDataCalled mutexCounter

// Setup BCDA test servers:

// A seperate resource server is needed during testing, so that we can send
// the jobsEndpoint response in the bcdaServer that includes a URL for the
// bcdaResourceServer in it.
bcdaResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
getDataCalled.Increment()
if authCalled.Value() < tc.numRetriesBeforeOK+1 { // plus 1 because auth always called once at client init.
w.WriteHeader(http.StatusUnauthorized)
return
}
w.WriteHeader(http.StatusOK)
w.Write(file1Data)
}))
defer bcdaResourceServer.Close()

bcdaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/auth/token":
authCalled.Increment()
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"access_token": "token"}`))
case exportEndpoint:
w.Header()["Content-Location"] = []string{"some/info/1234"}
w.WriteHeader(http.StatusAccepted)
case jobsEndpoint:
w.WriteHeader(http.StatusOK)
w.Write([]byte(fmt.Sprintf("{\"output\": [{\"type\": \"Patient\", \"url\": \"%s/data/10.ndjson\"}], \"transactionTime\": \"%s\"}", bcdaResourceServer.URL, serverTransactionTime)))
default:
w.WriteHeader(http.StatusBadRequest)
}
}))
defer bcdaServer.Close()

// Set flags for this test case:
outputPrefix := t.TempDir()
defer SaveFlags().Restore()
flag.Set("client_id", "id")
flag.Set("client_secret", "secret")
flag.Set("output_prefix", outputPrefix)
flag.Set("bcda_server_url", bcdaServer.URL)

if tc.apiVersion == bcda.V2 {
flag.Set("use_v2", "true")
}

// Run mainWrapper:
cfg := mainWrapperConfig{fhirStoreEndpoint: ""}
if err := mainWrapper(cfg); !errors.Is(err, tc.wantError) {
t.Errorf("mainWrapper(%v) unexpected error. got: %v, want: %v", cfg, err, tc.wantError)
}
if tc.wantError == nil {
wantCalls := tc.numRetriesBeforeOK + 1
if got := authCalled.Value(); got != wantCalls {
t.Errorf("mainWrapper: expected auth to be called exactly %d, got: %d, want: %d", wantCalls, got, wantCalls)
}
if got := getDataCalled.Value(); got != wantCalls {
t.Errorf("mainWrapper: expected getDataCalled to be called exactly %d, got: %d, want: %d", wantCalls, got, wantCalls)
}
}
})
}
}

type fhirStoreTestResource struct {
resourceID string
resourceType string
Expand Down

0 comments on commit ceef4b0

Please sign in to comment.