Skip to content

Commit 26468a9

Browse files
committed
DatastoreExportが失敗した場合、リトライを行うようにした refs #48
1 parent c1d798a commit 26468a9

8 files changed

+196
-146
lines changed

api.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"net/http"
7+
)
8+
9+
func WriteError(w http.ResponseWriter, statusCode int, message string, err error) {
10+
msg := fmt.Sprintln(message, " : ", err)
11+
log.Println(msg)
12+
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
13+
w.WriteHeader(statusCode)
14+
_, err = w.Write([]byte(msg))
15+
if err != nil {
16+
log.Println(err)
17+
}
18+
}

bqload_service_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ func TestBQLoadService_InsertBigQueryLoadJob(t *testing.T) {
2323
if err != nil {
2424
t.Fatal(err)
2525
}
26+
bqljcQ, err := NewBQLoadJobCheckQueue("localhost:8080", TasksClient)
27+
if err != nil {
28+
t.Fatal(err)
29+
}
2630

2731
const ds2bqJobID = "helloJob"
2832
{
@@ -38,7 +42,7 @@ func TestBQLoadService_InsertBigQueryLoadJob(t *testing.T) {
3842
}
3943
}
4044

41-
ls := NewBQLoadService(s)
45+
ls := NewBQLoadService(s, bqljcQ)
4246
if err := ls.InsertBigQueryLoadJob(ctx, ds2bqJobID, "gs://datastore-backup-gcpugjp-dev/2019-07-25T10:35:08_16520"); err != nil {
4347
t.Fatal(err)
4448
}

datastore_export_api.go

Lines changed: 41 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/morikuni/failure"
1313
)
1414

