Skip to content

Commit a66ff23

Browse files
committed
Retry Requests on Transient Errors
Closes: #32
1 parent 7515be9 commit a66ff23

File tree

2 files changed

+336
-36
lines changed

2 files changed

+336
-36
lines changed

cmd/evaluateMeasure.go

Lines changed: 85 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,67 @@ func RandomUrl() (string, error) {
188188
return "urn:uuid:" + myUuid.String(), nil
189189
}
190190

191-
func evaluateMeasure(measureUrl string) ([]byte, error) {
191+
func isTransient(issue fm.OperationOutcomeIssue) bool {
192+
switch issue.Code {
193+
case fm.IssueTypeTransient,
194+
fm.IssueTypeLockError,
195+
fm.IssueTypeNoStore,
196+
fm.IssueTypeException,
197+
fm.IssueTypeTimeout,
198+
fm.IssueTypeIncomplete,
199+
fm.IssueTypeThrottled:
200+
return true
201+
default:
202+
return false
203+
}
204+
}
205+
206+
type operationOutcomeError struct {
207+
outcome *fm.OperationOutcome
208+
}
209+
210+
func (err *operationOutcomeError) Error() string {
211+
return util.FmtOperationOutcomes([]*fm.OperationOutcome{err.outcome})
212+
}
213+
214+
type retryableError interface {
215+
retryable() bool
216+
}
217+
218+
func (err *operationOutcomeError) retryable() bool {
219+
for _, issue := range err.outcome.Issue {
220+
if isTransient(issue) {
221+
return true
222+
}
223+
}
224+
return false
225+
}
226+
227+
func isRetryable(err error) bool {
228+
if re, ok := err.(retryableError); ok {
229+
return re.retryable()
230+
}
231+
return false
232+
}
233+
234+
func handleErrorResponse(measureUrl string, resp *http.Response) ([]byte, error) {
235+
body, err := io.ReadAll(resp.Body)
236+
if err != nil {
237+
return nil, err
238+
}
239+
240+
operationOutcome := fm.OperationOutcome{}
241+
242+
err = json.Unmarshal(body, &operationOutcome)
243+
if err == nil {
244+
err = &operationOutcomeError{outcome: &operationOutcome}
245+
}
246+
247+
return nil, fmt.Errorf("Error while evaluating the measure with canonical URL %s:\n\n%w",
248+
measureUrl, err)
249+
}
250+
251+
func evaluateMeasure(client *fhir.Client, measureUrl string) ([]byte, error) {
192252
req, err := client.NewTypeOperationRequest("Measure", "evaluate-measure", !forceSync,
193253
url.Values{
194254
"measure": []string{measureUrl},
@@ -216,26 +276,14 @@ func evaluateMeasure(measureUrl string) ([]byte, error) {
216276
contentLocation := resp.Header.Get("Content-Location")
217277
interruptChan := make(chan os.Signal, 1)
218278
signal.Notify(interruptChan, os.Interrupt)
219-
return pollAsyncStatus(measureUrl, contentLocation, 100*time.Millisecond, interruptChan)
279+
return pollAsyncStatus(client, measureUrl, contentLocation, 100*time.Millisecond, interruptChan)
220280
} else {
221-
body, err := io.ReadAll(resp.Body)
222-
if err != nil {
223-
return nil, err
224-
}
225-
226-
operationOutcome := fm.OperationOutcome{}
227-
228-
err = json.Unmarshal(body, &operationOutcome)
229-
if err != nil {
230-
return nil, err
231-
}
232-
233-
return nil, fmt.Errorf("Error while evaluating the measure with canonical URL %s:\n\n%s",
234-
measureUrl, util.FmtOperationOutcomes([]*fm.OperationOutcome{&operationOutcome}))
281+
return handleErrorResponse(measureUrl, resp)
235282
}
236283
}
237284

238-
func pollAsyncStatus(measureUrl string, location string, wait time.Duration, interruptChan chan os.Signal) ([]byte, error) {
285+
func pollAsyncStatus(client *fhir.Client, measureUrl string, location string, wait time.Duration,
286+
interruptChan chan os.Signal) ([]byte, error) {
239287
select {
240288
case <-interruptChan:
241289
fmt.Fprintf(os.Stderr, "Cancel async request...\n")
@@ -266,8 +314,8 @@ func pollAsyncStatus(measureUrl string, location string, wait time.Duration, int
266314
return nil, err
267315
}
268316

269-
return nil, fmt.Errorf("Error while cancelling the async request at status endpoint %s:\n\n%s",
270-
location, util.FmtOperationOutcomes([]*fm.OperationOutcome{&operationOutcome}))
317+
return nil, fmt.Errorf("Error while cancelling the async request at status endpoint %s:\n\n%w",
318+
location, &operationOutcomeError{outcome: &operationOutcome})
271319
}
272320
case <-time.After(wait):
273321
fmt.Fprintf(os.Stderr, "Poll status endpoint at %s...\n", location)
@@ -285,37 +333,38 @@ func pollAsyncStatus(measureUrl string, location string, wait time.Duration, int
285333
if resp.StatusCode == 200 {
286334
batchResponse, err := fhir.ReadBundle(resp.Body)
287335
if err != nil {
288-
return nil, fmt.Errorf("error while reading the async response Bundle: %v", err)
336+
return nil, fmt.Errorf("error while reading the async response Bundle: %w", err)
289337
}
290338

291339
if len(batchResponse.Entry) != 1 {
292340
return nil, fmt.Errorf("expected one entry in async response Bundle but was %d entries", len(batchResponse.Entry))
293341
}
294342

295-
return json.Marshal(batchResponse.Entry[0].Resource)
343+
return batchResponse.Entry[0].Resource, nil
296344
} else if resp.StatusCode == 202 {
297345
// exponential wait up to 10 seconds
298346
if wait < 10*time.Second {
299347
wait *= 2
300348
}
301-
return pollAsyncStatus(measureUrl, location, wait, interruptChan)
349+
return pollAsyncStatus(client, measureUrl, location, wait, interruptChan)
302350
} else {
303-
body, err := io.ReadAll(resp.Body)
304-
if err != nil {
305-
return nil, err
306-
}
307-
308-
operationOutcome := fm.OperationOutcome{}
309-
310-
err = json.Unmarshal(body, &operationOutcome)
311-
if err != nil {
312-
return nil, err
313-
}
351+
return handleErrorResponse(measureUrl, resp)
352+
}
353+
}
354+
}
314355

315-
return nil, fmt.Errorf("Error while evaluating the measure with canonical URL %s:\n\n%s",
316-
measureUrl, util.FmtOperationOutcomes([]*fm.OperationOutcome{&operationOutcome}))
356+
func evaluateMeasureWithRetry(client *fhir.Client, measureUrl string) ([]byte, error) {
357+
var lastErr error
358+
for wait := 100 * time.Millisecond; wait < 5*time.Second; wait *= 2 {
359+
measureReport, err := evaluateMeasure(client, measureUrl)
360+
lastErr = err
361+
if !isRetryable(errors.Unwrap(err)) {
362+
return measureReport, err
317363
}
364+
fmt.Fprintf(os.Stderr, "Retry evaluating the measure...\n")
365+
<-time.After(wait)
318366
}
367+
return nil, lastErr
319368
}
320369

321370
var evaluateMeasureCmd = &cobra.Command{
@@ -433,7 +482,7 @@ See: https://github.com/samply/blaze/blob/master/docs/cql-queries/blazectl.md`,
433482

434483
fmt.Fprintf(os.Stderr, "Evaluate measure with canonical URL %s on %s ...\n\n", measureUrl, server)
435484

436-
measureReport, err := evaluateMeasure(measureUrl)
485+
measureReport, err := evaluateMeasureWithRetry(client, measureUrl)
437486
if err != nil {
438487
fmt.Println(err)
439488
os.Exit(1)

0 commit comments

Comments
 (0)