Skip to content

Commit 389880b

Browse files
committed
Implement error callback in concurent queries
- NodeError type has been added to destinguish errors from particular nodes. - Maps are now used for name to executor mapping. This gives a small performance penalty on multi query functions. However, type adaptor functions have been removed, eliminating the overhead. This makes the gorss performance loss negligible. - Due to using Maps, the Node type has become obsolete and has been removed. Instead, sql.DB is now used more widely. Closes #17
1 parent b920f41 commit 389880b

File tree

10 files changed

+876
-555
lines changed

10 files changed

+876
-555
lines changed

multiTx.go

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,35 @@ type txBeginner interface {
1414
BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error)
1515
}
1616

17-
func beginMultiTx(ctx context.Context, opts *sql.TxOptions, txb ...txBeginner) ([]*sql.Tx, error) {
17+
func beginMultiTx(ctx context.Context, opts *sql.TxOptions, txb map[string]txBeginner, errCallback ErrCallbackFunc) (map[string]executor, error) {
1818
type result struct {
19-
tx *sql.Tx
20-
err error
19+
name string
20+
tx *sql.Tx
21+
err error
2122
}
2223

2324
rc := make(chan result, len(txb))
2425

25-
for _, n := range txb {
26-
go func(n txBeginner) {
27-
var r result
28-
r.tx, r.err = n.BeginTx(ctx, opts)
26+
for name, b := range txb {
27+
go func(name string, b txBeginner) {
28+
r := result{name: name}
29+
30+
r.tx, r.err = b.BeginTx(ctx, opts)
31+
if r.err != nil {
32+
r.err = &NodeError{name, r.err}
33+
34+
if errCallback != nil {
35+
errCallback(r.err)
36+
}
37+
}
38+
2939
rc <- r
30-
}(n)
40+
}(name, b)
3141
}
3242

3343
var errs []error
3444

35-
txs := make([]*sql.Tx, 0, len(txb))
45+
txs := make(map[string]executor, len(txb))
3646

3747
for i := 0; i < len(txb); i++ {
3848
r := <-rc
@@ -42,8 +52,7 @@ func beginMultiTx(ctx context.Context, opts *sql.TxOptions, txb ...txBeginner) (
4252
continue
4353
}
4454

45-
txs = append(txs, r.tx)
46-
55+
txs[r.name] = r.tx
4756
}
4857

4958
if errs != nil {
@@ -60,9 +69,10 @@ func beginMultiTx(ctx context.Context, opts *sql.TxOptions, txb ...txBeginner) (
6069
// MultiTx holds a slice of open transactions to multiple nodes.
6170
// All methods on this type run their sql.Tx variant in one Go routine per Node.
6271
type MultiTx struct {
63-
tx []*sql.Tx
64-
wg sync.WaitGroup
65-
cancel context.CancelFunc
72+
tx map[string]executor
73+
wg sync.WaitGroup
74+
cancel context.CancelFunc
75+
errCallback ErrCallbackFunc
6676
}
6777

6878
// cancelWait cancels a previously running operation on TX
@@ -91,11 +101,20 @@ func (m *MultiTx) cancelWait() {
91101
func (m *MultiTx) Rollback() error {
92102
m.cancelWait()
93103
ec := make(chan error, len(m.tx))
94-
for _, tx := range m.tx {
95-
go func(tx *sql.Tx) {
104+
105+
for name, tx := range m.tx {
106+
go func(name string, tx *sql.Tx) {
96107
err := tx.Rollback()
108+
if err != nil {
109+
err = &NodeError{name, err}
110+
}
111+
112+
if m.errCallback != nil {
113+
m.errCallback(err)
114+
}
115+
97116
ec <- err
98-
}(tx)
117+
}(name, tx.(*sql.Tx))
99118
}
100119

101120
var errs []error
@@ -125,10 +144,20 @@ func (m *MultiTx) Rollback() error {
125144
// This method is primarily included to implement boil.Transactor
126145
func (m *MultiTx) Commit() error {
127146
ec := make(chan error, len(m.tx))
128-
for _, tx := range m.tx {
129-
go func(tx *sql.Tx) {
130-
ec <- tx.Commit()
131-
}(tx)
147+
148+
for name, tx := range m.tx {
149+
go func(name string, tx *sql.Tx) {
150+
err := tx.Commit()
151+
if err != nil {
152+
err = &NodeError{name, err}
153+
}
154+
155+
if m.errCallback != nil {
156+
m.errCallback(err)
157+
}
158+
159+
ec <- err
160+
}(name, tx.(*sql.Tx))
132161
}
133162

134163
var errs []error
@@ -159,7 +188,7 @@ func (m *MultiTx) context(ctx context.Context) context.Context {
159188
// It does not make much sense to run this method against multiple Nodes, as they are ussualy slaves.
160189
// This method is primarily included to implement boil.ContextExecutor.
161190
func (m *MultiTx) ExecContext(ctx context.Context, query string, args ...interface{}) (res sql.Result, err error) {
162-
return multiExec(m.context(ctx), &m.wg, mtx2Exec(m.tx), query, args...)
191+
return multiExec(m.context(ctx), &m.wg, m.tx, m.errCallback, query, args...)
163192
}
164193

165194
// Exec runs ExecContext with context.Background().
@@ -178,7 +207,7 @@ func (m *MultiTx) Exec(query string, args ...interface{}) (sql.Result, error) {
178207
//
179208
// Implements boil.ContextExecutor.
180209
func (m *MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (rows *sql.Rows, err error) {
181-
return multiQuery(m.context(ctx), &m.wg, mtx2Exec(m.tx), query, args...)
210+
return multiQuery(m.context(ctx), &m.wg, m.tx, m.errCallback, query, args...)
182211
}
183212

184213
// Query runs QueryContext with context.Background().
@@ -195,7 +224,7 @@ func (m *MultiTx) Query(query string, args ...interface{}) (*sql.Rows, error) {
195224
// If you have a choice, stick with a regular QueryContext.
196225
// This method is primarily included to implement boil.Executor.
197226
func (m *MultiTx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
198-
return multiQueryRow(m.context(ctx), &m.wg, mtx2Exec(m.tx), query, args...)
227+
return multiQueryRow(m.context(ctx), &m.wg, m.tx, m.errCallback, query, args...)
199228
}
200229

201230
// QueryRow wrapper around sql.DB.QueryRow.

0 commit comments

Comments
 (0)