Skip to content

Commit

Permalink
Allow configuring (opt-in) IMC async handler (#8311)
Browse files Browse the repository at this point in the history
* Allow configuring (opt-in) IMC async handler

We switched to use the sync handler by default, however, it was
reported that in some cases, this is not wanted as it slows down
the source event senders since it needs to wait for all subscribers
to receive events.

While this is the best default behavior since reduces lost events in
InMemoryChannel, we want to allow configuring this behavior, while
documenting the downsides (follow up to docs repo)

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Propagate annotations and labels to channel

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add E2E tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use constant in tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Only propagates messaging.knative.dev annotations

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add unit test

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Make annotation val explicit string.......

Signed-off-by: Matthias Wessendorf <[email protected]>

* channel impl did not support setting annotations before........

Signed-off-by: Matthias Wessendorf <[email protected]>

* Instead of re-installing the channel_impl, lets update the annotations. Otherwise we loose the channel's Spec.Subscribers

Signed-off-by: Matthias Wessendorf <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Signed-off-by: Matthias Wessendorf <[email protected]>
Co-authored-by: Matthias Wessendorf <[email protected]>
  • Loading branch information
pierDipi and matzew authored Nov 18, 2024
1 parent bc6e878 commit 8fed0be
Show file tree
Hide file tree
Showing 16 changed files with 482 additions and 39 deletions.
11 changes: 10 additions & 1 deletion pkg/apis/messaging/v1/in_memory_channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
)

// +genclient
Expand All @@ -44,6 +45,14 @@ type InMemoryChannel struct {
Status InMemoryChannelStatus `json:"status,omitempty"`
}

var (
// AsyncHandlerAnnotation controls whether InMemoryChannel uses the async handler.
//
// Async handler is subject to event loss since it responds with 200 before forwarding the event
// to all subscriptions.
AsyncHandlerAnnotation = SchemeGroupVersion.Group + "/async-handler"
)

var (
// Check that InMemoryChannel can be validated and defaulted.
_ apis.Validatable = (*InMemoryChannel)(nil)
Expand Down
5 changes: 4 additions & 1 deletion pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ type Subscription struct {
// Config for a fanout.EventHandler.
type Config struct {
Subscriptions []Subscription `json:"subscriptions"`
// Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
//
// Async handler is subject to event loss since it responds with 200 before forwarding the event
// to all subscriptions.
AsyncHandler bool `json:"asyncHandler,omitempty"`
}

Expand Down
27 changes: 24 additions & 3 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -108,10 +109,20 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
return err
}

var tmpChannelableSpec duckv1.ChannelableSpec = duckv1.ChannelableSpec{
var tmpChannelableSpec = duckv1.ChannelableSpec{
Delivery: b.Spec.Delivery,
}

metadata := b.ObjectMeta.DeepCopy()
channelAnnotations := map[string]string{
eventing.ScopeAnnotationKey: eventing.ScopeCluster,
}
for k, v := range metadata.GetAnnotations() {
if strings.HasPrefix(k, messagingv1.SchemeGroupVersion.Group) {
channelAnnotations[k] = v
}
}

logging.FromContext(ctx).Infow("Reconciling the trigger channel")
c, err := ducklib.NewPhysicalChannel(
chanMan.template.TypeMeta,
Expand All @@ -122,7 +133,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
*kmeta.NewControllerRef(b),
},
Labels: TriggerChannelLabels(b.Name, b.Namespace),
Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster},
Annotations: channelAnnotations,
},
ducklib.WithChannelableSpec(tmpChannelableSpec),
ducklib.WithPhysicalChannelSpec(chanMan.template.Spec),
Expand Down Expand Up @@ -392,7 +403,9 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return nil, fmt.Errorf("failed to convert %s/%s into Channelable: %w", channelObjRef.Namespace, channelObjRef.Name, err)
}

