From a1504c763f026f320622b6b6d32b2a39c63212e4 Mon Sep 17 00:00:00 2001
From: Robert Gildein <robert.gildein@canonical.com>
Date: Tue, 29 Oct 2024 14:13:17 +0100
Subject: [PATCH] Add log forwarding Pebble layer to sparkd (#114)
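
When the LOKI_URL environment variable is set on a driver or executor pod,
sparkd.sh renders pebble/log-layer.yaml and adds it as a Pebble layer under
the label "logging", so Pebble forwards the container's service logs to that
Loki endpoint; when LOKI_URL is unset, log forwarding stays disabled. A rough
usage sketch mirroring the integration test (the Loki endpoint, namespace and
jar version below are placeholders, not values shipped by this patch):

    # Placeholder endpoint/namespace/jar version; adjust to your deployment.
    spark-client.spark-submit \
      --username spark --namespace spark \
      --conf spark.kubernetes.driverEnv.LOKI_URL="http://loki.example:3100/loki/api/v1/push" \
      --conf spark.executorEnv.LOKI_URL="http://loki.example:3100/loki/api/v1/push" \
      --class org.apache.spark.examples.SparkPi \
      local:///opt/spark/examples/jars/spark-examples_2.12-<spark-version>.jar 100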

Signed-off-by: Robert Gildein <robert.gildein@canonical.com>
---
 images/charmed-spark/bin/sparkd.sh         | 26 +++++++-
 images/charmed-spark/pebble/log-layer.yaml | 13 ++++
 images/charmed-spark/rockcraft.yaml        |  2 +
 tests/integration/integration-tests.sh     | 70 ++++++++++++++++++++++
 4 files changed, 110 insertions(+), 1 deletion(-)
 create mode 100644 images/charmed-spark/pebble/log-layer.yaml

diff --git a/images/charmed-spark/bin/sparkd.sh b/images/charmed-spark/bin/sparkd.sh
index 57214b73..d143a517 100644
--- a/images/charmed-spark/bin/sparkd.sh
+++ b/images/charmed-spark/bin/sparkd.sh
@@ -1,5 +1,29 @@
 #!/bin/bash
 
+function get_log_layer {
+  local loki_url=$1
+  local log_layer_file="${2:-/opt/pebble/log-layer.yaml}"
+  sed -e "s/\$LOKI_URL/$loki_url/g" \
+      -e "s/\$FLAVOUR/$FLAVOUR/g" \
+      -e "s/\$SPARK_APPLICATION_ID/$SPARK_APPLICATION_ID/g" \
+      -e "s/\$SPARK_USER/$SPARK_USER/g" \
+      -e "s/\$HOSTNAME/$HOSTNAME/g" \
+      "$log_layer_file"
+}
+
+function log_forwarding {
+  # Escape special characters in the URL so it can be safely used in the sed template.
+  local loki_url="$(<<< "$LOKI_URL" sed -e 's`[][\\/.*^$]`\\&`g')"
+  if [ -n "$loki_url" ]; then
+      echo "Log-forwarding to Loki is enabled."
+      local rendered_log_layer=$(get_log_layer "$loki_url")
+      echo "$rendered_log_layer" | tee /tmp/rendered_log_layer.yaml
+      pebble add logging /tmp/rendered_log_layer.yaml
+  else
+      echo "Log-forwarding to Loki is disabled."
+  fi
+}
+
 function finish {
   if [ $? -ne 0 ]
   then
@@ -9,13 +33,13 @@ function finish {
 }
 trap finish EXIT
 
-
 FLAVOUR=$1
 
 echo "Running script with ${FLAVOUR} flavour"
 
 case "${FLAVOUR}" in
   driver|executor)
+    log_forwarding
     pushd /opt/spark
     ./entrypoint.sh "$@"
     ;;
diff --git a/images/charmed-spark/pebble/log-layer.yaml b/images/charmed-spark/pebble/log-layer.yaml
new file mode 100644
index 00000000..83f2eb52
--- /dev/null
+++ b/images/charmed-spark/pebble/log-layer.yaml
@@ -0,0 +1,13 @@
+log-targets:
+  grafana-agent-k8s:
+    override: replace
+    type: loki
+    location: $LOKI_URL
+    services: [all]
+    labels:
+      product: charmed-spark
+      role: $FLAVOUR
+      app: spark
+      spark_job_id: $SPARK_APPLICATION_ID
+      user: $SPARK_USER
+      pod: $HOSTNAME
diff --git a/images/charmed-spark/rockcraft.yaml b/images/charmed-spark/rockcraft.yaml
index fa2d2089..7588d666 100644
--- a/images/charmed-spark/rockcraft.yaml
+++ b/images/charmed-spark/rockcraft.yaml
@@ -182,6 +182,7 @@ parts:
       bin/spark-client.service-account-registry: opt/spark-client/python/bin/spark-client.service-account-registry
       bin/spark-client.spark-shell: opt/spark-client/python/bin/spark-client.spark-shell
       bin/spark-client.spark-submit: opt/spark-client/python/bin/spark-client.spark-submit
+      pebble/log-layer.yaml: opt/pebble/log-layer.yaml
     stage:
       - etc/spark8t/conf/
       - etc/spark/conf/
@@ -192,6 +193,7 @@ parts:
       - opt/spark-client/python/bin/spark-client.service-account-registry
       - opt/spark-client/python/bin/spark-client.spark-shell
       - opt/spark-client/python/bin/spark-client.spark-submit
+      - opt/pebble/log-layer.yaml
 
   user-setup:
     plugin: nil
diff --git a/tests/integration/integration-tests.sh b/tests/integration/integration-tests.sh
index 0ef55517..10aa6f9d 100755
--- a/tests/integration/integration-tests.sh
+++ b/tests/integration/integration-tests.sh
@@ -53,6 +53,20 @@ validate_metrics() {
   fi
 }
 
+validate_logs() {
+  log=$1
+  if [ $(grep -Ri "Log-forwarding to Loki is enabled." "$log" | wc -l) -lt 3 ]; then
+    echo "ERROR: Could not validate logs."
+    echo -e "DEBUG: Log file:\n$(cat "$log")"
+    exit 1
+  fi
+  if [ $(grep -Ri 'Layer \\\\"logging\\\\" added successfully from \\\\"/tmp/rendered_log_layer.yaml\\\\"' "$log" | wc -l) -lt 3 ]; then
+    echo "ERROR: Could not validate logs."
+    echo -e "DEBUG: Log file:\n$(cat "$log")"
+    exit 1
+  fi
+}
+
 setup_user() {
   echo "setup_user() ${1} ${2}"
 
@@ -422,6 +436,56 @@ run_example_job_in_pod_with_metrics() {
 }
 
 
+run_example_job_in_pod_with_log_forwarding() {
+  NAMESPACE=${1-$NAMESPACE}
+  USERNAME=${2-spark}
+  SPARK_EXAMPLES_JAR_NAME="spark-examples_2.12-$(get_spark_version).jar"
+
+  PREVIOUS_JOB=$(kubectl -n $NAMESPACE get pods --sort-by=.metadata.creationTimestamp | grep driver | tail -n 1 | cut -d' ' -f1)
+  # start simple http server
+  LOG_FILE="/tmp/server-loki.log"
+  SERVER_PORT=9091
+  python3 tests/integration/resources/test_web_server.py $SERVER_PORT > $LOG_FILE &
+  HTTP_SERVER_PID=$!
+  # get ip address
+  IP_ADDRESS=$(hostname -I | cut -d ' ' -f 1)
+  echo "IP: $IP_ADDRESS"
+
+  kubectl -n $NAMESPACE exec testpod -- env PORT="$SERVER_PORT" IP="$IP_ADDRESS" UU="$USERNAME" NN="$NAMESPACE" JJ="$SPARK_EXAMPLES_JAR_NAME" IM="$(spark_image)" \
+                  /bin/bash -c 'spark-client.spark-submit \
+                  --username $UU --namespace $NN \
+                  --conf spark.kubernetes.driver.request.cores=100m \
+                  --conf spark.kubernetes.executor.request.cores=100m \
+                  --conf spark.kubernetes.container.image=$IM \
+                  --conf spark.executorEnv.LOKI_URL="http://$IP:$PORT" \
+                  --conf spark.kubernetes.driverEnv.LOKI_URL="http://$IP:$PORT" \
+                  --class org.apache.spark.examples.SparkPi \
+                  local:///opt/spark/examples/jars/$JJ 1000'
+
+  # kubectl --kubeconfig=${KUBE_CONFIG} get pods
+  DRIVER_PODS=$(kubectl get pods --sort-by=.metadata.creationTimestamp -n ${NAMESPACE} | grep driver )
+  DRIVER_JOB=$(kubectl get pods --sort-by=.metadata.creationTimestamp -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1)
+
+  if [[ "${DRIVER_JOB}" == "${PREVIOUS_JOB}" ]]
+  then
+    echo "ERROR: Sample job has not run!"
+    exit 1
+  fi
+
+  # Check job output
+  # Sample output
+  # "Pi is roughly 3.13956232343"
+  pi=$(kubectl logs $(kubectl get pods --sort-by=.metadata.creationTimestamp -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1)  -n ${NAMESPACE} | grep 'Pi is roughly' | rev | cut -d' ' -f1 | rev | cut -c 1-3)
+  echo -e "Spark Pi Job Output: \n ${pi}"
+
+  validate_pi_value $pi
+  validate_logs $LOG_FILE
+
+  # kill http server
+  kill $HTTP_SERVER_PID
+}
+
+
 run_example_job_with_error_in_pod() {
   SPARK_EXAMPLES_JAR_NAME="spark-examples_2.12-$(get_spark_version).jar"
 
@@ -657,6 +721,12 @@ echo -e "########################################"
 
 (setup_user_context && test_example_job_in_pod_with_metrics && cleanup_user_success) || cleanup_user_failure_in_pod
 
+echo -e "########################################"
+echo -e "RUN EXAMPLE JOB WITH LOG FORWARDING"
+echo -e "########################################"
+
+(setup_user_context && run_example_job_in_pod_with_log_forwarding && cleanup_user_success) || cleanup_user_failure_in_pod
+
 echo -e "########################################"
 echo -e "RUN EXAMPLE JOB WITH ERRORS"
 echo -e "########################################"