Skip to content

Commit

Permalink
Improve error handling for FHIR Store batch upload option.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 549723066
  • Loading branch information
Kai-Bailey authored and copybara-github committed Jul 20, 2023
1 parent 8b3659d commit 816853d
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cmd/bulk_fhir_fetch/bulk_fhir_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var (
fhirStoreGCPLocation = flag.String("fhir_store_gcp_location", "", "The GCP location of the FHIR Store.")
fhirStoreGCPDatasetID = flag.String("fhir_store_gcp_dataset_id", "", "The dataset ID for the FHIR Store.")
fhirStoreID = flag.String("fhir_store_id", "", "The FHIR Store ID.")
fhirStoreUploadErrorFileDir = flag.String("fhir_store_upload_error_file_dir", "", "An optional path to a directory where an upload errors file should be written. This file will contain the FHIR NDJSON and error information of FHIR resources that fail to upload to FHIR store.")
fhirStoreUploadErrorFileDir = flag.String("fhir_store_upload_error_file_dir", "", "An optional path to a directory where an upload errors file should be written. This file will contain the FHIR NDJSON and error information of FHIR resources that fail to upload to FHIR store. If using the batch upload option, if one or more FHIR resources in the bundle failed to upload then all FHIR resources in the bundle (including those that were sucessfully uploaded) will be written to error file.")
fhirStoreEnableBatchUpload = flag.Bool("fhir_store_enable_batch_upload", false, "If true, uploads FHIR resources to FHIR Store in batch bundles.")
fhirStoreBatchUploadSize = flag.Int("fhir_store_batch_upload_size", 0, "If set, this is the batch size used to upload FHIR batch bundles to FHIR store. If this flag is not set and fhir_store_enable_batch_upload is true, a default batch size is used.")

Expand Down
53 changes: 52 additions & 1 deletion fhir/processing/fhirstoresink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,37 @@ func TestDirectFHIRStoreSink_BatchDefaultBatchSize(t *testing.T) {
if len(gotBundle.Entry) != 5 {
t.Errorf("unexpected batch size. got: %v, want: %v", len(gotBundle.Entry), 5)
}

w.WriteHeader(200)
w.Write([]byte(`{
"entry": [
{
"response": {
"status": "201 Created"
}
},
{
"response": {
"status": "201 Created"
}
},
{
"response": {
"status": "201 Created"
}
},
{
"response": {
"status": "201 Created"
}
},
{
"response": {
"status": "201 Created"
}
}
]
}`))
}))
numWorkers := 1

Expand Down Expand Up @@ -376,8 +407,23 @@ func TestDirectFHIRStoreSink_BatchErrors(t *testing.T) {
outputPrefix = t.TempDir()
}

body := []byte(`{
"entry": [
{
"response": {
"status": "201 Created"
}
},
{
"response": {
"status": "201 Created"
}
}
]
}`)
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(500)
w.Write([]byte(body))
}))
defer testServer.Close()

Expand Down Expand Up @@ -416,7 +462,12 @@ func TestDirectFHIRStoreSink_BatchErrors(t *testing.T) {
if tc.setFHIRErrorFileDir {
expectedErrors := make([]testhelpers.ErrorNDJSONLine, len(resources))
for i, r := range resources {
expectedErrors[i] = testhelpers.ErrorNDJSONLine{Err: "error from API server: status 500 500 Internal Server Error: ", FHIRResource: string(r.Data)}
expectedError := fhirstore.BundleError{
ResponseStatusCode: 500,
ResponseStatusText: "500 Internal Server Error",
ResponseBytes: body,
}
expectedErrors[i] = testhelpers.ErrorNDJSONLine{Err: expectedError.Error(), FHIRResource: string(r.Data)}
}
testhelpers.CheckErrorNDJSONFile(t, outputPrefix, expectedErrors)
}
Expand Down
108 changes: 78 additions & 30 deletions fhirstore/fhirstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"

healthcare "google.golang.org/api/healthcare/v1"
"google.golang.org/api/option"
log "github.com/google/medical_claims_tools/internal/logger"
"github.com/google/medical_claims_tools/internal/metrics/aggregation"
"github.com/google/medical_claims_tools/internal/metrics"
)

var fhirStoreUploadCounter *metrics.Counter = metrics.NewCounter("fhir-store-upload-counter", "Count of uploads to FHIR Store by FHIR Resource Type and HTTP Status.", "1", aggregation.Count, "FHIRResourceType", "HTTPStatus")
var fhirStoreBatchUploadCounter *metrics.Counter = metrics.NewCounter("fhir-store-batch-upload-counter", "Count of FHIR Bundles uploaded to FHIR Store by HTTP Status.", "1", aggregation.Count, "HTTPStatus")
var fhirStoreBatchUploadCounter *metrics.Counter = metrics.NewCounter("fhir-store-batch-upload-counter", "Count of FHIR Bundles uploaded to FHIR Store by HTTP Status. Even if the bundle succeeds FHIR resources in the bundle may fail. See fhir-store-batch-upload-resource-counter for status of individual FHIR resources.", "1", aggregation.Count, "HTTPStatus")
var fhirStoreBatchUploadResourceCounter *metrics.Counter = metrics.NewCounter("fhir-store-batch-upload-resource-counter", "Unpacks the FHIR Bundles Response and counts the individiual FHIR Resources uploaded to FHIR Store by HTTP Status.", "1", aggregation.Count, "HTTPStatus")