15+
const DefaultSeparateKindCount = 30
16+
1517
type DatastoreExportRequest struct {
1618
ProjectID string `json:"projectId"`
1719
AllKinds bool `json:"allKinds"`
@@ -33,94 +35,62 @@ type DS2BQJobIDWithDatastoreExportJobID struct {
3335
DatastoreExportJobID string `json:"datastoreExportJobId"`
3436
}
3537

36-
func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
37-
queue, err := NewDatastoreExportJobCheckQueue(r.Host, TasksClient)
38-
if err != nil {
39-
msg := fmt.Sprintf("failed NewDatastoreExportJobCheckQueue.err=%+v", err)
40-
log.Println(msg)
41-
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
42-
w.WriteHeader(http.StatusBadRequest)
43-
_, err := w.Write([]byte(msg))
44-
if err != nil {
45-
log.Println(err)
46-
}
47-
return
38+
type DatastoreExportAPI struct {
39+
DatastoreExportJobCheckQueue *DatastoreExportJobCheckQueue
40+
DSExportJobStore *DSExportJobStore
41+
BQLoadJobStore *BQLoadJobStore
42+
}
43+
44+
func NewDatastoreExportAPI(queue *DatastoreExportJobCheckQueue, dseJS *DSExportJobStore, bqlJS *BQLoadJobStore) *DatastoreExportAPI {
45+
return &DatastoreExportAPI{
46+
queue, dseJS, bqlJS,
4847
}
48+
}
4949

50+
func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
5051
body, err := ioutil.ReadAll(r.Body)
5152
if err != nil {
52-
msg := fmt.Sprintf("failed ioutil.Read(request.Body).err=%+v", err)
53-
log.Println(msg)
54-
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
55-
w.WriteHeader(http.StatusBadRequest)
56-
_, err := w.Write([]byte(msg))
57-
if err != nil {
58-
log.Println(err)
59-
}
53+
WriteError(w, http.StatusBadRequest, "failed ioutil.Read(request.Body)", err)
6054
return
6155
}
6256

6357
form := &DatastoreExportRequest{}
6458
if err := json.Unmarshal(body, form); err != nil {
65-
msg := fmt.Sprintf("failed json.Unmarshal(request.Body).err=%+v", err)
66-
log.Println(msg)
67-
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
68-
w.WriteHeader(http.StatusBadRequest)
69-
_, err := w.Write([]byte(msg))
70-
if err != nil {
71-
log.Println(err)
72-
}
59+
WriteError(w, http.StatusBadRequest, fmt.Sprintf("failed json.Unmarshal(request.Body) body=%v", string(body)), err)
7360
return
7461
}
7562

7663
log.Printf("%s\n", string(body))
7764

7865
kinds, err := GetDatastoreKinds(r.Context(), form)
7966
if err != nil {
80-
msg := fmt.Sprintf("failed GetDatastoreKinds form=%+v.err=%+v", form, err)
81-
log.Println(msg)
82-
w.WriteHeader(http.StatusInternalServerError)
83-
_, err := w.Write([]byte(msg))
84-
if err != nil {
85-
log.Println(err)
86-
}
67+
WriteError(w, http.StatusBadRequest, fmt.Sprintf("failed GetDatastoreKinds form=%+v", form), err)
8768
return
8869
}
89-
efs, err := BuildEntityFilter(r.Context(), form.NamespaceIDs, kinds, 30)
70+
efs, err := BuildEntityFilter(r.Context(), form.NamespaceIDs, kinds, DefaultSeparateKindCount)
9071
if err != nil {
91-
msg := fmt.Sprintf("failed BuildEntityFilter form=%+v.err=%+v", form, err)
92-
log.Println(msg)
93-
w.WriteHeader(http.StatusInternalServerError)
94-
_, err := w.Write([]byte(msg))
95-
if err != nil {
96-
log.Println(err)
97-
}
72+
WriteError(w, http.StatusBadRequest, fmt.Sprintf("failed BuildEntityFilter form=%+v", form), err)
73+
return
74+
}
75+
76+
queue, err := NewDatastoreExportJobCheckQueue(r.Host, TasksClient)
77+
if err != nil {
78+
WriteError(w, http.StatusBadRequest, "failed NewDatastoreExportJobCheckQueue", err)
9879
return
9980
}
10081

10182
dsexportJobStore, err := NewDSExportJobStore(r.Context(), DatastoreClient)
10283
if err != nil {
103-
msg := fmt.Sprintf("failed NewDSExportJobStore() form=%+v.err=%+v", form, err)
104-
log.Println(msg)
105-
w.WriteHeader(http.StatusInternalServerError)
106-
_, err := w.Write([]byte(msg))
107-
if err != nil {
108-
log.Println(err)
109-
}
84+
WriteError(w, http.StatusInternalServerError, fmt.Sprintf("failed NewDSExportJobStore() form=%+v", form), err)
11085
return
11186
}
11287

11388
bqloadJobStore, err := NewBQLoadJobStore(r.Context(), DatastoreClient)
11489
if err != nil {
115-
msg := fmt.Sprintf("failed NewBQLoadJobStore() form=%+v.err=%+v", form, err)
116-
log.Println(msg)
117-
w.WriteHeader(http.StatusInternalServerError)
118-
_, err := w.Write([]byte(msg))
119-
if err != nil {
120-
log.Println(err)
121-
}
90+
WriteError(w, http.StatusInternalServerError, fmt.Sprintf("failed NewBQLoadJobStore() form=%+v", form), err)
12291
return
12392
}
93+
api := NewDatastoreExportAPI(queue, dsexportJobStore, bqloadJobStore)
12494

12595
res := &DatastoreExportResponse{
12696
[]*DS2BQJobIDWithDatastoreExportJobID{},
@@ -129,7 +99,7 @@ func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
12999
var dsExportJobID string
130100
ds2bqJobID := dsexportJobStore.NewDS2BQJobID(r.Context())
131101
bqLoadKinds := BuildBQLoadKinds(ef, form.IgnoreBQLoadKinds)
132-
dsExportJobID, err := CreateDatastoreExportJob(r.Context(), dsexportJobStore, bqloadJobStore, queue, ds2bqJobID, string(body), form, bqLoadKinds, ef)
102+
dsExportJobID, err := api.StartDS2BQJob(r.Context(), ds2bqJobID, string(body), form, form.NamespaceIDs, bqLoadKinds, ef)
133103
if err != nil {
134104
msg := fmt.Sprintf("failed CreateDatastoreExportJob ds2bqJobID=%v.err=%+v", ds2bqJobID, err)
135105
log.Println(msg)
@@ -153,41 +123,45 @@ func HandleDatastoreExportAPI(w http.ResponseWriter, r *http.Request) {
153123
}
154124
}
155125

156-
func CreateDatastoreExportJob(ctx context.Context, dsexportJobStore *DSExportJobStore, bqloadJobStore *BQLoadJobStore, queue *DatastoreExportJobCheckQueue, ds2bqJobID string, body string, form *DatastoreExportRequest, kinds []string, ef *datastore.EntityFilter) (string, error) {
157-
_, err := dsexportJobStore.Create(ctx, ds2bqJobID, body, kinds)
126+
func (api *DatastoreExportAPI) StartDS2BQJob(ctx context.Context, ds2bqJobID string, body string, form *DatastoreExportRequest, namespaceIDs []string, kinds []string, ef *datastore.EntityFilter) (string, error) {
127+
_, err := api.DSExportJobStore.Create(ctx, ds2bqJobID, body, form.ProjectID, namespaceIDs, kinds)
158128
if err != nil {
159129
return "", fmt.Errorf("failed DSExportJobStore.Create() ds2bqJobID=%v.err=%+v", ds2bqJobID, err)
160130
}
161131

162-
_, err = bqloadJobStore.PutMulti(ctx, BuildBQLoadJobPutMultiForm(ds2bqJobID, kinds, form))
132+
_, err = api.BQLoadJobStore.PutMulti(ctx, BuildBQLoadJobPutMultiForm(ds2bqJobID, kinds, form))
163133
if err != nil {
164134
return "", fmt.Errorf("failed BQLoadJobStore.PutMulti() ds2bqJobID=%v,bqLoadKinds=%+v.err=%+v", ds2bqJobID, kinds, err)
165135
}
166136

167-
ope, err := datastore.Export(ctx, form.ProjectID, form.OutputGCSFilePath, ef)
137+
return api.CreateDatastoreExportJob(ctx, ds2bqJobID, form.ProjectID, form.OutputGCSFilePath, ef)
138+
}
139+
140+
func (api *DatastoreExportAPI) CreateDatastoreExportJob(ctx context.Context, ds2bqJobID string, projectID string, outputGCSFilePath string, ef *datastore.EntityFilter) (string, error) {
141+
ope, err := datastore.Export(ctx, projectID, outputGCSFilePath, ef)
168142
if err != nil {
169-
return "", fmt.Errorf("failed datastore.Export() form=%+v.err=%+v", form, err)
143+
return "", fmt.Errorf("failed datastore.Export() err=%+v", err)
170144
}
171145
switch ope.HTTPStatusCode {
172146
case http.StatusOK:
173147
log.Printf("%+v", ope)
174148

175-
if _, err := dsexportJobStore.StartExportJob(ctx, ds2bqJobID, ope.Name); err != nil {
149+
if _, err := api.DSExportJobStore.StartExportJob(ctx, ds2bqJobID, ope.Name, 0); err != nil {
176150
return "", fmt.Errorf("failed DSExportJobStore.StartExportJob. ds2bqJobID=%v,jobName=%s.err=%+v", ds2bqJobID, ope.Name, err)
177151
}
178152

179-
if err := queue.AddTask(ctx, &DatastoreExportJobCheckRequest{
153+
if err := api.DatastoreExportJobCheckQueue.AddTask(ctx, &DatastoreExportJobCheckRequest{
180154
DS2BQJobID: ds2bqJobID,
181155
DatastoreExportJobID: ope.Name,
182156
}); err != nil {
183157
return "", fmt.Errorf("failed queue.AddTask. jobName=%s.err=%+v", ope.Name, err)
184158
}
185159
return ope.Name, nil
186160
default:
187-
if _, err := dsexportJobStore.FinishExportJob(ctx, ds2bqJobID, DSExportJobStatusFailed, fmt.Sprintf("failed DatastoreExportJob.INSERT(). Code=%v,Message=%v", ope.Error.Code, ope.Error.Message)); err != nil {
161+
if _, err := api.DSExportJobStore.FinishExportJob(ctx, ds2bqJobID, DSExportJobStatusFailed, "", fmt.Sprintf("failed DatastoreExportJob.INSERT(). Code=%v,Message=%v", ope.Error.Code, ope.Error.Message)); err != nil {
188162
return "", fmt.Errorf("failed DSExportJobStore.FinishExportJob. ds2bqJobID=%v.err=%+v", ds2bqJobID, err)
189163
}
190-
return "", fmt.Errorf("failed DatastoreExportJob.INSERT(). form=%+v.ope.Error=%+v", form, ope.Error)
164+
return "", fmt.Errorf("failed DatastoreExportJob.INSERT(). ds2bqJobID=%v,ope.Error=%+v", ds2bqJobID, ope.Error)
191165
}
192166
}
193167

0 commit comments

Comments
 (0)