Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fix/batch-identity-…
Browse files Browse the repository at this point in the history
…insert-persists-on-failure
  • Loading branch information
zepatrik committed Nov 14, 2024
2 parents 914f572 + 00da05d commit 86aea3b
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 121 deletions.
2 changes: 1 addition & 1 deletion courier/http_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *httpChannel) Dispatch(ctx context.Context, msg Message) (err error) {
ctx, span := c.d.Tracer(ctx).Tracer().Start(ctx, "courier.httpChannel.Dispatch")
defer otelx.End(span, &err)

builder, err := request.NewBuilder(ctx, c.requestConfig, c.d, nil)
builder, err := request.NewBuilder(ctx, c.requestConfig, c.d)
if err != nil {
return errors.WithStack(err)
}
Expand Down
6 changes: 5 additions & 1 deletion embedx/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@
"title": "Web-Hook Configuration",
"description": "Define what the hook should do",
"properties": {
"id": {
"type": "string",
"description": "The ID of the hook. Used to identify the hook in logs and errors. For debugging purposes only."
},
"response": {
"title": "Response Handling",
"description": "How the web hook should handle the response",
Expand Down Expand Up @@ -2274,7 +2278,7 @@
"id": {
"type": "string",
"title": "Channel id",
"description": "The channel id. Corresponds to the .via property of the identity schema for recovery, verification, etc. Currently only phone is supported.",
"description": "The channel id. Corresponds to the .via property of the identity schema for recovery, verification, etc. Currently only sms is supported.",
"maxLength": 32,
"enum": ["sms"]
},
Expand Down
30 changes: 24 additions & 6 deletions request/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,36 @@ type (
deps Dependencies
cache *ristretto.Cache[[]byte, []byte]
}
options struct {
cache *ristretto.Cache[[]byte, []byte]
}
BuilderOption = func(*options)
)

func NewBuilder(ctx context.Context, config json.RawMessage, deps Dependencies, jsonnetCache *ristretto.Cache[[]byte, []byte]) (_ *Builder, err error) {
func WithCache(cache *ristretto.Cache[[]byte, []byte]) BuilderOption {
return func(o *options) {
o.cache = cache
}
}

func NewBuilder(ctx context.Context, config json.RawMessage, deps Dependencies, o ...BuilderOption) (_ *Builder, err error) {
_, span := deps.Tracer(ctx).Tracer().Start(ctx, "request.NewBuilder")
defer otelx.End(span, &err)

c, err := parseConfig(config)
if err != nil {
var opts options
for _, f := range o {
f(&opts)
}

c := Config{}
if err := json.Unmarshal(config, &c); err != nil {
return nil, err
}

span.SetAttributes(attribute.String("url", c.URL), attribute.String("method", c.Method))
span.SetAttributes(
attribute.String("url", c.URL),
attribute.String("method", c.Method),
)

r, err := retryablehttp.NewRequest(c.Method, c.URL, nil)
if err != nil {
Expand All @@ -66,9 +84,9 @@ func NewBuilder(ctx context.Context, config json.RawMessage, deps Dependencies,

return &Builder{
r: r,
Config: c,
Config: &c,
deps: deps,
cache: jsonnetCache,
cache: opts.cache,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions request/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestBuildRequest(t *testing.T) {
} {
t.Run(
"request-type="+tc.name, func(t *testing.T) {
rb, err := NewBuilder(context.Background(), json.RawMessage(tc.rawConfig), newTestDependencyProvider(t), nil)
rb, err := NewBuilder(context.Background(), json.RawMessage(tc.rawConfig), newTestDependencyProvider(t))
require.NoError(t, err)

assert.Equal(t, tc.bodyTemplateURI, rb.Config.TemplateURI)
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestBuildRequest(t *testing.T) {
"method": "POST",
"body": "file://./stub/cancel_body.jsonnet"
}`,
), newTestDependencyProvider(t), nil)
), newTestDependencyProvider(t))
require.NoError(t, err)

_, err = rb.BuildRequest(context.Background(), json.RawMessage(`{}`))
Expand Down
42 changes: 15 additions & 27 deletions request/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,36 @@ type (
}

Config struct {
Method string `json:"method"`
URL string `json:"url"`
TemplateURI string `json:"body"`
Header http.Header `json:"headers"`
Auth Auth `json:"auth,omitempty"`
}
)

func parseConfig(r json.RawMessage) (*Config, error) {
type rawConfig struct {
Method string `json:"method"`
URL string `json:"url"`
TemplateURI string `json:"body"`
Header json.RawMessage `json:"headers"`
Auth Auth `json:"auth,omitempty"`
Header http.Header `json:"-"`
RawHeader json.RawMessage `json:"headers"`
Auth Auth `json:"auth"`
}
)

var rc rawConfig
err := json.Unmarshal(r, &rc)
func (c *Config) UnmarshalJSON(raw []byte) error {
type Alias Config
var a Alias
err := json.Unmarshal(raw, &a)
if err != nil {
return nil, err
return err
}

rawHeader := gjson.ParseBytes(rc.Header).Map()
hdr := http.Header{}
rawHeader := gjson.ParseBytes(a.RawHeader).Map()
a.Header = make(http.Header, len(rawHeader))

_, ok := rawHeader["Content-Type"]
if !ok {
hdr.Set("Content-Type", ContentTypeJSON)
a.Header.Set("Content-Type", ContentTypeJSON)
}

for key, value := range rawHeader {
hdr.Set(key, value.String())
a.Header.Set(key, value.String())
}

c := Config{
Method: rc.Method,
URL: rc.URL,
TemplateURI: rc.TemplateURI,
Header: hdr,
Auth: rc.Auth,
}
*c = Config(a)

return &c, nil
return nil
}
5 changes: 3 additions & 2 deletions selfservice/hook/password_migration_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ory/herodot"
"github.com/ory/kratos/request"
"github.com/ory/kratos/schema"
"github.com/ory/kratos/x"
"github.com/ory/x/otelx"
)

Expand Down Expand Up @@ -52,9 +53,9 @@ func (p *PasswordMigration) Execute(ctx context.Context, data *PasswordMigration
defer otelx.End(span, &err)

if emitEvent {
instrumentHTTPClientForEvents(ctx, httpClient)
instrumentHTTPClientForEvents(ctx, httpClient, x.NewUUID(), "password_migration_hook")
}
builder, err := request.NewBuilder(ctx, p.conf, p.deps, nil)
builder, err := request.NewBuilder(ctx, p.conf, p.deps)
if err != nil {
return errors.WithStack(err)
}
Expand Down
20 changes: 12 additions & 8 deletions selfservice/hook/web_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
canInterrupt = gjson.GetBytes(e.conf, "can_interrupt").Bool()
parseResponse = gjson.GetBytes(e.conf, "response.parse").Bool()
emitEvent = gjson.GetBytes(e.conf, "emit_analytics_event").Bool() || !gjson.GetBytes(e.conf, "emit_analytics_event").Exists() // default true
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
webhookID = gjson.GetBytes(e.conf, "id").Str
// The trigger ID is a random ID. It can be used to correlate webhook requests across retries.
triggerID = x.NewUUID()
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
)
if ignoreResponse && (parseResponse || canInterrupt) {
return errors.WithStack(herodot.ErrInternalServerError.WithReasonf("A webhook is configured to ignore the response but also to parse the response. This is not possible."))
Expand All @@ -318,7 +321,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
defer otelx.End(span, &finalErr)

if emitEvent {
instrumentHTTPClientForEvents(ctx, httpClient)
instrumentHTTPClientForEvents(ctx, httpClient, triggerID, webhookID)
}

defer func(startTime time.Time) {
Expand All @@ -329,7 +332,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
}).WithField("duration", time.Since(startTime))
if finalErr != nil {
if emitEvent && !errors.Is(finalErr, context.Canceled) {
span.AddEvent(events.NewWebhookFailed(ctx, finalErr))
span.AddEvent(events.NewWebhookFailed(ctx, finalErr, triggerID, webhookID))
}
if ignoreResponse {
logger.WithError(finalErr).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored")
Expand All @@ -339,12 +342,12 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
} else {
logger.Info("Webhook request succeeded")
if emitEvent {
span.AddEvent(events.NewWebhookSucceeded(ctx))
span.AddEvent(events.NewWebhookSucceeded(ctx, triggerID, webhookID))
}
}
}(time.Now())

builder, err := request.NewBuilder(ctx, e.conf, e.deps, jsonnetCache)
builder, err := request.NewBuilder(ctx, e.conf, e.deps, request.WithCache(jsonnetCache))
if err != nil {
return err
}
Expand Down Expand Up @@ -551,7 +554,7 @@ func isTimeoutError(err error) bool {
return errors.As(err, &te) && te.Timeout() || errors.Is(err, context.DeadlineExceeded)
}

func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehttp.Client) {
func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehttp.Client, triggerID uuid.UUID, webhookID string) {
// TODO(@alnr): improve this implementation to redact sensitive data
var (
attempt = 0
Expand All @@ -560,8 +563,9 @@ func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehtt
)
httpClient.RequestLogHook = func(_ retryablehttp.Logger, req *http.Request, retryNumber int) {
attempt = retryNumber + 1
requestID = uuid.Must(uuid.NewV4())
requestID = x.NewUUID()
req.Header.Set("Ory-Webhook-Request-ID", requestID.String())
req.Header.Set("Ory-Webhook-Trigger-ID", triggerID.String())
// TODO(@alnr): redact sensitive data
// reqBody, _ = httputil.DumpRequestOut(req, true)
reqBody = []byte("<redacted>")
Expand All @@ -572,6 +576,6 @@ func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehtt
// resBody = resBody[:min(len(resBody), 2<<10)] // truncate response body to 2 kB for event
// TODO(@alnr): redact sensitive data
resBody := []byte("<redacted>")
trace.SpanFromContext(ctx).AddEvent(events.NewWebhookDelivered(ctx, res.Request.URL, reqBody, res.StatusCode, resBody, attempt, requestID))
trace.SpanFromContext(ctx).AddEvent(events.NewWebhookDelivered(ctx, res.Request.URL, reqBody, res.StatusCode, resBody, attempt, requestID, triggerID, webhookID))
}
}
Loading

0 comments on commit 86aea3b

Please sign in to comment.