Skip to content

Commit

Permalink
feat: support emqx dashboard https port
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Oct 24, 2023
1 parent fe6a7b6 commit 2b55123
Show file tree
Hide file tree
Showing 11 changed files with 607 additions and 381 deletions.
81 changes: 64 additions & 17 deletions apis/apps/v2beta1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v2beta1
import (
"fmt"
"net"
"strconv"
"strings"

emperror "emperror.dev/errors"
Expand Down Expand Up @@ -172,31 +173,65 @@ func AddLabel(labels map[string]string, labelKey, labelValue string) map[string]
return labels
}

func GetDashboardServicePort(hoconString string) (*corev1.ServicePort, error) {
func GetDashboardPortMap(hoconString string) (map[string]int32, error) {
portMap := make(map[string]int32)
portMap["dashboard"] = 18083 // default port

hoconConfig, err := hocon.ParseString(hoconString)
if err != nil {
return nil, emperror.Wrapf(err, "failed to parse %s", hoconString)
}
dashboardPort := strings.Trim(hoconConfig.GetString("dashboard.listeners.http.bind"), `"`)
if dashboardPort == "" {
return nil, emperror.Errorf("failed to get dashboard.listeners.http.bind in %s", hoconConfig.String())

if dashboardPort := strings.Trim(hoconConfig.GetString("dashboard.listeners.http.bind"), `"`); dashboardPort != "" {
if !strings.Contains(dashboardPort, ":") {
// example: ":18083"
dashboardPort = fmt.Sprintf(":%s", dashboardPort)
}
_, strPort, _ := net.SplitHostPort(dashboardPort)
if port, _ := strconv.Atoi(strPort); port != 0 {
portMap["dashboard"] = int32(port)
} else {
// port = 0 means disable dashboard
// delete default port
delete(portMap, "dashboard")
}
}
if !strings.Contains(dashboardPort, ":") {
// example: ":18083"
dashboardPort = fmt.Sprintf(":%s", dashboardPort)

if dashboardHttpsPort := strings.Trim(hoconConfig.GetString("dashboard.listeners.https.bind"), `"`); dashboardHttpsPort != "" {
if !strings.Contains(dashboardHttpsPort, ":") {
// example: ":18084"
dashboardHttpsPort = fmt.Sprintf(":%s", dashboardHttpsPort)
}
_, strPort, _ := net.SplitHostPort(dashboardHttpsPort)
if port, _ := strconv.Atoi(strPort); port != 0 {
portMap["dashboard-https"] = int32(port)
} else {
// port = 0 means disable dashboard
// delete default port
delete(portMap, "dashboard-https")
}
}
_, strPort, err := net.SplitHostPort(dashboardPort)

return portMap, nil
}

func GetDashboardServicePort(hoconString string) ([]corev1.ServicePort, error) {
dashboardSvcPortList := []corev1.ServicePort{}
portMap, err := GetDashboardPortMap(hoconString)
if err != nil {
return nil, emperror.Wrapf(err, "failed to split %s", dashboardPort)
return nil, emperror.Wrapf(err, "failed to get dashboard port map")
}
intStrValue := intstr.Parse(strPort)

return &corev1.ServicePort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
Port: int32(intStrValue.IntValue()),
TargetPort: intStrValue,
}, nil

for name, port := range portMap {
dashboardSvcPortList = append(dashboardSvcPortList, corev1.ServicePort{
Name: name,
Protocol: corev1.ProtocolTCP,
Port: port,
TargetPort: intstr.FromInt(int(port)),
})
}

return dashboardSvcPortList, nil
}

func GetListenersServicePorts(hoconString string) ([]corev1.ServicePort, error) {
Expand Down Expand Up @@ -365,6 +400,18 @@ func MergeContainerPorts(ports1, ports2 []corev1.ContainerPort) []corev1.Contain
return result
}

func TransServicePortsToContainerPorts(ports []corev1.ServicePort) []corev1.ContainerPort {
result := make([]corev1.ContainerPort, 0, len(ports))
for _, item := range ports {
result = append(result, corev1.ContainerPort{
Name: item.Name,
ContainerPort: item.Port,
Protocol: item.Protocol,
})
}
return result
}

func mergeMap(dst, src map[string]string) map[string]string {
if dst == nil {
dst = make(map[string]string)
Expand Down
114 changes: 96 additions & 18 deletions apis/apps/v2beta1/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,98 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

func TestGetDashboardPortMap(t *testing.T) {
t.Run("empty config", func(t *testing.T) {
instance := &EMQX{}
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.Nil(t, err)
assert.Equal(t, map[string]int32{
"dashboard": 18083,
}, got)
})

t.Run("wrong config", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `hello world`
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.ErrorContains(t, err, "failed to parse")
assert.Nil(t, got)
})

t.Run("a single http port", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `dashboard.listeners.http.bind = 18083`
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.Nil(t, err)
assert.Equal(t, map[string]int32{
"dashboard": 18083,
}, got)
})

t.Run("a single IPV4 http port", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `dashboard.listeners.http.bind = "0.0.0.0:18083"`
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.Nil(t, err)
assert.Equal(t, map[string]int32{
"dashboard": 18083,
}, got)
})

t.Run("a single IPV6 http port", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `dashboard.listeners.http.bind = "[::]:18083"`
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.Nil(t, err)
assert.Equal(t, map[string]int32{
"dashboard": 18083,
}, got)
})

t.Run("a single https port", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `dashboard.listeners.https.bind = 18084`
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.Nil(t, err)
assert.Equal(t, map[string]int32{
"dashboard": 18083, // default http port
"dashboard-https": 18084,
}, got)
})

