diff --git a/tensorflow_data_validation/utils/stats_gen_lib.py b/tensorflow_data_validation/utils/stats_gen_lib.py
index af69c254..999a1329 100644
--- a/tensorflow_data_validation/utils/stats_gen_lib.py
+++ b/tensorflow_data_validation/utils/stats_gen_lib.py
@@ -91,6 +91,18 @@ def generate_statistics_from_tfrecord(
   batch_size = stats_options.desired_batch_size
   # PyLint doesn't understand Beam PTransforms.
   # pylint: disable=no-value-for-parameter
+
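+  # Casts each non-numeric feature array to strings; numeric arrays are
+  # passed through unchanged.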
+  def _convert_dict(legacy_examples: Dict) -> Dict:
+    result = {}
+    for k, v in legacy_examples.items():
+      if np.issubdtype(v.dtype, np.number):
+        result[k] = v
+      else:
+        result[k] = v.astype(str)
+    return result
+
   with beam.Pipeline(options=pipeline_options) as p:
     # Auto detect tfrecord file compression format based on input data
     # path suffix.
@@ -101,6 +113,7 @@ def generate_statistics_from_tfrecord(
             schema=None,
             telemetry_descriptors=['tfdv', 'generate_statistics_from_tfrecord'])
                          .BeamSource(batch_size))
+        | 'ConvertDict' >> beam.Map(_convert_dict)
         | 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
         | 'WriteStatsOutput' >>
         (stats_api.WriteStatisticsToTFRecord(output_path)))
@@ -163,6 +176,18 @@ def generate_statistics_from_csv(
       constants.DEFAULT_DESIRED_INPUT_BATCH_SIZE)
   # PyLint doesn't understand Beam PTransforms.
   # pylint: disable=no-value-for-parameter
+
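+  # Same conversion as in generate_statistics_from_tfrecord: cast
+  # non-numeric feature arrays to strings.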
+  def _convert_dict(legacy_examples: Dict) -> Dict:
+    result = {}
+    for k, v in legacy_examples.items():
+      if np.issubdtype(v.dtype, np.number):
+        result[k] = v
+      else:
+        result[k] = v.astype(str)
+    return result
+
   with beam.Pipeline(options=pipeline_options) as p:
     # If a header is not provided, assume the first line in a file
     # to be the header.
@@ -175,6 +200,7 @@ def generate_statistics_from_csv(
             file_pattern=data_location,
             skip_header_lines=skip_header_lines,
             compression_type=compression_type)
+        | 'ConvertDict' >> beam.Map(_convert_dict)
         | 'DecodeData' >> csv_decoder.DecodeCSV(
             column_names=column_names,
             delimiter=delimiter,
@@ -229,6 +255,12 @@ def generate_statistics_from_dataframe(
         dataframe, stats_options, stats_generators)
   else:
     # TODO(b/144580609): Consider using Beam for inmemory mode as well.
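+    # Cast non-numeric columns to strings (in place) so each column has a
+    # single value type before the dataframe is split across jobs.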
+    for col in dataframe.columns:
+      if not np.issubdtype(dataframe[col].dtype, np.number):
+        dataframe[col] = dataframe[col].astype(str)
+
     splits = np.array_split(dataframe, n_jobs)
     partial_stats = Parallel(n_jobs=n_jobs)(
         delayed(_generate_partial_statistics_from_df)(
@@ -267,6 +299,12 @@ def _generate_partial_statistics_from_df(
           type=schema_pb2.INT,
           bool_domain=schema_pb2.BoolDomain())
   dataframe = dataframe.drop(columns=drop_columns)
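+  # Cast remaining non-numeric columns to strings so the conversion to an
+  # Arrow RecordBatch below sees a single value type per column.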
+  for col in dataframe.columns:
+    if not np.issubdtype(dataframe[col].dtype, np.number):
+      dataframe[col] = dataframe[col].astype(str)
+
   if schema.feature:
     stats_options_modified.schema = schema
   record_batch_with_primitive_arrays = table_util.DataFrameToRecordBatch(