diff --git a/.gitignore b/.gitignore index 12e40cd..ce04efe 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ snapshotserver/snapshotserver .vscode test-reports/ docker-test-reports/ +bootstrap/cloud_functions/config.json +bootstrap/cloud_functions/node_modules +bootstrap/cloud_functions/test.out diff --git a/bootstrap/README.md b/bootstrap/README.md new file mode 100644 index 0000000..a87ba07 --- /dev/null +++ b/bootstrap/README.md @@ -0,0 +1,24 @@ +# Bootstrap + +The Bootstrap and Bootstrap Google Cloud Functions allow a cluster of Transicator Change Servers to backup and restore +from a Google Cloud Store. This helps avoid a potential issue when scaling up the Change Server cluster where existing +clients may attach to the new Change Server, receive a "snapshot too old" message because the new Change Server +doesn't have all necessary context, and force the clients to request new snapshots. + +The algorithm at this point is simple: If the backup file age > minBackupAge configured on the bootstrap, the backup +offered by the Change Server is accepted and stored. A future enhancement may be to consider a delta of changes made +between backups or similar. + +## Using + +1. Install the Google Cloud Functions as detailed in [cloud_functions/README.md](cloud_functions/README.md) +2. Configure Change Server to point to the correct URLs. + +## Testing + +1. As above, install the Google Cloud Functions. +2. If you haven't set the correct REGION and PROJECT_ID env vars in your terminal already, do so now. +3. Run `go test` + +Note: The Tests may not always succeed 100% due to the asynchronous and distributed nature of Google Cloud Functions and +Google Cloud Storage. 
diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go new file mode 100644 index 0000000..2e0d7ee --- /dev/null +++ b/bootstrap/bootstrap.go @@ -0,0 +1,135 @@ +/* +Copyright 2016 The Transicator Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package bootstrap + +import ( + "fmt" + log "github.com/Sirupsen/logrus" + "github.com/apigee-labs/transicator/storage" + "io/ioutil" + "net/http" + "os" + "time" +) + +const ( + headerID = "x-bootstrap-id" + headerSecret = "x-bootstrap-secret" + contentType = "application/octet-stream" +) + +type Error struct { + Status int + Message string +} + +func (e Error) Error() string { + return fmt.Sprintf("backup error, status: %d, message: %s", e.Status, e.Message) +} + +var client = http.Client{Timeout: 3 * time.Minute} + +// note: current behavior is to always make the backup and then just allow the send to fail +// if the server doesn't want it. 
+// note: err returned is nil and uploaded is false if backup was rejected as being too soon +func Backup(db storage.DB, uri, id, secret string) (err error, uploaded bool) { + + // make backup + tempDirURL, err := ioutil.TempDir("", "transicator_sqlite_backup") + if err != nil { + return err, false + } + defer os.RemoveAll(tempDirURL) + backupDirName := fmt.Sprintf("%s/backup", tempDirURL) + + bc := db.Backup(backupDirName) + for { + br := <-bc + log.Debugf("Backup %d remaining\n", br.PagesRemaining) + if err = br.Error; err != nil { + log.Error(err) + return err, false + } + if br.Done { + break + } + } + + backupFileName := fmt.Sprintf("%s/transicator", backupDirName) + fileReader, err := os.Open(backupFileName) + if err != nil { + log.Error(err) + return err, false + } + defer fileReader.Close() + + // send backup + req, err := http.NewRequest("POST", uri, fileReader) + if err != nil { + return err, false + } + req.Header.Set(headerID, id) + req.Header.Set(headerSecret, secret) + req.Header.Set("Content-Type", contentType) + + res, err := client.Do(req) + if err != nil { + return err, false + } + defer res.Body.Close() + + switch res.StatusCode { + case 200: + log.Debug("backup uploaded successfully") + return nil, true + case 429: + log.Debug("backup upload not needed") + return nil, false + default: + msg, _ := ioutil.ReadAll(res.Body) + return Error{ + res.StatusCode, + string(msg), + }, false + } +} + +func Restore(dbDir, uri, id, secret string) error { + + req, err := http.NewRequest("GET", uri, nil) + if err != nil { + return err + } + req.Header.Set(headerID, id) + req.Header.Set(headerSecret, secret) + req.Header.Set("Accept", contentType) + + res, err := client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != 200 { + msg, _ := ioutil.ReadAll(res.Body) + return Error{ + res.StatusCode, + string(msg), + } + } + + return storage.RestoreBackup(res.Body, dbDir) +} diff --git a/bootstrap/bootstrap_suite_test.go 
b/bootstrap/bootstrap_suite_test.go new file mode 100644 index 0000000..c7ad608 --- /dev/null +++ b/bootstrap/bootstrap_suite_test.go @@ -0,0 +1,36 @@ +package bootstrap_test + +import ( + "fmt" + log "github.com/Sirupsen/logrus" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/spf13/viper" + "testing" +) + +func TestBootstrap(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Bootstrap Suite") +} + +var backupURI, restoreURI string + +var _ = BeforeSuite(func() { + log.SetLevel(log.DebugLevel) + + err := viper.BindEnv("region", "REGION") + Expect(err).NotTo(HaveOccurred()) + err = viper.BindEnv("projectID", "PROJECT_ID") + Expect(err).NotTo(HaveOccurred()) + + region := viper.GetString("region") + projectID := viper.GetString("projectID") + + if region == "" || projectID == "" { + Fail("Set env vars: REGION and PROJECT_ID before running test") + } + + backupURI = fmt.Sprintf("https://%s-%s.cloudfunctions.net/bootstrapBackup", region, projectID) + restoreURI = fmt.Sprintf("https://%s-%s.cloudfunctions.net/bootstrapRestore", region, projectID) +}) diff --git a/bootstrap/bootstrap_test.go b/bootstrap/bootstrap_test.go new file mode 100644 index 0000000..b385c52 --- /dev/null +++ b/bootstrap/bootstrap_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2016 The Transicator Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package bootstrap_test + +import ( + "github.com/apigee-labs/transicator/bootstrap" + "github.com/apigee-labs/transicator/storage" + . 
"github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "io/ioutil" + "os" + "time" +) + +const ( + secret = "testing" +) + +var _ = Describe("Bootstrap tests", func() { + + var ( + testDB storage.DB + testDir string + ) + + BeforeEach(func() { + tmpDir, err := ioutil.TempDir("", "bootstrap_test") + Expect(err).NotTo(HaveOccurred()) + + testDB, err = storage.Open(tmpDir) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + testDB.Close() + err := testDB.Delete() + Expect(err).Should(Succeed()) + os.RemoveAll(testDir) + }) + + It("can backup and restore a database", func() { + id := "test1" + + err := testDB.Put("foo", 1, 1, []byte("Hello!")) + Expect(err).NotTo(HaveOccurred()) + + err, uploaded := bootstrap.Backup(testDB, backupURI, id, secret) + Expect(err).NotTo(HaveOccurred()) + Expect(uploaded).To(BeTrue()) + + restoreDir, err := ioutil.TempDir(testDir, "bootstrap_test") + Expect(err).NotTo(HaveOccurred()) + + // this is to help (although it can't guarantee) that GCS synchronizes between backup calls + time.Sleep(5 * time.Second) + + err = bootstrap.Restore(restoreDir, restoreURI, id, secret) + Expect(err).NotTo(HaveOccurred()) + + db, err := storage.Open(restoreDir) + Expect(err).Should(Succeed()) + + ret, err := db.Get("foo", 1, 1) + Expect(err).Should(Succeed()) + Expect(ret).Should(Equal([]byte("Hello!"))) + db.Close() + }, 6) + + Context("backup", func() { + + It("should not replace a recent backup", func() { + id := "test2" + + err, uploaded := bootstrap.Backup(testDB, backupURI, id, secret) + Expect(err).NotTo(HaveOccurred()) + Expect(uploaded).To(BeTrue()) + + // this is to help (although it can't guarantee) that GCS synchronizes between backup calls + time.Sleep(5 * time.Second) + + err, uploaded = bootstrap.Backup(testDB, backupURI, id, secret) + Expect(err).NotTo(HaveOccurred()) + Expect(uploaded).To(BeFalse()) + }, 6) + + It("should fail without id", func() { + err, uploaded := bootstrap.Backup(testDB, backupURI, "", secret) + 
Expect(err).To(HaveOccurred()) + Expect(uploaded).To(BeFalse()) + }) + + It("should fail without secret", func() { + err, uploaded := bootstrap.Backup(testDB, backupURI, "xxx", "") + Expect(err).To(HaveOccurred()) + Expect(uploaded).To(BeFalse()) + }) + }) + + Context("restore", func() { + + It("should fail without id", func() { + err := bootstrap.Restore(testDir, backupURI, "", secret) + Expect(err).To(HaveOccurred()) + }) + + It("should fail without secret", func() { + err := bootstrap.Restore(testDir, backupURI, "xxx", "") + Expect(err).To(HaveOccurred()) + }) + + It("should fail with 404 if backup is not present", func() { + err := bootstrap.Restore(testDir, restoreURI, "not a real backup", secret) + Expect(err).To(HaveOccurred()) + e, ok := err.(bootstrap.Error) + Expect(ok).To(BeTrue()) + Expect(e.Status).To(Equal(404)) + }) + + It("should fail with 403 if wrong secret", func() { + id := "test3" + + err, uploaded := bootstrap.Backup(testDB, backupURI, id, secret) + Expect(err).NotTo(HaveOccurred()) + Expect(uploaded).To(BeTrue()) + + // this is to help (although it can't guarantee) that GCS synchronizes between backup calls + time.Sleep(5 * time.Second) + + err = bootstrap.Restore(testDir, restoreURI, id, "not the real secret") + Expect(err).To(HaveOccurred()) + e, ok := err.(bootstrap.Error) + Expect(ok).To(BeTrue()) + Expect(e.Status).To(Equal(403)) + }, 6) + + }) +}) diff --git a/bootstrap/cloud_functions/README.md b/bootstrap/cloud_functions/README.md new file mode 100644 index 0000000..704b2eb --- /dev/null +++ b/bootstrap/cloud_functions/README.md @@ -0,0 +1,77 @@ +# Bootstrap + +To set up the Google Cloud Functions, follow the instructions below. + +## Config + + cp config.json_example config.json + +Edit config.json values to your liking. 
The values are: + +* `bucketName`: The GCS bucket that backups will be stored in +* `minBackupAge`: The minimum age in milliseconds of a backup before replacing the new backup + +## Install dependencies + + npm install + +## Set env (change these values) + + export PROJECT_ID=edgex-149918 + export CLASS=regional + export LOCATION=us-central1 + export REGION=us-central1 + export SCRIPTS_BUCKET=gs://transicator-bootstrap-scripts + export DATA_BUCKET=gs://transicator-bootstrap-data + +Notes: +* `PROJECT_ID` is your Google Cloud Platform Project ID +* `SCRIPTS_BUCKET` and `DATA_BUCKET` must be globally unique bucket identifiers (although they could be the same bucket) +* The `bucketName` config value in your `config.json` and `DATA_BUCKET` value must match +* These env vars are not generally necessary, they're just to configure the following commands + +## Deploy + +1. Create the Google Cloud Storage buckets: + + + gsutil mb -c $CLASS -l $LOCATION -p $PROJECT_ID $SCRIPTS_BUCKET + gsutil mb -c $CLASS -l $LOCATION -p $PROJECT_ID $DATA_BUCKET + + +2. Deploy the Google Cloud Functions: + + + gcloud --project $PROJECT_ID beta functions deploy bootstrapBackup --stage-bucket $SCRIPTS_BUCKET --trigger-http --timeout 180s + gcloud --project $PROJECT_ID beta functions deploy bootstrapRestore --stage-bucket $SCRIPTS_BUCKET --trigger-http --timeout 180s + +Note: Current impl requires streaming data through the function, so timeout may need to be tweaked to ensure success. + +## Test + +### Store a file (README.md) + + curl --data-binary "@README.md" -H "Content-Type: application/octet-stream" \ + -H "x-bootstrap-id: test" -H "x-bootstrap-secret: my-secret" \ + "https://$REGION-$PROJECT_ID.cloudfunctions.net/bootstrapBackup" + +Notes: + * `x-bootstrap-id` holds the identifier of the file you're storing + * `x-bootstrap-secret` holds the secret you'll pass to retrieve the file + +Both values are required to retrieve. See Download below. 
+ +### Retrieve the file (as test.out) + + curl -o "test.out" -H "Accept: application/octet-stream" \ + -H "x-bootstrap-id: test" -H "x-bootstrap-secret: my-secret" \ + "https://$REGION-$PROJECT_ID.cloudfunctions.net/bootstrapRestore" + +The files should be the same. If so, this output will be empty: + + diff README.md test.out + + + curl -o "test.out" -H "Accept: application/octet-stream" \ + -H "x-bootstrap-id: test" -H "x-bootstrap-secret: test" \ + "https://$REGION-$PROJECT_ID.cloudfunctions.net/bootstrapRestore" \ No newline at end of file diff --git a/bootstrap/cloud_functions/boostrap_api.yaml b/bootstrap/cloud_functions/boostrap_api.yaml new file mode 100644 index 0000000..b290e65 --- /dev/null +++ b/bootstrap/cloud_functions/boostrap_api.yaml @@ -0,0 +1,69 @@ +swagger: "2.0" +info: + version: "0.0.1" + title: Bootstrap backup and restore +host: region-project.cloudfunctions.net +basePath: / +schemes: + - http + - https +consumes: + - application/json +produces: + - application/json +paths: + /bootstrapBackup: + post: + description: Perform backup + consumes: + - application/octet-stream + parameters: + - name: x-bootstrap-id + type: string + in: header + required: true + - name: x-bootstrap-secret + type: string + in: header + required: true + responses: + "200": + description: Success + schema: + $ref: "#/definitions/BackupResponse" + "429": + description: Backup not needed + default: + description: Error + schema: + $ref: "#/definitions/ErrorResponse" + /bootstrapRestore: + get: + description: Perform restore + produces: + - application/octet-stream + parameters: + - name: x-bootstrap-id + type: string + in: header + required: true + - name: x-bootstrap-secret + type: string + in: header + required: true + responses: + "200": + description: Success + schema: + $ref: "#/definitions/RestoreResponse" + default: + description: Error + schema: + $ref: "#/definitions/ErrorResponse" +definitions: + BackupResponse: + type: object + RestoreResponse: + type: 
object + ErrorResponse: + type: object diff --git a/bootstrap/cloud_functions/config.json_example b/bootstrap/cloud_functions/config.json_example new file mode 100644 index 0000000..2b1ea84 --- /dev/null +++ b/bootstrap/cloud_functions/config.json_example @@ -0,0 +1,4 @@ +{ + "bucketName": "transicator-bootstrap-data", + "minBackupAge": 3600000 +} diff --git a/bootstrap/cloud_functions/index.js b/bootstrap/cloud_functions/index.js new file mode 100644 index 0000000..dfbb417 --- /dev/null +++ b/bootstrap/cloud_functions/index.js @@ -0,0 +1,177 @@ +const gcs = require('@google-cloud/storage')() +const contentType = 'application/octet-stream' +const backupIDHeader = 'x-bootstrap-id' +const secretHeader = 'x-bootstrap-secret' + +// default values... +var bucketName = 'transicator-bootstrap-data' +var minBackupAge = 60*60*1000 // 60 minutes + +try { + let config = require("./config.json") + console.log("config.json found, using contained values") + if (config.bucketName) bucketName = config.bucketName + if (config.minBackupAge) minBackupAge = config.minBackupAge +} catch (err) { + console.log("no config.json found, using defaults") +} + +exports.bootstrapBackup = (req, res) => { + + if (req.method !== 'POST') { + return res.status(405).send('method must be POST') + } + + if (req.get('content-type') !== contentType) { + return res.status(406).send('content-type must be ' + contentType) + } + + let id = req.get(backupIDHeader) + if (!id) { + return res.status(400).send('id is required') + } + + let secret = req.get(secretHeader) + if (!secret) { + return res.status(400).send('secret is required') + } + + let bucket = gcs.bucket(bucketName) + + bucket.exists((err, exists) => { + if (err) { + let s = err.toString() + " (1)" + console.error(s) + return res.status(500).send(s) + } + + if (!exists) { + return res.status(500).send('bucket ' + bucketName + " doesn't exist") + } + + let file = bucket.file(id) + + // check if file exists, and the file age, reject if younger than 
minBackupAge + file.exists((err, exists) => { + if (err) { + let s = err.toString() + " (2)" + console.error(s) + return res.status(500).send(s) + } + + if (exists) { + + let backupCreated = Date.parse(file.metadata.timeCreated) + let backupAge = Date.now() - backupCreated + + if (backupAge < minBackupAge) { + return res.status(429).send('no need for backup right now') + } + } + + let writeOpts = { + metadata: { + contentType: contentType, + metadata: { + secret: secret, + }, + }, + } + + let writer = file.createWriteStream(writeOpts) + writer.write(req.body) + writer.end() + + res.send(`Thank you, ${id}.`) + }) + }) +} + +exports.bootstrapRestore = (req, res) => { + + if (req.method !== 'GET') { + return res.status(405).send('method must be GET') + } + + if (req.get('accept') !== contentType) { + return res.status(400).send('accept must be ' + contentType) + } + + let id = req.get(backupIDHeader) + if (!id) { + return res.status(400).send('id is required') + } + + let secret = req.get(secretHeader) + if (!secret) { + return res.status(401).send('unauthorized') + } + + let bucket = gcs.bucket(bucketName) + + bucket.exists((err, exists) => { + if (err) { + let s = err.toString() + " (1)" + console.error(s) + return res.status(500).send(s) + } + + if (!exists) { + return res.status(500).send('bucket ' + bucketName + " doesn't exist") + } + + let file = bucket.file(id) + file.exists((err, exists) => { + if (err) { + let s = err.toString() + " (2)" + console.error(s) + return res.status(500).send(s) + } + + if (!exists) { + return res.status(404).send('file ' + id + " doesn't exist") + } + + file.getMetadata((err, metadata) => { + if (err) { + let s = err.toString() + " (3)" + console.error(s) + return res.status(500).send(s) + } + + if (metadata.metadata.secret !== secret) { + return res.status(403).send("not allowed") + } + + res.setHeader('Content-Type', contentType) + + file + .createReadStream() + .on('error', function(err) { + let s = err.toString() + " (4)" + 
console.error(s) + return res.status(500).send(s) + }) + .pipe(res) + }) + }) + }) +} + +// sadly, signed urls don't currently work (see: https://github.com/GoogleCloudPlatform/gcloud-common/issues/180) +// so for now, we can't include this code to avoid streaming the backup data through the script +// let config = { +// action: 'read', +// expires: Date.now() + (15*60*1000) // 15 minutes +// } +// file.getSignedUrl(config, (err, url) => { +// if (err) { +// return res.status(500).send(err.toString()) +// } +// +// res.writeHead(302, { +// 'Location': url +// }) +// res.end() +// }) + diff --git a/bootstrap/cloud_functions/package.json b/bootstrap/cloud_functions/package.json new file mode 100644 index 0000000..685de50 --- /dev/null +++ b/bootstrap/cloud_functions/package.json @@ -0,0 +1,8 @@ +{ + "name": "bootstrap", + "private": true, + "main": "index.js", + "dependencies": { + "@google-cloud/storage": "^1.1.0" + } +} diff --git a/changeserver/backup.go b/changeserver/backup.go new file mode 100644 index 0000000..6fc0c5e --- /dev/null +++ b/changeserver/backup.go @@ -0,0 +1,42 @@ +/* +Copyright 2016 The Transicator Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package main + +import ( + log "github.com/Sirupsen/logrus" + "github.com/apigee-labs/transicator/bootstrap" + "time" +) + +func (s *server) runBackup() { + + ticker := time.NewTicker(s.backupInterval) + + for { + select { + case <-ticker.C: + err, _ := bootstrap.Backup(s.db, s.backupURI, s.backupID, s.backupSecret) + if err != nil { + log.Errorf("Backup failed: %v", err) + } + + case stopped := <-s.stopChan: + ticker.Stop() + stopped <- true + return + } + } +} diff --git a/changeserver/config.go b/changeserver/config.go index bf43db7..a7f077d 100644 --- a/changeserver/config.go +++ b/changeserver/config.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/pflag" "github.com/spf13/viper" + "time" ) func setConfigDefaults() { @@ -52,6 +53,17 @@ func setConfigDefaults() { pflag.BoolP("config", "C", false, fmt.Sprintf("Use a config file named '%s' located in either /etc/%s/, ~/.%s or ./)", appName, packageName, packageName)) pflag.BoolP("debug", "D", false, "Turn on debugging") viper.SetDefault("debug", false) + + pflag.String("boostrapBackupURI", "", "Bootstrap backup URI") + viper.SetDefault("boostrapBackupURI", "") + pflag.String("boostrapRestoreURI", "", "Bootstrap restore URI") + viper.SetDefault("boostrapRestoreURI", "") + pflag.String("boostrapID", "", "Bootstrap backup and restore ID (unique per cluster)") + viper.SetDefault("boostrapID", "") + pflag.String("boostrapSecret", "", "Bootstrap backup and restore secret") + viper.SetDefault("boostrapSecret", "") + pflag.Duration("boostrapBackupInterval", time.Hour, "Bootstrap backup interval") + viper.SetDefault("boostrapBackupInterval", time.Hour) } func getConfig() error { @@ -72,6 +84,11 @@ func getConfig() error { viper.BindPFlag("debug", pflag.Lookup("debug")) viper.BindPFlag("selectorColumn", pflag.Lookup("selectorcolumn")) + viper.BindPFlag("boostrapBackupURI", pflag.Lookup("boostrapBackupURI")) + viper.BindPFlag("boostrapRestoreURI", pflag.Lookup("boostrapRestoreURI")) + viper.BindPFlag("boostrapID", 
pflag.Lookup("boostrapID")) + viper.BindPFlag("boostrapSecret", pflag.Lookup("boostrapSecret")) + // Load config values from file if viper.GetBool("configFile") { viper.SetConfigName(appName) // name of config file (without extension) diff --git a/changeserver/main.go b/changeserver/main.go index 22e4495..4433041 100644 --- a/changeserver/main.go +++ b/changeserver/main.go @@ -24,8 +24,10 @@ import ( "github.com/30x/goscaffold" "github.com/Sirupsen/logrus" + "github.com/apigee-labs/transicator/bootstrap" "github.com/spf13/pflag" "github.com/spf13/viper" + "net/url" ) const ( @@ -86,6 +88,12 @@ func runMain() int { prefix := viper.GetString("prefix") selectorColumnParam := viper.GetString("selectorColumn") + bootstrapRestoreURI := viper.GetString("boostrapRestoreURI") + bootstrapBackupURI := viper.GetString("boostrapBackupURI") + bootstrapID := viper.GetString("boostrapID") + bootstrapSecret := viper.GetString("boostrapSecret") + backupInterval := viper.GetDuration("boostrapBackupInterval") + debug := viper.GetBool("debug") if dbDir == "" || pgURL == "" || pgSlot == "" { @@ -108,6 +116,29 @@ func runMain() int { } } + if bootstrapRestoreURI != "" { + if _, err := url.Parse(bootstrapRestoreURI); err != nil { + fmt.Fprintf(os.Stderr, "Invalid value for boostrapRestoreURI: \"%s\": %s\n", + bootstrapRestoreURI, err) + printUsage() + return 4 + } + } + if bootstrapBackupURI != "" { + if _, err := url.Parse(bootstrapBackupURI); err != nil { + fmt.Fprintf(os.Stderr, "Invalid value for boostrapBackupURI: \"%s\": %s\n", + bootstrapBackupURI, err) + printUsage() + return 4 + } + } + if (bootstrapBackupURI != "" || bootstrapRestoreURI != "") && (bootstrapID == "" || bootstrapSecret == "") { + fmt.Fprintln(os.Stderr, + "Error: boostrapID and boostrapSecret are required with boostrapRestoreURI or boostrapBackupURI") + printUsage() + return 4 + } + // Set the global scopeField from server.go to the user supplied value selectorColumn = selectorColumnParam @@ -115,9 +146,18 @@ 
logrus.SetLevel(logrus.DebugLevel) } + if bootstrapRestoreURI != "" { + err := bootstrap.Restore(dbDir, bootstrapRestoreURI, bootstrapID, bootstrapSecret) + if err != nil { + fmt.Fprintf(os.Stderr, "Error retrieving bootstrap: %s\n", err) + return 4 + } + } + mux := http.NewServeMux() - server, err := createChangeServer(mux, dbDir, pgURL, pgSlot, prefix) + server, err := createChangeServer(mux, dbDir, pgURL, pgSlot, prefix, + bootstrapBackupURI, bootstrapID, bootstrapSecret, backupInterval) if err != nil { fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err) return 4 diff --git a/changeserver/server.go b/changeserver/server.go index e349ba2..c95f808 100644 --- a/changeserver/server.go +++ b/changeserver/server.go @@ -28,6 +28,7 @@ import ( "github.com/apigee-labs/transicator/replication" "github.com/apigee-labs/transicator/storage" "github.com/julienschmidt/httprouter" + "time" ) const ( @@ -45,21 +46,26 @@ const ( var selectorColumn = defaultSelectorColumn type server struct { - db storage.DB - repl *replication.Replicator - tracker *changeTracker - cleaner *cleaner - firstChange common.Sequence - slotName string - dropSlot int32 - stopChan chan chan<- bool + db storage.DB + repl *replication.Replicator + tracker *changeTracker + cleaner *cleaner + firstChange common.Sequence + slotName string + dropSlot int32 + stopChan chan chan<- bool + backupURI string + backupID string + backupSecret string + backupInterval time.Duration } type errMsg struct { Error string `json:"error"` } -func createChangeServer(mux *http.ServeMux, dbDir, pgURL, pgSlot, urlPrefix string) (*server, error) { +func createChangeServer(mux *http.ServeMux, dbDir, pgURL, pgSlot, urlPrefix, + backupURI, backupID, backupSecret string, backupInterval time.Duration) (*server, error) { success := false slotName := sanitizeSlotName(pgSlot) @@ -88,12 +94,16 @@ func createChangeServer(mux *http.ServeMux, dbDir, pgURL, pgSlot, urlPrefix stri success = true s := &server{ - db: db, - repl: repl, - 
slotName: slotName, - firstChange: firstChange, - tracker: createTracker(), - stopChan: make(chan chan<- bool, 1), + db: db, + repl: repl, + slotName: slotName, + firstChange: firstChange, + tracker: createTracker(), + stopChan: make(chan chan<- bool, 1), + backupURI: backupURI, + backupID: backupID, + backupSecret: backupSecret, + backupInterval: backupInterval, } router := httprouter.New() @@ -107,6 +117,9 @@ func createChangeServer(mux *http.ServeMux, dbDir, pgURL, pgSlot, urlPrefix stri func (s *server) start() { s.repl.Start() go s.runReplication(s.firstChange) + if s.backupURI != "" && s.backupID != "" && s.backupSecret != "" && s.backupInterval > 0 { + go s.runBackup() + } } func (s *server) stop() { diff --git a/changeserver/server_main_test.go b/changeserver/server_main_test.go index fd0a619..f66c727 100644 --- a/changeserver/server_main_test.go +++ b/changeserver/server_main_test.go @@ -32,6 +32,7 @@ import ( . "github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" . "github.com/onsi/gomega" + "time" ) const ( @@ -84,7 +85,8 @@ var _ = BeforeSuite(func() { // Start the server, which will be ready to respond to API calls // and which will also start replication with the database mux := http.NewServeMux() - testServer, err = createChangeServer(mux, testDataDir, dbURL, replicationSlot, "") + testServer, err = createChangeServer(mux, testDataDir, dbURL, replicationSlot, "", + "", "", "", time.Hour) Expect(err).Should(Succeed()) // For the tests, filter out changes for tables not part of these unit diff --git a/storage/storage_sqlite.go b/storage/storage_sqlite.go index dc09c85..3e379dd 100644 --- a/storage/storage_sqlite.go +++ b/storage/storage_sqlite.go @@ -122,6 +122,33 @@ func Open(baseFile string) (*SQL, error) { return stor, nil } +/* +RestoreBackup deletes any existing data at the baseFile location and writes the contents of the +Reader, which is expected to be a Sqlite DB, to the persistent db file at the expected location. 
+It's the caller's responsibility to close the Reader if needed and verify the DB is correct. +Generally, a call to this would be followed by Open(baseFile). +*/ +func RestoreBackup(r io.Reader, baseFile string) (err error) { + err = os.RemoveAll(baseFile) + if err != nil { + return err + } + + url, err := createDBDir(baseFile) + if err != nil { + return err + } + + f, err := os.Create(url) + if err != nil { + return err + } + defer f.Close() + + _, err = io.Copy(f, r) + return err +} + // createDBDir ensures that "base" is a directory and returns the file name func createDBDir(baseFile string) (string, error) { st, err := os.Stat(baseFile)