Skip to content

Commit 5237e11

Browse files
authored
feat: (observability) Add support for OpenTelemetry traces and allow observability options to be passed. (googleapis#2131)
This change plumbs ObservabilityConfig into Spanner, Instance and Database so that any subsequent traces will use it when beginning spans and later on for metrics. Updates googleapis#2079
1 parent 2fd63ac commit 5237e11

File tree

12 files changed

+208
-43
lines changed

12 files changed

+208
-43
lines changed

observability-test/batch-transaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ describe('BatchTransaction', () => {
139139
batchTransaction = new BatchTransaction(SESSION as {} as Session);
140140
batchTransaction.session = SESSION as {} as Session;
141141
batchTransaction.id = ID;
142-
batchTransaction.observabilityOptions = {tracerProvider: provider};
142+
batchTransaction._observabilityOptions = {tracerProvider: provider};
143143
REQUEST.callsFake((_, callback) => callback(null, RESPONSE));
144144
});
145145

observability-test/database.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ describe('Database', () => {
241241
database = new Database(INSTANCE, NAME, POOL_OPTIONS);
242242
database.parent = INSTANCE;
243243
database.databaseRole = 'parent_role';
244-
database.observabilityConfig = {
244+
database._observabilityOptions = {
245245
tracerProvider: provider,
246246
enableExtendedTracing: false,
247247
};

observability-test/spanner.ts

Lines changed: 157 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import * as assert from 'assert';
1818
import {grpc} from 'google-gax';
1919
import {google} from '../protos/protos';
20-
import {Database, Spanner} from '../src';
20+
import {Database, Instance, Spanner} from '../src';
2121
import {MutationSet} from '../src/transaction';
2222
import protobuf = google.spanner.v1;
2323
import * as mock from '../test/mockserver/mockspanner';
@@ -35,6 +35,8 @@ const {
3535
AsyncHooksContextManager,
3636
} = require('@opentelemetry/context-async-hooks');
3737

38+
const {ObservabilityOptions} = require('../src/instrument');
39+
3840
/** A simple result set for SELECT 1. */
3941
function createSelect1ResultSet(): protobuf.ResultSet {
4042
const fields = [
@@ -60,7 +62,9 @@ interface setupResults {
6062
spannerMock: mock.MockSpanner;
6163
}
6264

63-
async function setup(): Promise<setupResults> {
65+
async function setup(
66+
observabilityOptions?: typeof ObservabilityOptions
67+
): Promise<setupResults> {
6468
const server = new grpc.Server();
6569

6670
const spannerMock = mock.createMockSpanner(server);
@@ -97,6 +101,7 @@ async function setup(): Promise<setupResults> {
97101
servicePath: 'localhost',
98102
port,
99103
sslCreds: grpc.credentials.createInsecure(),
104+
observabilityOptions: observabilityOptions,
100105
});
101106

102107
return Promise.resolve({
@@ -122,7 +127,16 @@ describe('EndToEnd', () => {
122127
});
123128

124129
beforeEach(async () => {
125-
const setupResult = await setup();
130+
traceExporter = new InMemorySpanExporter();
131+
const sampler = new AlwaysOnSampler();
132+
const provider = new NodeTracerProvider({
133+
sampler: sampler,
134+
exporter: traceExporter,
135+
});
136+
const setupResult = await setup({
137+
tracerProvider: provider,
138+
enableExtendedTracing: false,
139+
});
126140
spanner = setupResult.spanner;
127141
server = setupResult.server;
128142
spannerMock = setupResult.spannerMock;
@@ -138,21 +152,10 @@ describe('EndToEnd', () => {
138152
mock.StatementResult.updateCount(1)
139153
);
140154

141-
traceExporter = new InMemorySpanExporter();
142-
const sampler = new AlwaysOnSampler();
143-
144-
const provider = new NodeTracerProvider({
145-
sampler: sampler,
146-
exporter: traceExporter,
147-
});
148155
provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter));
149156

150157
const instance = spanner.instance('instance');
151158
database = instance.database('database');
152-
database.observabilityConfig = {
153-
tracerProvider: provider,
154-
enableExtendedTracing: false,
155-
};
156159
});
157160

158161
afterEach(() => {
@@ -440,3 +443,143 @@ describe('EndToEnd', () => {
440443
});
441444
});
442445
});
446+
447+
describe('ObservabilityOptions injection and propagation', async () => {
448+
const globalTraceExporter = new InMemorySpanExporter();
449+
const globalTracerProvider = new NodeTracerProvider({
450+
sampler: new AlwaysOnSampler(),
451+
exporter: globalTraceExporter,
452+
});
453+
globalTracerProvider.addSpanProcessor(
454+
new SimpleSpanProcessor(globalTraceExporter)
455+
);
456+
globalTracerProvider.register();
457+
458+
const injectedTraceExporter = new InMemorySpanExporter();
459+
const injectedTracerProvider = new NodeTracerProvider({
460+
sampler: new AlwaysOnSampler(),
461+
exporter: injectedTraceExporter,
462+
});
463+
injectedTracerProvider.addSpanProcessor(
464+
new SimpleSpanProcessor(injectedTraceExporter)
465+
);
466+
467+
const observabilityOptions: typeof ObservabilityOptions = {
468+
tracerProvider: injectedTracerProvider,
469+
enableExtendedTracing: true,
470+
};
471+
472+
const setupResult = await setup(observabilityOptions);
473+
const spanner = setupResult.spanner;
474+
const server = setupResult.server;
475+
const spannerMock = setupResult.spannerMock;
476+
477+
after(async () => {
478+
globalTraceExporter.reset();
479+
injectedTraceExporter.reset();
480+
await globalTracerProvider.shutdown();
481+
await injectedTracerProvider.shutdown();
482+
spannerMock.resetRequests();
483+
spanner.close();
484+
server.tryShutdown(() => {});
485+
});
486+
487+
it('Passed into Spanner, Instance and Database', done => {
488+
// Ensure that the same observability configuration is set on the Spanner client.
489+
assert.deepStrictEqual(spanner._observabilityOptions, observabilityOptions);
490+
491+
// Acquire a handle to the Instance through spanner.instance.
492+
const instanceByHandle = spanner.instance('instance');
493+
assert.deepStrictEqual(
494+
instanceByHandle._observabilityOptions,
495+
observabilityOptions
496+
);
497+
498+
// Create the Instance by means of a constructor directly.
499+
const instanceByConstructor = new Instance(spanner, 'myInstance');
500+
assert.deepStrictEqual(
501+
instanceByConstructor._observabilityOptions,
502+
observabilityOptions
503+
);
504+
505+
// Acquire a handle to the Database through instance.database.
506+
const databaseByHandle = instanceByHandle.database('database');
507+
assert.deepStrictEqual(
508+
databaseByHandle._observabilityOptions,
509+
observabilityOptions
510+
);
511+
512+
// Create the Database by means of a constructor directly.
513+
const databaseByConstructor = new Database(
514+
instanceByConstructor,
515+
'myDatabase'
516+
);
517+
assert.deepStrictEqual(
518+
databaseByConstructor._observabilityOptions,
519+
observabilityOptions
520+
);
521+
522+
done();
523+
});
524+
525+
it('Propagates spans to the injected not global TracerProvider', done => {
526+
const instance = spanner.instance('instance');
527+
const database = instance.database('database');
528+
529+
database.run('SELECT 1', (err, rows) => {
530+
assert.ifError(err);
531+
532+
injectedTraceExporter.forceFlush();
533+
globalTraceExporter.forceFlush();
534+
const spansFromInjected = injectedTraceExporter.getFinishedSpans();
535+
const spansFromGlobal = globalTraceExporter.getFinishedSpans();
536+
537+
assert.strictEqual(
538+
spansFromGlobal.length,
539+
0,
540+
'Expecting no spans from the global exporter'
541+
);
542+
assert.strictEqual(
543+
spansFromInjected.length > 0,
544+
true,
545+
'Expecting spans from the injected exporter'
546+
);
547+
548+
spansFromInjected.sort((spanA, spanB) => {
549+
spanA.startTime < spanB.startTime;
550+
});
551+
const actualSpanNames: string[] = [];
552+
const actualEventNames: string[] = [];
553+
spansFromInjected.forEach(span => {
554+
actualSpanNames.push(span.name);
555+
span.events.forEach(event => {
556+
actualEventNames.push(event.name);
557+
});
558+
});
559+
560+
const expectedSpanNames = [
561+
'CloudSpanner.Database.runStream',
562+
'CloudSpanner.Database.run',
563+
];
564+
assert.deepStrictEqual(
565+
actualSpanNames,
566+
expectedSpanNames,
567+
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
568+
);
569+
570+
const expectedEventNames = [
571+
'Acquiring session',
572+
'Waiting for a session to become available',
573+
'Acquired session',
574+
'Using Session',
575+
];
576+
assert.deepStrictEqual(
577+
actualEventNames,
578+
expectedEventNames,
579+
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
580+
);
581+
582+
done();
583+
});
584+
});
585+
});

observability-test/table.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ describe('Table', () => {
9292
extend(Table, TableCached);
9393
table = new Table(DATABASE, NAME);
9494
transaction = new FakeTransaction();
95-
table.observabilityOptions = {tracerProvider: provider};
95+
table._observabilityOptions = {tracerProvider: provider};
9696
});
9797

9898
afterEach(() => {

src/batch-transaction.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class BatchTransaction extends Snapshot {
139139

140140
const traceConfig: traceConfig = {
141141
sql: query,
142-
opts: this.observabilityOptions,
142+
opts: this._observabilityOptions,
143143
};
144144
return startTrace(
145145
'BatchTransaction.createQueryPartitions',
@@ -182,7 +182,7 @@ class BatchTransaction extends Snapshot {
182182
*/
183183
createPartitions_(config, callback) {
184184
const traceConfig: traceConfig = {
185-
opts: this.observabilityOptions,
185+
opts: this._observabilityOptions,
186186
};
187187

188188
return startTrace(
@@ -259,7 +259,7 @@ class BatchTransaction extends Snapshot {
259259
*/
260260
createReadPartitions(options, callback) {
261261
const traceConfig: traceConfig = {
262-
opts: this.observabilityOptions,
262+
opts: this._observabilityOptions,
263263
};
264264

265265
return startTrace(

src/database.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ class Database extends common.GrpcServiceObject {
344344
databaseDialect?: EnumKey<
345345
typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect
346346
> | null;
347-
observabilityConfig: ObservabilityOptions | undefined;
347+
_observabilityOptions?: ObservabilityOptions;
348348
constructor(
349349
instance: Instance,
350350
name: string,
@@ -467,7 +467,7 @@ class Database extends common.GrpcServiceObject {
467467
Object.assign({}, queryOptions),
468468
Database.getEnvironmentQueryOptions()
469469
);
470-
this.observabilityConfig = instance.observabilityConfig;
470+
this._observabilityOptions = instance._observabilityOptions;
471471
}
472472
/**
473473
* @typedef {array} SetDatabaseMetadataResponse
@@ -693,7 +693,7 @@ class Database extends common.GrpcServiceObject {
693693

694694
const sessions = (resp!.session || []).map(metadata => {
695695
const session = this.session(metadata.name!);
696-
session.observabilityConfig = this.observabilityConfig;
696+
session._observabilityOptions = this._observabilityOptions;
697697
session.metadata = metadata;
698698
return session;
699699
});
@@ -738,6 +738,7 @@ class Database extends common.GrpcServiceObject {
738738
const id = identifier.transaction;
739739
const transaction = new BatchTransaction(session, options);
740740
transaction.id = id;
741+
transaction._observabilityOptions = this._observabilityOptions;
741742
transaction.readTimestamp = identifier.timestamp as PreciseDate;
742743
return transaction;
743744
}
@@ -827,7 +828,7 @@ class Database extends common.GrpcServiceObject {
827828
? (optionsOrCallback as TimestampBounds)
828829
: {};
829830

830-
const q = {opts: this.observabilityConfig};
831+
const q = {opts: this._observabilityOptions};
831832
return startTrace('Database.createBatchTransaction', q, span => {
832833
this.pool_.getSession((err, session) => {
833834
if (err) {
@@ -1085,6 +1086,7 @@ class Database extends common.GrpcServiceObject {
10851086
/CREATE TABLE `*([^\s`(]+)/
10861087
)![1];
10871088
const table = this.table(tableName!);
1089+
table._observabilityOptions = this._observabilityOptions;
10881090
callback!(null, table, operation!, resp!);
10891091
});
10901092
}
@@ -1873,7 +1875,7 @@ class Database extends common.GrpcServiceObject {
18731875
delete (gaxOpts as GetSessionsOptions).pageToken;
18741876
}
18751877

1876-
const q = {opts: this.observabilityConfig};
1878+
const q = {opts: this._observabilityOptions};
18771879
return startTrace('Database.getSessions', q, span => {
18781880
this.request<
18791881
google.spanner.v1.ISession,
@@ -1895,7 +1897,7 @@ class Database extends common.GrpcServiceObject {
18951897
sessionInstances = sessions.map(metadata => {
18961898
const session = self.session(metadata.name!);
18971899
session.metadata = metadata;
1898-
session.observabilityConfig = this.observabilityConfig;
1900+
session._observabilityOptions = this._observabilityOptions;
18991901
return session;
19001902
});
19011903
}
@@ -2056,7 +2058,7 @@ class Database extends common.GrpcServiceObject {
20562058
? (optionsOrCallback as TimestampBounds)
20572059
: {};
20582060

2059-
const q = {opts: this.observabilityConfig};
2061+
const q = {opts: this._observabilityOptions};
20602062
return startTrace('Database.getSnapshot', q, span => {
20612063
this.pool_.getSession((err, session) => {
20622064
if (err) {
@@ -2157,7 +2159,7 @@ class Database extends common.GrpcServiceObject {
21572159
? (optionsOrCallback as GetTransactionOptions)
21582160
: {};
21592161

2160-
const q = {opts: this.observabilityConfig};
2162+
const q = {opts: this._observabilityOptions};
21612163
return startTrace('Database.getTransaction', q, span => {
21622164
this.pool_.getSession((err, session, transaction) => {
21632165
if (options.requestOptions) {
@@ -2784,7 +2786,7 @@ class Database extends common.GrpcServiceObject {
27842786
? (optionsOrCallback as TimestampBounds)
27852787
: {};
27862788

2787-
const q = {sql: query, opts: this.observabilityConfig};
2789+
const q = {sql: query, opts: this._observabilityOptions};
27882790
return startTrace('Database.run', q, span => {
27892791
this.runStream(query, options)
27902792
.on('error', err => {
@@ -3005,7 +3007,7 @@ class Database extends common.GrpcServiceObject {
30053007
options?: TimestampBounds
30063008
): PartialResultStream {
30073009
const proxyStream: Transform = through.obj();
3008-
const q = {sql: query, opts: this.observabilityConfig};
3010+
const q = {sql: query, opts: this._observabilityOptions};
30093011
return startTrace('Database.runStream', q, span => {
30103012
this.pool_.getSession((err, session) => {
30113013
if (err) {
@@ -3183,7 +3185,7 @@ class Database extends common.GrpcServiceObject {
31833185
? (optionsOrRunFn as RunTransactionOptions)
31843186
: {};
31853187

3186-
const q = {opts: this.observabilityConfig};
3188+
const q = {opts: this._observabilityOptions};
31873189
startTrace('Database.runTransaction', q, span => {
31883190
this.pool_.getSession((err, session?, transaction?) => {
31893191
if (err) {
@@ -3576,7 +3578,7 @@ class Database extends common.GrpcServiceObject {
35763578
? (optionsOrCallback as CallOptions)
35773579
: {};
35783580

3579-
const q = {opts: this.observabilityConfig};
3581+
const q = {opts: this._observabilityOptions};
35803582
return startTrace('Database.writeAtLeastOnce', q, span => {
35813583
this.pool_.getSession((err, session?, transaction?) => {
35823584
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {

0 commit comments

Comments
 (0)