t.Run("disable http port and a single https port", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `
dashboard.listeners.http.bind = 0
dashboard.listeners.https.bind = 18084
`
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.Nil(t, err)
assert.Equal(t, map[string]int32{
"dashboard-https": 18084,
}, got)
})

t.Run("disable all port", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `
dashboard.listeners.http.bind = 0
dashboard.listeners.https.bind = 0
`
got, err := GetDashboardPortMap(instance.Spec.Config.Data)
assert.Nil(t, err)
assert.Empty(t, got)
})
}

func TestGetDashboardServicePort(t *testing.T) {
expect := &corev1.ServicePort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
Port: int32(18083),
TargetPort: intstr.Parse("18083"),
expect := []corev1.ServicePort{
{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
Port: int32(18083),
TargetPort: intstr.Parse("18083"),
},
}

t.Run("a single port", func(t *testing.T) {
Expand All @@ -56,26 +142,18 @@ func TestGetDashboardServicePort(t *testing.T) {
assert.Equal(t, expect, got)
})

t.Run("wrong config", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `hello world`
got, err := GetDashboardServicePort(instance.Spec.Config.Data)
assert.ErrorContains(t, err, "failed to parse")
assert.Nil(t, got)
})

t.Run("empty config", func(t *testing.T) {
instance := &EMQX{}
got, err := GetDashboardServicePort(instance.Spec.Config.Data)
assert.ErrorContains(t, err, "failed to get dashboard.listeners.http.bind")
assert.Nil(t, got)
assert.Nil(t, err)
assert.Equal(t, expect, got)
})

t.Run("empty dashboard listeners config", func(t *testing.T) {
t.Run("wrong config", func(t *testing.T) {
instance := &EMQX{}
instance.Spec.Config.Data = `foo = bar`
instance.Spec.Config.Data = `hello world`
got, err := GetDashboardServicePort(instance.Spec.Config.Data)
assert.ErrorContains(t, err, "failed to get dashboard.listeners.http.bind")
assert.ErrorContains(t, err, "failed to parse")
assert.Nil(t, got)
})
}
Expand Down
35 changes: 14 additions & 21 deletions controllers/apps/v2beta1/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,7 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}

func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
var containerPort corev1.ContainerPort
if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: 18083,
}
} else {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: svcPort.Port,
}
}
svcPorts, _ := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)

preSts := generateStatefulSet(instance)
podTemplateSpecHash := computeHash(preSts.Spec.Template.DeepCopy(), instance.Status.CoreNodesStatus.CollisionCount)
Expand All @@ -137,14 +124,20 @@ func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
preSts.Spec.Template.Labels = appsv2beta1.CloneAndAddLabel(preSts.Spec.Template.Labels, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preSts.Spec.Template.Spec.Containers[0].Ports = appsv2beta1.MergeContainerPorts(
preSts.Spec.Template.Spec.Containers[0].Ports,
[]corev1.ContainerPort{
containerPort,
},
appsv2beta1.TransServicePortsToContainerPorts(svcPorts),
)
preSts.Spec.Template.Spec.Containers[0].Env = append([]corev1.EnvVar{
{Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))},
}, preSts.Spec.Template.Spec.Containers[0].Env...)

for _, p := range preSts.Spec.Template.Spec.Containers[0].Ports {
var name string
if p.Name == "dashboard" {
name = "EMQX_DASHBOARD__LISTENERS__HTTP__BIND"
}
if p.Name == "dashboard-https" {
name = "EMQX_DASHBOARD__LISTENERS__HTTPS__BIND"
}
preSts.Spec.Template.Spec.Containers[0].Env = append([]corev1.EnvVar{
{Name: name, Value: strconv.Itoa(int(p.ContainerPort))},
}, preSts.Spec.Template.Spec.Containers[0].Env...)
}
return preSts
}

Expand Down
Loading

0 comments on commit 2b55123

Please sign in to comment.