Skip to content

Commit 1c17f39

Browse files
committed
pg commands: fallback to cnpg clusters found via application name
1 parent a3dd7fe commit 1c17f39

File tree

2 files changed

+126
-74
lines changed

2 files changed

+126
-74
lines changed

k

Lines changed: 77 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,8 @@ module ClickHouse
305305
abort "Error: couldn't find a ClickHouse cluster named '#{cluster_or_application}'" if clusters.empty?
306306

307307
if clusters.length == 1
308-
puts "Defaulted to ClickHouse cluster #{bold(clusters.first)} in application #{bold(cluster_or_application)}."
309-
puts
308+
STDERR.puts gray("Defaulted to ClickHouse cluster #{bold(clusters.first)} in application #{bold(cluster_or_application)}.")
309+
STDERR.puts
310310
cluster = clusters.first
311311
else
312312
puts "Multiple ClickHouse clusters found in application #{cluster_or_application}, please specify one of:"
@@ -723,6 +723,31 @@ def logs_search
723723
end
724724

725725
module Pg
726+
def self.cluster_by_name(cluster_or_application)
727+
cluster = read_kubectl("get cluster.postgresql.cnpg.io #{cluster_or_application} -o name --ignore-not-found").chomp
728+
729+
if cluster.empty?
730+
# Fall back looking for ClickHouse clusters in an application with the same name
731+
clusters = read_kubectl("get cluster.postgresql.cnpg.io -l argocd.argoproj.io/instance=#{cluster_or_application} -o name")
732+
.lines
733+
.map { |c| c.split("/").last.chomp }
734+
735+
abort "Error: couldn't find a Postgres cluster named '#{cluster_or_application}'" if clusters.empty?
736+
737+
if clusters.length == 1
738+
cluster = clusters.first
739+
STDERR.puts gray("Defaulted to Postgres cluster #{bold(cluster)} in application #{bold(cluster_or_application)}.")
740+
STDERR.puts
741+
else
742+
puts "Multiple Postgres clusters found in application #{cluster_or_application}, please specify one of:"
743+
puts clusters
744+
abort
745+
end
746+
end
747+
748+
cluster.split("/").last.chomp
749+
end
750+
726751
def self.secret_for_cluster(cluster_name, user = nil)
727752
unless user.nil? || user == "superuser"
728753
abort "ERROR: CloudNativePG clusters only support default (ie. do not specify) or superuser users"
@@ -767,6 +792,7 @@ module Pg
767792
end
768793

769794
def self.query_on_primary(cluster, query)
795+
cluster = cluster_by_name(cluster)
770796
secret = secret_for_cluster(cluster)
771797

772798
require "base64"
@@ -781,7 +807,6 @@ def pg
781807
puts ""
782808
puts "POSTGRES COMMANDS:"
783809
puts "pg:failover <cluster-name>"
784-
puts "pg:patroni <cluster-name> <patroni-command>"
785810
puts "pg:password <cluster-name> [superuser]"
786811
puts "pg:pods"
787812
puts "pg:primaries"
@@ -803,54 +828,43 @@ def pg_failover
803828
cluster = ARGV.delete_at(0)
804829
abort "Must pass name of cluster, eg. k pg:failover <cluster-name>" unless cluster
805830

806-
if kubectl?("get cluster.postgresql.cnpg.io #{cluster}")
807-
# CloudNativePG
808-
status_yaml = read_kubectl("status -o yaml #{cluster}", plugin: "cnpg")
809-
abort "Error: cluster '#{cluster}' not found" if status_yaml.empty?
810-
811-
# {
812-
# "cluster" => {
813-
# "status" => {
814-
# "instancesReportedState" => {
815-
# "<cluster-name>-1": {
816-
# "isPrimary": true,
817-
# "timeLineID": 1
818-
# },
819-
# "<cluster-name>-2": {
820-
# "isPrimary": false,
821-
# "timeLineID": 1
822-
# }
823-
# }
824-
# ...
825-
# }
826-
# ...
827-
# }
828-
# ...
829-
# }
830-
status = YAML.load(status_yaml)
831-
instance_state = status.dig("cluster", "status", "instancesReportedState")
832-
non_primary_instance = instance_state.find { |_, state| !state["isPrimary"] }
833-
abort "No non-primary instance found for cluster '#{cluster}'" unless non_primary_instance
834-
835-
puts "Promoting #{non_primary_instance.first} to primary..."
836-
kubectl "promote #{cluster} #{non_primary_instance.first}", plugin: "cnpg"
837-
else
838-
abort "Error: cluster '#{cluster}' not found"
839-
end
840-
end
841-
842-
def pg_patroni
843-
cluster = ARGV.delete_at(0)
844-
abort "Must pass name of cluster, eg. k pg:patroni <cluster-name> <command>" unless cluster
845-
846-
patroni_command = ARGV.delete_at(0) || "--help"
847-
848-
Pg.exec_on_primary(cluster, "patronictl #{patroni_command}")
831+
cluster = Pg.cluster_by_name(cluster)
832+
833+
status_yaml = read_kubectl("status -o yaml #{cluster}", plugin: "cnpg")
834+
835+
# {
836+
# "cluster" => {
837+
# "status" => {
838+
# "instancesReportedState" => {
839+
# "<cluster-name>-1": {
840+
# "isPrimary": true,
841+
# "timeLineID": 1
842+
# },
843+
# "<cluster-name>-2": {
844+
# "isPrimary": false,
845+
# "timeLineID": 1
846+
# }
847+
# }
848+
# ...
849+
# }
850+
# ...
851+
# }
852+
# ...
853+
# }
854+
status = YAML.load(status_yaml)
855+
instance_state = status.dig("cluster", "status", "instancesReportedState")
856+
non_primary_instance = instance_state.find { |_, state| !state["isPrimary"] }
857+
abort "No non-primary instance found for cluster '#{cluster}'" unless non_primary_instance
858+
859+
puts "Promoting #{non_primary_instance.first} to primary..."
860+
kubectl "promote #{cluster} #{non_primary_instance.first}", plugin: "cnpg"
849861
end
850862

