@@ -36,20 +36,19 @@ let empty = BigQueryConf.create()
36
36
37
37
module internal Impl =
38
38
open System
39
- open System.IO
40
39
open Google.Apis .Bigquery .v2 .Data
41
40
open Logary.Internals .Chiron
42
41
module E = Json.Encode
43
42
44
43
let tableSchema =
45
44
let builder = TableSchemaBuilder()
46
- let field name typ mode = new TableFieldSchema( Name= name, Type= typ, Mode= mode)
45
+ let field name typ mode = TableFieldSchema( Name= name, Type= typ, Mode= mode)
47
46
let req name typ = field name typ " REQUIRED"
48
47
let opt name typ = field name typ " NULLABLE"
49
48
builder.Add( req " level" " INT64" )
50
49
builder.Add( opt " event" " STRING" )
51
- builder.Add( opt " monetaryValue.amount " " NUMERIC" )
52
- builder.Add( opt " monetaryValue.currency " " STRING" )
50
+ builder.Add( opt " money_amount " " NUMERIC" )
51
+ builder.Add( opt " money_currency " " STRING" )
53
52
builder.Add( req " name" " STRING" )
54
53
builder.Add( req " timestamp" " TIMESTAMP" )
55
54
/// JSON string
@@ -74,11 +73,11 @@ module internal Impl =
74
73
let me = x.getAsOrThrow< EventMessage>()
75
74
row.[ " event" ] <- me.event
76
75
if me.monetaryValue.IsSome then
77
- row.[ " monetaryValue.amount " ] <- me.monetaryValue.Value.value.asFloat
78
- row.[ " monetaryValue.currency " ] <- me.monetaryValue.Value.unit.name
76
+ row.[ " money_amount " ] <- me.monetaryValue.Value.value.asFloat
77
+ row.[ " money_currency " ] <- me.monetaryValue.Value.unit.name.Value
79
78
else
80
- row.[ " monetaryValue.amount " ] <- 0.
81
- row.[ " monetaryValue.currency " ] <- " EUR"
79
+ row.[ " money_amount " ] <- 0.
80
+ row.[ " money_currency " ] <- " EUR"
82
81
| _ -> ()
83
82
row
84
83
@@ -97,12 +96,16 @@ module internal Impl =
97
96
98
97
let rec initialise () =
99
98
job {
99
+ logger.debug " Starting BigQuery target"
100
100
let! platform = Platform.InstanceAsync()
101
+ logger.debug " Received platform information"
101
102
let projectId = conf.projectId |> Option.defaultValue platform.ProjectId
102
103
let client = BigQueryClient.Create( projectId)
103
104
let dataset = client.GetOrCreateDataset( conf.dataset)
104
105
let table = dataset.GetOrCreateTable( conf.table, tableSchema)
105
106
do ! maybeUpdate projectId dataset table tableSchema
107
+
108
+ logger.debug " Going into the Running state"
106
109
return ! running { table= table; client= client }
107
110
}
108
111
@@ -132,11 +135,18 @@ module internal Impl =
132
135
}
133
136
134
137
api.shutdownCh ^=> fun ack ->
138
+ logger.debug " Shutting down BigQuery target"
135
139
Job.Scheduler.isolate ( fun _ -> ( state :> IDisposable) .Dispose())
136
140
>>=. ack *<= ()
137
141
] :> Job<_>
138
142
139
- initialise ()
143
+ Job.catch ( initialise ()) >>= function
144
+ | Choice1Of2 () ->
145
+ Job.unit ()
146
+ | Choice2Of2 e ->
147
+ printfn " %A " e
148
+ Job.raises e
149
+
140
150
141
151
/// Create a new BigQuery target
142
152
[<CompiledName " Create" >]
0 commit comments