Skip to content

Commit

Permalink
Merge pull request #61 from github/more-operational-perks
Browse files Browse the repository at this point in the history
suuporting dynamic reconfiguration of max-load
  • Loading branch information
Shlomi Noach authored Jun 9, 2016
2 parents b608d5b + 087d1dd commit 29aead6
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
#
#
RELEASE_VERSION="0.8.7"
RELEASE_VERSION="0.8.8"

buildpath=/tmp/gh-ost
target=gh-ost
Expand Down
2 changes: 2 additions & 0 deletions doc/interactive-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Both interfaces may serve at the same time. Both respond to simple text command,
- `throttle`: force migration suspend
- `no-throttle`: cancel forced suspension (though other throttling reasons may still apply)
- `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
- `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`. For example: `Threads_running=50,threads_connected=1000`, and you would then write/echo `max-load=Threads_running=50,threads_connected=1000` to the socket.

### Examples

Expand Down
13 changes: 11 additions & 2 deletions doc/perks.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,22 @@ You started with a `chunk-size=5000` but you find out it's too much. You want to
$ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
```

Likewise, you can change the `max-load` configuration:

```shell
$ echo "max-load=Threads_running=50,threads_connected=1000" | nc -U /tmp/gh-ost.test.sample_data_0.sock
```

The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`.
In case of parsing error the command is ignored.

Read more about [interactive commands](interactive-commands.md)

### What's the status?

You do not have to have access to the `screen` where the migration is issued. You have two ways to get current status:

0. Use [interactive commands](interactive-commands.md). Via unix socket file or via `TCP` you can get current status:
1. Use [interactive commands](interactive-commands.md). Via unix socket file or via `TCP` you can get current status:

```shell
$ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock
Expand All @@ -31,7 +40,7 @@ $ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock
Copy: 0/2915 0.0%; Applied: 0; Backlog: 0/100; Elapsed: 40s(copy), 41s(total); streamer: mysql-bin.000550:49942; ETA: throttled, flag-file
```

0. `gh-ost` creates and uses a changelog table for internal bookkeeping. This table has the `_osc` suffix (the tool creates and announces this table upon startup) If you like, you can SQL your status:
2. `gh-ost` creates and uses a changelog table for internal bookkeeping. This table has the `_osc` suffix (the tool creates and announces this table upon startup) If you like, you can SQL your status:

```
> select * from _sample_data_0_osc order by id desc limit 1 \G
Expand Down
4 changes: 4 additions & 0 deletions doc/why-triggerless.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ Thus, triggers must keep operating. On busy servers, we have seen that even as t

Read more about [`gh-ost` throttling](throttle.md)

### Triggers, multiple migrations

We are interested in being able to run multiple concurrent migrations (not on the same table, of course). Given all the above, we do not have trust that running multiple trigger-based migrations is a safe operation. In our current, past and shared experiences we have never done so; we are unaware of anyone who is doing so.

### Trigger based migration, no reliable production test

We sometimes wish to experiment with a migration, or know in advance how much time it would take. A trigger-based solution allows us to run a migration on a replica, provided it uses Statement Based Replication.
Expand Down
27 changes: 24 additions & 3 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type MigrationContext struct {
ThrottleFlagFile string
ThrottleAdditionalFlagFile string
ThrottleCommandedByUser int64
MaxLoad map[string]int64
maxLoad map[string]int64
maxLoadMutex *sync.Mutex
PostponeCutOverFlagFile string
SwapTablesTimeoutSeconds int64

Expand Down Expand Up @@ -141,7 +142,8 @@ func newMigrationContext() *MigrationContext {
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1000,
SwapTablesTimeoutSeconds: 3,
MaxLoad: make(map[string]int64),
maxLoad: make(map[string]int64),
maxLoadMutex: &sync.Mutex{},
throttleMutex: &sync.Mutex{},
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
Expand Down Expand Up @@ -269,12 +271,29 @@ func (this *MigrationContext) IsThrottled() (bool, string) {
return this.isThrottled, this.throttleReason
}

func (this *MigrationContext) GetMaxLoad() map[string]int64 {
this.maxLoadMutex.Lock()
defer this.maxLoadMutex.Unlock()

tmpMaxLoadMap := make(map[string]int64)
for k, v := range this.maxLoad {
tmpMaxLoadMap[k] = v
}
return tmpMaxLoadMap
}

// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
// such as: 'Threads_running=100,Threads_connected=500'
// It only applies changes in case there's no parsing error.
func (this *MigrationContext) ReadMaxLoad(maxLoadList string) error {
if maxLoadList == "" {
return nil
}
this.maxLoadMutex.Lock()
defer this.maxLoadMutex.Unlock()

tmpMaxLoadMap := make(map[string]int64)

maxLoadConditions := strings.Split(maxLoadList, ",")
for _, maxLoadCondition := range maxLoadConditions {
maxLoadTokens := strings.Split(maxLoadCondition, "=")
Expand All @@ -287,9 +306,11 @@ func (this *MigrationContext) ReadMaxLoad(maxLoadList string) error {
if n, err := strconv.ParseInt(maxLoadTokens[1], 10, 0); err != nil {
return fmt.Errorf("Error parsing numeric value in max-load condition: %s", maxLoadCondition)
} else {
this.MaxLoad[maxLoadTokens[0]] = n
tmpMaxLoadMap[maxLoadTokens[0]] = n
}
}

this.maxLoad = tmpMaxLoadMap
return nil
}

Expand Down
24 changes: 19 additions & 5 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
}
}

for variableName, threshold := range this.migrationContext.MaxLoad {
maxLoad := this.migrationContext.GetMaxLoad()
for variableName, threshold := range maxLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return true, fmt.Sprintf("%s %s", variableName, err)
Expand Down Expand Up @@ -530,7 +531,9 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
}

func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
tokens := strings.Split(command, "=")
defer writer.Flush()

tokens := strings.SplitN(command, "=", 2)
command = strings.TrimSpace(tokens[0])
arg := ""
if len(tokens) > 1 {
Expand All @@ -553,12 +556,21 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
case "chunk-size":
{
if chunkSize, err := strconv.Atoi(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
} else {
this.migrationContext.SetChunkSize(int64(chunkSize))
this.printMigrationStatusHint(writer)
}
}
case "max-load":
{
if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
this.printMigrationStatusHint(writer)
}
case "throttle", "pause", "suspend":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
Expand All @@ -568,9 +580,10 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
}
default:
return fmt.Errorf("Unknown command: %s", command)
err = fmt.Errorf("Unknown command: %s", command)
fmt.Fprintf(writer, "%s\n", err.Error())
return err
}
writer.Flush()
return nil
}

Expand Down Expand Up @@ -644,10 +657,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v",
this.migrationContext.StartTime.Format(time.RubyDate),
))
maxLoad := this.migrationContext.GetMaxLoad()
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v",
atomic.LoadInt64(&this.migrationContext.ChunkSize),
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
this.migrationContext.MaxLoad,
maxLoad,
))
if this.migrationContext.ThrottleFlagFile != "" {
fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v",
Expand Down

0 comments on commit 29aead6

Please sign in to comment.