Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4ae068c
Update informer_factory.lua
ChuanFF Jun 4, 2025
d01fed0
Merge branch 'master' of https://github.com/ChuanFF/apisix into endpo…
ChuanFF Aug 20, 2025
a51fc47
fix: endpointslice should cached when modified, then all slices merge…
ChuanFF Aug 20, 2025
52fb9c4
Merge branch 'master' into endpoint-slices-feat
ChuanFF Sep 15, 2025
955e58d
重命名
ChuanFF Sep 15, 2025
43fbd13
Merge branch 'master' into endpoint-slices-feat
ChuanFF Sep 22, 2025
cb0d9be
endpointslices特性 测试用例
ChuanFF Sep 22, 2025
2eb8f62
endpointslices特性 测试用例修复
ChuanFF Sep 22, 2025
405cd1d
测试用例修复
ChuanFF Sep 23, 2025
a0692d8
测试用例修复
ChuanFF Sep 23, 2025
d021127
测试用例修复
ChuanFF Sep 23, 2025
cf46457
测试用例修复
ChuanFF Sep 23, 2025
3451d22
测试用例修复
ChuanFF Sep 23, 2025
eef5898
测试用例修复
ChuanFF Sep 23, 2025
a934fed
测试用例修复
ChuanFF Sep 23, 2025
f0c40b3
测试用例修复
ChuanFF Sep 23, 2025
d49f6ec
测试用例修复
ChuanFF Sep 23, 2025
474d717
测试用例修复
ChuanFF Sep 23, 2025
d4b938e
测试endpointslices 的增减
ChuanFF Sep 23, 2025
06aa3f1
测试endpointslices 的增减
ChuanFF Sep 23, 2025
e6dda54
测试endpointslices scale
ChuanFF Sep 23, 2025
173b8ee
测试endpointslices scale
ChuanFF Sep 23, 2025
db54c3b
测试endpointslices scale
ChuanFF Sep 23, 2025
591d1e6
测试用例修复
ChuanFF Sep 23, 2025
46d6b6b
测试用例修复
ChuanFF Sep 23, 2025
65fbd0b
测试用例精简
ChuanFF Sep 24, 2025
485e7eb
测试用例精简
ChuanFF Sep 24, 2025
0a6b12e
行长度限制100
ChuanFF Sep 24, 2025
09e20a9
测试用例修复
ChuanFF Sep 24, 2025
6c26a38
测试用例修复
ChuanFF Sep 24, 2025
b869ab1
空行删除
ChuanFF Sep 24, 2025
ea38bb5
prelist
ChuanFF Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apisix/discovery/kubernetes/informer_factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ local function list_watch(informer, apiserver)

core.log.info("begin to list ", informer.kind)
informer.fetch_state = "listing"
if informer.pre_List then
if informer.pre_list then
informer:pre_list()
end

Expand All @@ -298,7 +298,7 @@ local function list_watch(informer, apiserver)
end

informer.fetch_state = "list finished"
if informer.post_List then
if informer.post_list then
informer:post_list()
end

Expand Down
229 changes: 161 additions & 68 deletions apisix/discovery/kubernetes/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--

local ngx = ngx
local unpack = unpack
local ipairs = ipairs
local pairs = pairs
local string = string
Expand All @@ -25,7 +26,6 @@ local os = os
local error = error
local pcall = pcall
local setmetatable = setmetatable
local type = type
local is_http = ngx.config.subsystem == "http"
local process = require("ngx.process")
local core = require("apisix.core")
Expand All @@ -51,72 +51,169 @@ local function sort_nodes_cmp(left, right)
return left.port < right.port
end

local function on_endpoint_slices_modified(handle, endpoint)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
return
local function update_endpoint_slices_cache(handle, endpoint_key, slice, slice_name)
if not handle.endpoint_slices_cache[endpoint_key] then
handle.endpoint_slices_cache[endpoint_key] = {}
end
local endpoint_slices = handle.endpoint_slices_cache[endpoint_key]
endpoint_slices[slice_name] = slice
end