if equality.Semantic.DeepEqual(desired.Spec.Delivery, channelable.Spec.Delivery) {
if equality.Semantic.DeepDerivative(desired.Spec.Delivery, channelable.Spec.Delivery) &&
equality.Semantic.DeepDerivative(desired.Annotations, channelable.Annotations) &&
equality.Semantic.DeepDerivative(desired.Labels, channelable.Labels) {
// If propagated/mutable properties match return the Channel.
return channelable, nil
}
Expand All @@ -402,12 +415,20 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
jsonPatch, err := duckapis.CreatePatch(
// Existing Channel properties
duckv1.Channelable{
ObjectMeta: metav1.ObjectMeta{
Annotations: channelable.Annotations,
Labels: channelable.Labels,
},
Spec: duckv1.ChannelableSpec{
Delivery: channelable.Spec.Delivery,
},
},
// Desired Channel properties
duckv1.Channelable{
ObjectMeta: metav1.ObjectMeta{
Annotations: desired.Annotations,
Labels: desired.Labels,
},
Spec: duckv1.ChannelableSpec{
Delivery: desired.Spec.Delivery,
},
Expand Down
43 changes: 43 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
Expand Down Expand Up @@ -405,6 +406,48 @@ func TestReconcile(t *testing.T) {
WithDLSNotConfigured(),
WithBrokerEventPoliciesReadyBecauseOIDCDisabled()),
}},
}, {
Name: "Propagate annotations",
Key: testKey,
Objects: []runtime.Object{
makeDLSServiceAsUnstructured(),
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions,
WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true")),
createChannel(withChannelReady),
imcConfigMap(),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true"),
WithBrokerReady,
WithDLSNotConfigured(),
WithBrokerAddressURI(brokerAddress),
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName),
WithBrokerEventPoliciesReadyBecauseOIDCDisabled()),
}},
WantPatches: []clientgotesting.PatchActionImpl{
{
ActionImpl: clientgotesting.ActionImpl{
Namespace: testNS,
},
Name: fmt.Sprintf("%s-kne-trigger", brokerName),
Patch: []byte(`[{"op":"add","path":"/metadata/annotations/messaging.knative.dev~1async-handler","value":"` + "true" + `"}]`),
},
},
}, {
Name: "Successful Reconciliation with a Channel with CA certs",
Key: testKey,
Expand Down
11 changes: 10 additions & 1 deletion pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dispatcher
import (
"context"
"fmt"
"strconv"

listers "knative.dev/eventing/pkg/client/listers/messaging/v1"

Expand Down Expand Up @@ -250,13 +251,21 @@ func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) (
subs[i] = *conf
}

async := false
if v, ok := imc.Annotations[v1.AsyncHandlerAnnotation]; ok {
b, err := strconv.ParseBool(v)
if err == nil {
async = b
}
}

return &multichannelfanout.ChannelConfig{
Namespace: imc.Namespace,
Name: imc.Name,
HostName: imc.Status.Address.URL.Host,
Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name),
FanoutConfig: fanout.Config{
AsyncHandler: false,
AsyncHandler: async,
Subscriptions: subs,
},
}, nil
Expand Down
105 changes: 105 additions & 0 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package dispatcher
import (
"context"
"net/http"
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgotesting "k8s.io/client-go/testing"
Expand Down Expand Up @@ -662,3 +664,106 @@ func (f *fakeMultiChannelHandler) GetChannelHandler(host string) fanout.EventHan
func (f *fakeMultiChannelHandler) CountChannelHandlers() int {
return len(f.handlers)
}

func Test_newConfigForInMemoryChannelAsyncHandler(t *testing.T) {
ctx, _ := SetupFakeContext(t, SetUpInformerSelector)

type args struct {
ctx context.Context
imc *v1.InMemoryChannel
}
tests := []struct {
name string
args args
wantAsync bool
wantErr bool
}{
{
name: "async handler",
args: args{
ctx: ctx,
imc: &v1.InMemoryChannel{
ObjectMeta: metav1.ObjectMeta{
Name: "n",
Namespace: "ns",
Annotations: map[string]string{
v1.AsyncHandlerAnnotation: "true",
},
},
Status: v1.InMemoryChannelStatus{
ChannelableStatus: eventingduckv1.ChannelableStatus{
AddressStatus: duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: apis.HTTPS("something"),
},
},
},
},
},
},
wantAsync: true,
wantErr: false,
},
{
name: "sync handler, default",
args: args{
ctx: ctx,
imc: &v1.InMemoryChannel{
ObjectMeta: metav1.ObjectMeta{
Name: "n",
Namespace: "ns",
},
Status: v1.InMemoryChannelStatus{
ChannelableStatus: eventingduckv1.ChannelableStatus{
AddressStatus: duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: apis.HTTPS("something"),
},
},
},
},
},
},
wantAsync: false,
wantErr: false,
},
{
name: "sync handler, explicit",
args: args{
ctx: ctx,
imc: &v1.InMemoryChannel{
ObjectMeta: metav1.ObjectMeta{
Name: "n",
Namespace: "ns",
Annotations: map[string]string{
v1.AsyncHandlerAnnotation: "false",
},
},
Status: v1.InMemoryChannelStatus{
ChannelableStatus: eventingduckv1.ChannelableStatus{
AddressStatus: duckv1.AddressStatus{
Address: &duckv1.Addressable{
URL: apis.HTTPS("something"),
},
},
},
},
},
},
wantAsync: false,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newConfigForInMemoryChannel(tt.args.ctx, tt.args.imc)
if (err != nil) != tt.wantErr {
t.Errorf("newConfigForInMemoryChannel() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got.FanoutConfig.AsyncHandler, tt.wantAsync) {
t.Errorf("newConfigForInMemoryChannel() got = %v, want %v", got, tt.wantAsync)
}
})
}
}
9 changes: 9 additions & 0 deletions pkg/reconciler/testing/v1/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func WithBrokerFinalizers(finalizers ...string) BrokerOption {
}
}

