diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js
index c6410547..c937961f 100644
--- a/lib/kafkajs/_consumer.js
+++ b/lib/kafkajs/_consumer.js
@@ -104,6 +104,11 @@ class Consumer {
    */
   #state = ConsumerState.INIT;
 
+  /**
+   * @returns {ConsumerState} the client state.
+   */
+  getState() { return this.#state; }
+
   /**
    * Contains a mapping of topic+partition to an offset that the user wants to seek to.
    * The keys are of the type "<topic>|<partition>".
@@ -414,8 +419,8 @@ class Consumer {
         try {
           alternateAssignment = await userSpecifiedRebalanceCb(err, assignment, assignmentFns);
         } catch (e) {
-          this.#logger.error(`Error from user's rebalance callback: ${e.stack}, `+
-                             'continuing with the default rebalance behavior.');
+          this.#logger.error(`Error from user's rebalance callback: ${e.stack}, ` +
+            'continuing with the default rebalance behavior.');
         }
 
         if (alternateAssignment) {
@@ -639,11 +644,10 @@ class Consumer {
     /* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
      * TODO: add trampoline method for offset commit callback. */
     rdKafkaConfig['offset_commit_cb'] = true;
-    rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
-      {
-        if (this.#logger)
-          this.#logger.error(`Error from rebalance callback: ${e.stack}`);
-      });
+    rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e => {
+      if (this.#logger)
+        this.#logger.error(`Error from rebalance callback: ${e.stack}`);
+    });
 
     /* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this
      * setting and set it to false. */
@@ -660,16 +664,16 @@ class Consumer {
       this.#autoCommit = rdKafkaConfig['enable.auto.commit'];
     }
 
-     /**
-     * Actual max poll interval is twice the configured max poll interval,
-     * because we want to ensure that when we ask for worker termination,
-     * and there is one last message to be processed, we can process it in
-     * the configured max poll interval time.
-     * This will cause the rebalance callback timeout to be double
-     * the value of the configured max poll interval.
-     * But it's expected otherwise we cannot have a cache and need to consider
-     * max poll interval reached on processing the very first message.
-     */
+    /**
+     * The actual max poll interval is twice the configured max poll interval,
+     * because we want to ensure that when we ask for worker termination,
+     * and there is one last message to be processed, we can still process it
+     * within the configured max poll interval.
+     * This causes the rebalance callback timeout to be double
+     * the value of the configured max poll interval.
+     * This is expected: otherwise we could not keep a cache, and would have to
+     * treat the max poll interval as exceeded while processing the very first message.
+     */
     this.#maxPollIntervalMs = rdKafkaConfig['max.poll.interval.ms'] ?? 300000;
     this.#cacheExpirationTimeoutMs = this.#maxPollIntervalMs;
     rdKafkaConfig['max.poll.interval.ms'] = this.#maxPollIntervalMs * 2;
@@ -833,14 +837,14 @@ class Consumer {
    * only if the size has been requested to be increased twice in a row.
    * @private
    */
-   #increaseMaxSize() {
-     if (this.#messageCacheMaxSize === 1024)
-         return;
-     this.#increaseCount++;
-     if (this.#increaseCount <= 1)
-         return;
-     this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024);
-     this.#increaseCount = 0;
+  #increaseMaxSize() {
+    if (this.#messageCacheMaxSize === 1024)
+      return;
+    this.#increaseCount++;
+    if (this.#increaseCount <= 1)
+      return;
+    this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024);
+    this.#increaseCount = 0;
   }
 
   /**
@@ -850,8 +854,8 @@ class Consumer {
    * @private
    */
   #decreaseMaxSize(recvdSize) {
-      this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1);
-      this.#increaseCount = 0;
+    this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1);
+    this.#increaseCount = 0;
   }
 
   /**
@@ -1008,7 +1012,7 @@ class Consumer {
     }
 
     return this.#fetchAndResolveWith(() =>
-        this.#messageCache.nextN(null, size),
+      this.#messageCache.nextN(null, size),
       this.#messageCacheMaxSize);
   }
 
@@ -1404,7 +1408,7 @@ class Consumer {
       });
 
       if (this.#pendingOperations.length) {
-        ppc  = this.#discardMessages(ms, ppc);
+        ppc = this.#discardMessages(ms, ppc);
         break;
       }
 
@@ -1423,7 +1427,7 @@ class Consumer {
   async #checkMaxPollIntervalNotExceeded(now) {
     const maxPollExpiration = this.#lastFetchClockNs +
       BigInt((this.#cacheExpirationTimeoutMs + this.#maxPollIntervalMs)
-              * 1e6);
+        * 1e6);
 
     let interval = Number(maxPollExpiration - now) / 1e6;
     if (interval < 1)
@@ -1717,7 +1721,7 @@ class Consumer {
     await Promise.allSettled(librdkafkaSeekPromises);
     await Promise.all(librdkafkaSeekPromises);
 
-    for (const [key, ] of seekedPartitions) {
+    for (const [key,] of seekedPartitions) {
       this.#pendingSeeks.delete(key);
     }
 
@@ -1804,7 +1808,7 @@ class Consumer {
     this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]);
 
     this.#addPendingOperation(() =>
-        this.#seekInternal([rdKafkaTopicPartitionOffset]));
+      this.#seekInternal([rdKafkaTopicPartitionOffset]));
   }
 
   async describeGroup() {
@@ -1895,7 +1899,7 @@ class Consumer {
         topicPartition));
 
     this.#addPendingOperation(() =>
-        this.#pauseInternal(flattenedToppars));
+      this.#pauseInternal(flattenedToppars));
 
     /* Note: we don't use flattenedToppars here because resume flattens them again. */
     return () => this.resume(toppars);
@@ -2034,4 +2038,4 @@ class Consumer {
   }
 }
 
-module.exports = { Consumer, PartitionAssigners: Object.freeze(PartitionAssigners), };
+module.exports = { Consumer, PartitionAssigners: Object.freeze(PartitionAssigners), ConsumerState, };
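
A minimal usage sketch (illustrative, not part of the patch): with ConsumerState now exported alongside the new getState() accessor, a caller can inspect the client lifecycle before acting on it. The `consumer` instance below is assumed to come from the library's usual KafkaJS-style factory and its disconnect() method from the existing API; only getState() and the ConsumerState export are introduced by this diff.

const { ConsumerState } = require('./lib/kafkajs/_consumer');

// Disconnect only if the consumer actually reached the CONNECTED state.
// getState() simply returns the private #state field, so it tracks the
// lifecycle INIT -> CONNECTING -> CONNECTED -> DISCONNECTING -> DISCONNECTED.
async function disconnectIfConnected(consumer) {
  if (consumer.getState() === ConsumerState.CONNECTED) {
    await consumer.disconnect();
  }
}
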
diff --git a/lib/kafkajs/_kafka.js b/lib/kafkajs/_kafka.js
index e671d94a..525c0469 100644
--- a/lib/kafkajs/_kafka.js
+++ b/lib/kafkajs/_kafka.js
@@ -1,5 +1,5 @@
-const { Producer, CompressionTypes } = require('./_producer');
-const { Consumer, PartitionAssigners } = require('./_consumer');
+const { Producer, CompressionTypes, ProducerState } = require('./_producer');
+const { Consumer, PartitionAssigners, ConsumerState } = require('./_consumer');
 const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
 const error = require('./_error');
 const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
@@ -118,7 +118,10 @@ module.exports = {
   ...error, logLevel,
   PartitionAssigners,
   PartitionAssignors: PartitionAssigners,
+  ConsumerState,
+  ProducerState,
   CompressionTypes,
   ConsumerGroupStates,
   AclOperationTypes,
-  IsolationLevel};
+  IsolationLevel
+};
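
A small sketch (illustrative, not part of the patch): both enums are now re-exported from the KafkaJS-compatible entry point, so callers no longer need to reach into the internal _consumer/_producer modules.

const { ConsumerState, ProducerState } = require('./lib/kafkajs/_kafka');

// The values are assumed to mirror the enums declared in types/kafkajs.d.ts,
// e.g. ConsumerState.DISCONNECTED === 4 and ProducerState.DISCONNECTED === 6.
console.log(ConsumerState.DISCONNECTED, ProducerState.DISCONNECTED);
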
diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js
index bf1e058d..b1a2e0d3 100644
--- a/lib/kafkajs/_producer.js
+++ b/lib/kafkajs/_producer.js
@@ -96,6 +96,11 @@ class Producer {
    */
   #state = ProducerState.INIT;
 
+  /**
+   * @returns {ProducerState} the client state.
+   */
+  getState() { return this.#state; }
+
   /**
    * ongoingTransaction is true if there is an ongoing transaction.
    * @type {boolean}
@@ -845,4 +850,4 @@ class Producer {
   }
 }
 
-module.exports = { Producer, CompressionTypes: Object.freeze(CompressionTypes) };
+module.exports = { Producer, CompressionTypes: Object.freeze(CompressionTypes), ProducerState };
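
A similar sketch for the producer side (illustrative only): getState() can gate operations on the producer lifecycle. The send() call and record shape follow the existing KafkaJS-style API and are assumptions here; only getState() and the ProducerState export come from this diff.

const { ProducerState } = require('./lib/kafkajs/_producer');

// Refuse to send until the producer has finished connecting (and, for
// transactional producers, finished initializing transactions).
async function sendWhenReady(producer, record) {
  if (producer.getState() !== ProducerState.CONNECTED) {
    throw new Error(`producer not ready, state = ${producer.getState()}`);
  }
  return producer.send(record);
}
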
diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts
index 58d03a5b..6f157753 100644
--- a/types/kafkajs.d.ts
+++ b/types/kafkajs.d.ts
@@ -186,6 +186,16 @@ export type PartitionMetadata = {
 
 export type Transaction = Producer;
 
+export enum ProducerState {
+  INIT = 0,
+  CONNECTING = 1,
+  INITIALIZING_TRANSACTIONS = 2,
+  INITIALIZED_TRANSACTIONS = 3,
+  CONNECTED = 4,
+  DISCONNECTING = 5,
+  DISCONNECTED = 6,
+}
+
 export type Producer = Client & {
   send(record: ProducerRecord): Promise<RecordMetadata[]>
   sendBatch(batch: ProducerBatch): Promise<RecordMetadata[]>
@@ -197,6 +207,7 @@ export type Producer = Client & {
   abort(): Promise<void>
   sendOffsets(args: { consumer: Consumer, topics: TopicOffsets[] }): Promise<void>
   isActive(): boolean
+  getState(): ProducerState
 }
 
 export enum PartitionAssigners {
@@ -352,6 +363,14 @@ export type ITopicMetadata = {
   authorizedOperations?: AclOperationTypes[]
 }
 
+export enum ConsumerState {
+  INIT = 0,
+  CONNECTING = 1,
+  CONNECTED = 2,
+  DISCONNECTING = 3,
+  DISCONNECTED = 4,
+}
+
 export type Consumer = Client & {
   subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
   stop(): Promise<void>
@@ -364,6 +383,7 @@ export type Consumer = Client & {
   paused(): TopicPartitions[]
   resume(topics: Array<{ topic: string; partitions?: number[] }>): void
   assignment(): TopicPartition[]
+  getState(): ConsumerState
 }
 
 export interface AdminConfig {
@@ -414,8 +434,9 @@ export type Admin = {
     groupId: string,
     topics?: TopicInput,
     timeout?: number,
-    requireStableOffsets?: boolean }):
-    Promise<Array<{topic: string; partitions:FetchOffsetsPartition[]}>>
+    requireStableOffsets?: boolean
+  }):
+    Promise<Array<{ topic: string; partitions: FetchOffsetsPartition[] }>>
   deleteTopicRecords(options: {
     topic: string; partitions: SeekEntry[];
     timeout?: number; operationTimeout?: number