core.log.debug(core.json.delay_encode(endpoint))
core.table.clear(endpoint_buffer)

local endpointslices = endpoint.endpoints
if type(endpointslices) == "table" then
for _, endpointslice in ipairs(endpointslices) do
if endpointslice.addresses then
local addresses = endpointslice.addresses
for _, port in ipairs(endpoint.ports or {}) do
local port_name
if port.name then
port_name = port.name
elseif port.targetPort then
port_name = tostring(port.targetPort)
else
port_name = tostring(port.port)
end

if endpointslice.conditions and endpointslice.conditions.ready then
local nodes = endpoint_buffer[port_name]
if nodes == nil then
nodes = core.table.new(0, #endpointslices * #addresses)
endpoint_buffer[port_name] = nodes
end

for _, address in ipairs(addresses) do
core.table.insert(nodes, {
host = address.ip,
port = port.port,
weight = handle.default_weight
})
end
end
end
local function get_endpoints_from_cache(handle, endpoint_key)
local endpoint_slices = handle.endpoint_slices_cache[endpoint_key] or {}
local endpoints = {}
for _, endpoint_slice in pairs(endpoint_slices) do
for port, targets in pairs(endpoint_slice) do
if not endpoints[port] then
endpoints[port] = core.table.new(0, #targets)
end
core.table.insert_tail(endpoints[port], unpack(targets))
end
end
return endpoints
end

for _, ports in pairs(endpoint_buffer) do
for _, nodes in pairs(ports) do
core.table.sort(nodes, sort_nodes_cmp)
end
end
local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
local endpoint_content = core.json.encode(endpoint_buffer, true)
local function update_endpoint_dict(handle, endpoints, endpoint_key)
local endpoint_content = core.json.encode(endpoints, true)
local endpoint_version = ngx.crc32_long(endpoint_content)

local _, err
_, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
if err then
core.log.error("set endpoint version into discovery DICT failed, ", err)
return
return false, err
end
_, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
if err then
core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
return false, err
end
end

local function on_endpoint_slices_modified(handle, endpoint_slice)
if not endpoint_slice.metadata then
core.log.error("endpoint_slice has no metadata, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end
if not endpoint_slice.metadata.name then
core.log.error("endpoint_slice has no metadata.name, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end
if not endpoint_slice.metadata.namespace then
core.log.error("endpoint_slice has no metadata.namespace, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end
if not endpoint_slice.metadata.labels
or not endpoint_slice.metadata.labels["kubernetes.io/service-name"] then
core.log.error("endpoint_slice has no service-name, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end

if handle.namespace_selector and
not handle:namespace_selector(endpoint_slice.metadata.namespace) then
return
end

core.log.debug(core.json.delay_encode(endpoint_slice))
--record nodes to every port in service
local port_to_nodes = {}

local slice_endpoints = endpoint_slice.endpoints
if not slice_endpoints or slice_endpoints == ngx.null then
slice_endpoints = {}
end

for _, endpoint in ipairs(slice_endpoints) do
if endpoint.addresses
and endpoint.conditions
and endpoint.conditions.ready then
local addresses = endpoint.addresses
for _, port in ipairs(endpoint_slice.ports or {}) do
local port_name
if port.name then
port_name = port.name
elseif port.targetPort then
port_name = tostring(port.targetPort)
else
port_name = tostring(port.port)
end

local nodes = port_to_nodes[port_name]
if nodes == nil then
nodes = core.table.new(0, #slice_endpoints * #addresses)
port_to_nodes[port_name] = nodes
end

for _, ip in ipairs(addresses) do
core.table.insert(nodes, {
host = ip,
port = port.port,
weight = handle.default_weight
})
end

end
end
end

local endpoint_key = endpoint_slice.metadata.namespace
.. "/" .. endpoint_slice.metadata.labels["kubernetes.io/service-name"]
update_endpoint_slices_cache(handle, endpoint_key, port_to_nodes, endpoint_slice.metadata.name)

local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
for _, nodes in pairs(cached_endpoints) do
core.table.sort(nodes, sort_nodes_cmp)
end

update_endpoint_dict(handle, cached_endpoints, endpoint_key)
end

local function on_endpoint_slices_deleted(handle, endpoint_slice)
if not endpoint_slice.metadata then
core.log.error("endpoint_slice has no metadata, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end
if not endpoint_slice.metadata.name then
core.log.error("endpoint_slice has no metadata.name, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end
if not endpoint_slice.metadata.namespace then
core.log.error("endpoint_slice has no metadata.namespace, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end
if not endpoint_slice.metadata.labels
or not endpoint_slice.metadata.labels["kubernetes.io/service-name"] then
core.log.error("endpoint_slice has no service-name, endpointSlice: ",
core.json.delay_encode(endpoint_slice))
return
end

if handle.namespace_selector and
not handle:namespace_selector(endpoint_slice.metadata.namespace) then
return
end

core.log.debug(core.json.delay_encode(endpoint_slice))

local endpoint_key = endpoint_slice.metadata.namespace
.. "/" .. endpoint_slice.metadata.labels["kubernetes.io/service-name"]
update_endpoint_slices_cache(handle, endpoint_key, nil, endpoint_slice.metadata.name)

local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key)
for _, nodes in pairs(cached_endpoints) do
core.table.sort(nodes, sort_nodes_cmp)
end

update_endpoint_dict(handle, cached_endpoints, endpoint_key)
end

local function on_endpoint_modified(handle, endpoint)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
Expand Down Expand Up @@ -157,27 +254,14 @@ local function on_endpoint_modified(handle, endpoint)
end
end

for _, ports in pairs(endpoint_buffer) do
for _, nodes in pairs(ports) do
core.table.sort(nodes, sort_nodes_cmp)
end

for _, nodes in pairs(endpoint_buffer) do
core.table.sort(nodes, sort_nodes_cmp)
end

local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
local endpoint_content = core.json.encode(endpoint_buffer, true)
local endpoint_version = ngx.crc32_long(endpoint_content)

local _, err
_, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
if err then
core.log.error("set endpoint version into discovery DICT failed, ", err)
return
end
_, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
if err then
core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
end
local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
update_endpoint_dict(handle, endpoint_buffer, endpoint_key)
end


Expand All @@ -196,6 +280,9 @@ end

local function pre_list(handle)
handle.endpoint_dict:flush_all()
if handle.endpoint_slices_cache then
handle.endpoint_slices_cache = {}
end
end


Expand Down Expand Up @@ -468,11 +555,14 @@ local function single_mode_init(conf)
if conf.watch_endpoint_slices then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
endpoints_informer.on_deleted = on_endpoint_slices_deleted
endpoints_informer.endpoint_slices_cache = {}
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
endpoints_informer.on_deleted = on_endpoint_deleted
end
endpoints_informer.on_deleted = on_endpoint_deleted

endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list

Expand Down Expand Up @@ -574,11 +664,14 @@ local function multiple_mode_init(confs)
if conf.watch_endpoint_slices then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
endpoints_informer.on_deleted = on_endpoint_slices_deleted
endpoints_informer.endpoint_slices_cache = {}
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
endpoints_informer.on_deleted = on_endpoint_deleted
end
endpoints_informer.on_deleted = on_endpoint_deleted

endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list

Expand Down
12 changes: 9 additions & 3 deletions t/kubernetes/configs/endpointslices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ metadata:
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: epslice
name: service-a-epslice1
namespace: ns-a
labels:
"kubernetes.io/service-name": service-a
addressType: IPv4
endpoints: [ ]
---
Expand All @@ -39,8 +41,10 @@ metadata:
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: epslice
name: service-a-epslice1
namespace: ns-b
labels:
"kubernetes.io/service-name": service-a
addressType: IPv4
endpoints: [ ]
---
Expand All @@ -54,8 +58,10 @@ metadata:
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: epslice
name: service-a-epslice1
namespace: ns-c
labels:
"kubernetes.io/service-name": service-a
addressType: IPv4
endpoints: [ ]
---
Loading
Loading