851863
def pg_password
852864
cluster = ARGV.delete_at(0)
853-
abort "Must pass name of cluster, eg. k pg:password <cluster-name> [<user>]" unless cluster
865+
abort "Must pass name of cluster, eg. k pg:password <cluster-name> [superuser]" unless cluster
866+
867+
cluster = Pg.cluster_by_name(cluster)
854868

855869
user = ARGV.delete_at(0)
856870

@@ -876,7 +890,9 @@ end
876890

877891
def pg_psql
878892
cluster_name = ARGV.delete_at(0)
879-
abort "Must pass name of cluster, eg. k pg:psql <cluster-name> [<user>]" unless cluster_name
893+
abort "Must pass name of cluster, eg. k pg:psql <cluster-name> [superuser]" unless cluster_name
894+
895+
cluster_name = Pg.cluster_by_name(cluster_name)
880896

881897
user = ARGV.delete_at(0)
882898

@@ -895,7 +911,17 @@ def pg_resources
895911
cluster = ARGV.delete_at(0)
896912
abort "Must pass name of cluster, eg. k pg:resources <cluster-name>" unless cluster
897913

898-
kubectl "get all --selector=cnpg.io/cluster=#{cluster}"
914+
cluster = Pg.cluster_by_name(cluster)
915+
916+
puts gray("===") + " " + bold("Pods")
917+
puts
918+
kubectl "get pods --selector=cnpg.io/cluster=#{cluster}"
919+
puts
920+
puts gray("===") + " " + bold("Services")
921+
puts
922+
kubectl "get services --selector=cnpg.io/cluster=#{cluster}"
923+
puts
924+
puts gray("===") + " " + bold("Secrets")
899925
puts
900926
kubectl "get secrets --selector=cnpg.io/cluster=#{cluster}"
901927
end
@@ -904,6 +930,8 @@ def pg_url
904930
cluster_name = ARGV.delete_at(0)
905931
abort "Must pass name of cluster, eg. k pg:url <cluster-name> [<user>]" unless cluster_name
906932

933+
cluster_name = Pg.cluster_by_name(cluster_name)
934+
907935
user = ARGV.delete_at(0)
908936
secret = Pg.secret_for_cluster(cluster_name, user)
909937

k_pg_proxy

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ PROXY_PORT = 10_000
1111
THREADS = 10
1212
CONTEXT = ARGV.first || `kubectl config current-context`.strip
1313

14+
def gray(string)
15+
$stdout.tty? ? "\e[0;90;49m#{string}\e[0m" : string
16+
end
17+
18+
def bold(string)
19+
$stdout.tty? ? "\e[1m#{string}\e[22m" : string
20+
end
21+
1422
def parse_startup_message(client_socket)
1523
length = client_socket.read(4).unpack("L>").first - 4
1624
version = client_socket.read(4).unpack("L>").first
@@ -184,16 +192,48 @@ def forward(from_socket, to_socket)
184192
end
185193
end
186194