// DefaultHealthcareEndpoint represents the default cloud healthcare API
// endpoint. This should be passed to UploadResource, unless in a test
Expand Down Expand Up @@ -108,7 +111,9 @@ func (c *Client) UploadResource(fhirJSON []byte) error {
}
defer resp.Body.Close()

fhirStoreUploadCounter.Record(context.Background(), 1, resourceType, http.StatusText(resp.StatusCode))
if err := fhirStoreUploadCounter.Record(context.Background(), 1, resourceType, http.StatusText(resp.StatusCode)); err != nil {
return err
}

if resp.StatusCode > 299 {
respBytes, err := ioutil.ReadAll(resp.Body)
Expand Down Expand Up @@ -151,19 +156,86 @@ func (c *Client) UploadBundle(fhirBundleJSON []byte) error {
}
defer resp.Body.Close()

fhirStoreBatchUploadCounter.Record(context.Background(), 1, http.StatusText(resp.StatusCode))
if err := fhirStoreBatchUploadCounter.Record(context.Background(), 1, http.StatusText(resp.StatusCode)); err != nil {
return err
}

if resp.StatusCode > 299 {
respBytes, err := ioutil.ReadAll(resp.Body)
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("could not read response: %v", err)
}

var resps BundleResponses
err = json.Unmarshal(respBytes, &resps)
if err != nil {
return fmt.Errorf("could not unmarshal response: %v", err)
}

errInsideBundle := false
for _, r := range resps.Entry {
if err := fhirStoreBatchUploadResourceCounter.Record(context.Background(), 1, r.Response.Status); err != nil {
return err
}

// According to the FHIR spec Response.status shall start with a 3 digit HTTP code
// (https://build.fhir.org/bundle-definitions.html#Bundle.entry.response.status)
scode, err := strconv.Atoi(r.Response.Status[:3])
if err != nil {
return fmt.Errorf("could not read response: %v", err)
return err
}
if scode > 299 {
errInsideBundle = true
log.Errorf("error uploading fhir resource in bundle: %s", r.Response.Outcome)
}
}

if resp.StatusCode > 299 || errInsideBundle {
return &BundleError{ResponseStatusCode: resp.StatusCode, ResponseStatusText: resp.Status, ResponseBytes: respBytes}
}

return nil
}

// BundleResponse holds a single FHIR Bundle response from the fhirService.ExecuteBundle call.
type BundleResponse struct {
Response struct {
Status string `json:"status"`
Outcome struct {
Issue json.RawMessage `json:"issue"`
} `json:"outcome,omitempty"`
} `json:"response"`
}

// BundleResponses holds the FHIR Bundle responses from the fhirService.ExecuteBundle call.
type BundleResponses struct {
Entry []BundleResponse `json:"entry"`
}

// BundleError represents an error returned from GCP FHIR Store when attempting
// to upload a FHIR bundle. The Bundle may succeed even if FHIR resources inside
// the bundle failed to upload. In that case ResponseStatusCode and
// ResponseStatusText hold the status of the bundle while ResponseBytes may have
// details on the individual resources.
type BundleError struct {
// ResponseStatusCode and ResponseStatusText hold the status for the bundle.
// Within the bundle individual FHIR resources may have still failed to
// upload.
ResponseStatusCode int
ResponseStatusText string
ResponseBytes []byte
}

// Error returns a string version of error information.
func (b *BundleError) Error() string {
return fmt.Sprintf("error from API server, StatusCode: %d StatusText: %s Response: %s", b.ResponseStatusCode, b.ResponseStatusText, b.ResponseBytes)
}

// Is returns true if this error should be considered equivalent to the target
// error (and makes this work smoothly with errors.Is calls)
func (b *BundleError) Is(target error) bool {
return target == ErrorAPIServer
}

// ImportFromGCS triggers a long-running FHIR store import job from a
// GCS location. Note wildcards can be used in the gcsURI, for example,
// gs://BUCKET/DIRECTORY/**.ndjson imports all files with .ndjson extension
Expand Down Expand Up @@ -203,30 +275,6 @@ func (c *Client) CheckGCSImportStatus(opName string) (isDone bool, err error) {
return op.Done, nil
}

// BundleError represents an error returned from GCP FHIR Store when attempting
// to upload a FHIR bundle. BundleError holds some structured error information
// that may be of interest to the error consumer, including the error response
// bytes (that may indicate more details on what particular resources in the
// bundle had errors).
// TODO(b/225916126): try to figure out if we can detect the format of error in
// ResponseBytes and unpack that in a structured way for consumers.
type BundleError struct {
ResponseStatusCode int
ResponseStatusText string
ResponseBytes []byte
}

// Error returns a string version of error information.
func (b *BundleError) Error() string {
return fmt.Sprintf("error from API server: status %d %s: %s", b.ResponseStatusCode, b.ResponseStatusText, b.ResponseBytes)
}

// Is returns true if this error should be considered equivalent to the target
// error (and makes this work smoothly with errors.Is calls)
func (b *BundleError) Is(target error) bool {
return target == ErrorAPIServer
}

type fhirBundle struct {
ResourceType string `json:"resourceType"`
Type string `json:"type"`
Expand Down
Loading

0 comments on commit 816853d

Please sign in to comment.