func WithBrokerAnnotation(key, value string) BrokerOption {
return func(b *v1.Broker) {
if b.Annotations == nil {
b.Annotations = map[string]string{}
}
b.Annotations[key] = value
}
}

func WithBrokerResourceVersion(rv string) BrokerOption {
return func(b *v1.Broker) {
b.ResourceVersion = rv
Expand Down
16 changes: 16 additions & 0 deletions test/rekt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,22 @@ func TestBrokerRedelivery(t *testing.T) {
env.TestSet(ctx, t, broker.BrokerRedelivery())
}

// TestBrokerPropagatesMetadata test Broker reconciler propagates metadata to channel.
func TestBrokerPropagatesMetadata(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.ParallelTest(ctx, t, broker.PropagatesMetadata())
}

func TestBrokerDeadLetterSinkExtensions(t *testing.T) {
t.Parallel()

Expand Down
24 changes: 23 additions & 1 deletion test/rekt/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
"testing"
"time"

"knative.dev/eventing/test/rekt/features/authz"
"knative.dev/reconciler-test/pkg/feature"

"knative.dev/eventing/test/rekt/features/authz"

"github.com/cloudevents/sdk-go/v2/binding"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -205,6 +206,27 @@ func TestChannelDeadLetterSink(t *testing.T) {
env.Test(ctx, t, channel.DeadLetterSink(createSubscriberFn))
}

/*
TestChannelAsyncHandler tests if the async handler can be configured on the channel.
*/
func TestChannelAsyncHandler(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn {
return subscription.WithSubscriber(ref, uri, "")
}
env.ParallelTest(ctx, t, channel.AsyncHandler(createSubscriberFn))
env.ParallelTest(ctx, t, channel.AsyncHandlerUpdate(createSubscriberFn))
}

// TestGenericChannelDeadLetterSink tests if the events that cannot be delivered end up in
// the dead letter sink.
func TestGenericChannelDeadLetterSink(t *testing.T) {
Expand Down
Loading

0 comments on commit 8fed0be

Please sign in to comment.