195+
# NOTE: this is similar to Pg.cluster_by_name in the k file
196+
def cluster_by_name(cluster_or_application)
197+
# rubocop:disable Layout/LineLength
198+
cluster = `kubectl --context #{CONTEXT} get cluster.postgresql.cnpg.io #{cluster_or_application} -o name --ignore-not-found`.chomp
199+
200+
return cluster.split("/").last.chomp unless cluster.empty?
201+
202+
# Fall back looking for ClickHouse clusters in an application with the same name
203+
clusters = `kubectl --context #{CONTEXT} get cluster.postgresql.cnpg.io -l argocd.argoproj.io/instance=#{cluster_or_application} -o name`
204+
.lines
205+
.map { |c| c.split("/").last.chomp }
206+
# rubocop:enable Layout/LineLength
207+
208+
if clusters.empty?
209+
puts "Error: couldn't find a Postgres cluster named '#{cluster_or_application}'"
210+
elsif clusters.length == 1
211+
cluster = clusters.first
212+
puts gray("Defaulted to Postgres cluster #{bold(cluster)} in application #{bold(cluster_or_application)}.")
213+
cluster
214+
else
215+
$stderr.puts "ERROR: Multiple Postgres clusters found in application #{cluster_or_application}, please specify one of:"
216+
$stderr.puts clusters
217+
end
218+
end
219+
187220
def handle_connection(client_socket, connection_number)
188221
params = parse_startup_message(client_socket)
189222
database = params.fetch("database")
190223

224+
cluster_name = cluster_by_name(database)
225+
226+
unless cluster_name
227+
$stderr.puts "Error: no Postgres cluster found for #{cluster_name}"
228+
client_socket.close
229+
return
230+
end
231+
191232
# Start port forward and connect to Kubernetes Postgres
192-
primary_pod = `kubectl --context #{CONTEXT} get pod -o name -l postgres-operator.crunchydata.com/cluster=#{database},postgres-operator.crunchydata.com/role=master`.chomp # rubocop:disable Layout/LineLength
193-
primary_pod = `kubectl --context #{CONTEXT} get pod -o name -l cnpg.io/cluster==#{database},cnpg.io/instanceRole=primary`.chomp if primary_pod.empty?
233+
primary_pod = `kubectl --context #{CONTEXT} get pod -o name -l cnpg.io/cluster==#{cluster_name},cnpg.io/instanceRole=primary`.chomp # rubocop:disable Layout/LineLength
194234

195235
if primary_pod.empty?
196-
$stderr.puts "Error: no primary postgres pod found for #{database}"
236+
$stderr.puts "Error: no primary postgres pod found for #{cluster_name}"
197237
client_socket.close
198238
return
199239
end
@@ -211,30 +251,14 @@ def handle_connection(client_socket, connection_number)
211251
authentication_ok = ["R", 8, 0].pack("aL>L>")
212252
client_socket.write(authentication_ok)
213253

214-
cluster = `kubectl --context #{CONTEXT} get cluster #{database} -o json`
215-
pgo = false
216-
217-
if cluster.empty?
218-
cluster = `kubectl --context #{CONTEXT} get postgrescluster #{database} -o json`
219-
pgo = true
220-
end
254+
cluster = `kubectl --context #{CONTEXT} get cluster.postgresql.cnpg.io #{cluster_name} -o json`
221255

222256
abort "Error: cluster '#{database}' not found" if cluster.empty?
223257
cluster = JSON.parse(cluster)
224258

225-
if pgo
226-
user ||= cluster.dig("spec", "users", 0, "name")
227-
unless user
228-
puts "No users found in PostgresCluster spec, using default user '#{database}'"
229-
user = database
230-
end
231-
secret_suffix = "pguser-#{user}"
232-
else
233-
user = database
234-
secret_suffix = "app"
235-
end
259+
secret_suffix = "app"
236260

237-
secret = JSON.parse(`kubectl --context #{CONTEXT} get secret #{database}-#{secret_suffix} -o json`).fetch("data")
261+
secret = JSON.parse(`kubectl --context #{CONTEXT} get secret #{cluster_name}-#{secret_suffix} -o json`).fetch("data")
238262

239263
send_startup_message(
240264
pg_socket,
@@ -247,7 +271,7 @@ def handle_connection(client_socket, connection_number)
247271
client_socket.close
248272
pg_socket.close
249273
Process.kill("QUIT", port_forward_pid)
250-
puts "Error: Failed to connect to #{database}"
274+
puts "Error: Failed to connect to #{cluster_name}"
251275
return
252276
end
253277

@@ -258,12 +282,12 @@ def handle_connection(client_socket, connection_number)
258282
if client_forward.alive? && pg_forward.alive?
259283
sleep 1
260284
else
261-
puts "Disconnecting from #{database}"
285+
puts "Disconnecting from #{cluster_name}"
262286
return
263287
end
264288
end
265289
rescue Errno::ECONNRESET
266-
puts "handle_connection: Errno::ECONNRESET handling database #{database}"
290+
puts "handle_connection: Errno::ECONNRESET handling database #{cluster_name}"
267291
ensure
268292
Process.kill("QUIT", port_forward_pid) rescue nil # rubocop:disable Style/RescueModifier
269293
client_socket&.close

0 commit comments

Comments
 (0)