diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index fc466efa5a179..70faa919b1e4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -69,6 +69,8 @@
 import org.apache.flink.runtime.rest.messages.JobClientHeartbeatHeaders;
 import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
 import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -92,6 +94,7 @@
 import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
 import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
@@ -330,6 +333,20 @@ public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
         return sendRequest(detailsHeaders, params);
     }
 
+    /**
+     * Requests the job exception history.
+     *
+     * @param jobID The job id
+     * @return Job exceptions
+     */
+    public CompletableFuture<JobExceptionsInfoWithHistory> getJobExceptions(JobID jobID) {
+        final JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
+        final JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
+        params.jobPathParameter.resolve(jobID);
+
+        return sendRequest(jobExceptionsHeaders, params);
+    }
+
     @Override
     public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
         final CheckedSupplier<CompletableFuture<JobStatus>> operation =
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 27e1835b26308..ee7ac1ed7df53 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -73,6 +73,8 @@
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
 import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
@@ -92,6 +94,7 @@
 import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
@@ -1298,6 +1301,35 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception {
         }
     }
 
+    @Test
+    void testGetJobExceptionsInfoWithHistory() throws Exception {
+
+        final TestJobExceptionsHandler jobExceptionsHandler = new TestJobExceptionsHandler();
+        TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(jobExceptionsHandler);
+        final RestClusterClient<?> restClusterClient =
+                createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+        try {
+            CompletableFuture<JobExceptionsInfoWithHistory> future =
+                    restClusterClient.getJobExceptions(jobId);
+            JobExceptionsInfoWithHistory result = future.get();
+            assertThat(result.getExceptionHistory()).isNotNull();
+            assertThat(result.getExceptionHistory().getEntries()).hasSize(1);
+            JobExceptionsInfoWithHistory.RootExceptionInfo rootExceptionInfo =
+                    result.getExceptionHistory().getEntries().get(0);
+            assertThat(rootExceptionInfo.getExceptionName()).isEqualTo("TestRootException");
+            assertThat(rootExceptionInfo.getStacktrace()).contains("Simulated failure");
+            assertThat(rootExceptionInfo.getConcurrentExceptions()).hasSize(1);
+            JobExceptionsInfoWithHistory.ExceptionInfo concurrent =
+                    rootExceptionInfo.getConcurrentExceptions().iterator().next();
+            assertThat(concurrent.getExceptionName()).isEqualTo("TestException");
+            assertThat(concurrent.getStacktrace()).contains("Simulated failure");
+            assertThat(result.getExceptionHistory().isTruncated()).isFalse();
+        } finally {
+            restClusterClient.close();
+            restServerEndpoint.close();
+        }
+    }
+
     private class TestClientCoordinationHandler
             extends TestHandler<
                     ClientCoordinationRequestBody,
@@ -1464,6 +1496,55 @@ protected CompletableFuture<JobStatusInfo> handleRequest(
         }
     }
 
+    private class TestJobExceptionsHandler
+            extends TestHandler<
+                    EmptyRequestBody,
+                    JobExceptionsInfoWithHistory,
+                    JobExceptionsMessageParameters> {
+
+        private TestJobExceptionsHandler() {
+            super(JobExceptionsHeaders.getInstance());
+        }
+
+        @Override
+        protected CompletableFuture<JobExceptionsInfoWithHistory> handleRequest(
+                @Nonnull HandlerRequest<EmptyRequestBody> request,
+                @Nonnull DispatcherGateway gateway)
+                throws RestHandlerException {
+            JobExceptionsInfoWithHistory.ExceptionInfo exceptionInfo =
+                    new JobExceptionsInfoWithHistory.ExceptionInfo(
+                            "TestException",
+                            "java.lang.RuntimeException: Simulated failure\n\tat"
+                                    + " org.test.FakeClass.method(FakeClass.java:42)",
+                            12345L);
+
+            Collection<JobExceptionsInfoWithHistory.ExceptionInfo> concurrentExceptions =
+                    Collections.singletonList(exceptionInfo);
+
+            Map<String, String> failureLabels = new HashMap<>();
+            failureLabels.put("flink.operator.failure.label", "simulated");
+
+            JobExceptionsInfoWithHistory.RootExceptionInfo rootExceptionInfo =
+                    new JobExceptionsInfoWithHistory.RootExceptionInfo(
+                            "TestRootException",
+                            "java.lang.Exception: Test stack trace\n\tat"
+                                    + " org.test.FakeClass.method(FakeClass.java:123)",
+                            12345L,
+                            failureLabels,
+                            concurrentExceptions);
+
+            JobExceptionsInfoWithHistory.JobExceptionHistory exceptionHistory =
+                    new JobExceptionsInfoWithHistory.JobExceptionHistory(
+                            Collections.singletonList(rootExceptionInfo), false // Truncated?
+                            );
+
+            JobExceptionsInfoWithHistory jobExceptionsInfoWithHistory =
+                    new JobExceptionsInfoWithHistory(exceptionHistory);
+
+            return CompletableFuture.completedFuture(jobExceptionsInfoWithHistory);
+        }
+    }
+
     private class TestJobDetailsInfoHandler
             extends TestHandler<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {