diff --git a/examples/test.py b/examples/test.py new file mode 100644 index 000000000..0007b8dd2 --- /dev/null +++ b/examples/test.py @@ -0,0 +1,65 @@ +import os +import deepspeed +import torch +import wandb +import sys +import numpy as np +import datetime +import json +from rouge_score import rouge_scorer +from multiprocessing import Pool +from functools import partial +from transformers import AutoConfig + +# from lmflow.pipeline.auto_pipeline import AutoPipeline + +import torch.distributed as dist +from transformers import HfArgumentParser +from lmflow.datasets.dataset import Dataset +from lmflow.pipeline.base_pipeline import BasePipeline +from lmflow.utils.data_utils import set_random_seed, batchlize, answer_extraction, load_data +from lmflow.models.auto_model import AutoModel +from lmflow.args import ModelArguments, DatasetArguments, AutoArguments +from lmflow.pipeline.test_rougel import Test_rougel + +os.environ["TOKENIZERS_PARALLELISM"] = "false" # To avoid warnings about parallelism in tokenizers + + +def get_pipeline( + pipeline_name, + model_args, + data_args, + pipeline_args, + *args, + **kwargs + ): + + pipeline = Test_rougel( + model_args, + data_args, + pipeline_args, + *args, + **kwargs + ) + return pipeline + +# copied from evaluate.py, with small changes. +pipeline_name = "test_rougel" +PipelineArguments = AutoArguments.get_pipeline_args_class(pipeline_name) + +parser = HfArgumentParser((ModelArguments, DatasetArguments, PipelineArguments)) +model_args, data_args, pipeline_args = parser.parse_args_into_dataclasses() + +with open (pipeline_args.deepspeed, "r") as f: + ds_config = json.load(f) + +model = AutoModel.get_model(model_args, tune_strategy='none', ds_config=ds_config) +dataset = Dataset(data_args) + +evaluator = get_pipeline( + pipeline_name=pipeline_name, + model_args=model_args, + data_args=data_args, + pipeline_args=pipeline_args, +) +evaluator.evaluate(model=model, dataset=dataset, metric=pipeline_args.metric) diff --git a/examples/test_rougel.py b/examples/test_rougel.py new file mode 100644 index 000000000..2f56d5746 --- /dev/null +++ b/examples/test_rougel.py @@ -0,0 +1,306 @@ +import os +import deepspeed +import torch +import wandb +import sys +import numpy as np +import datetime +import json +from rouge_score import rouge_scorer +from multiprocessing import Pool +from functools import partial +# TODO: remove later +from transformers import AutoConfig +# from lmflow.pipeline.auto_pipeline import AutoPipeline +import evaluate +import torch.distributed as dist +from transformers import HfArgumentParser +from lmflow.datasets.dataset import Dataset +from lmflow.pipeline.base_pipeline import BasePipeline +from lmflow.utils.data_utils import set_random_seed, batchlize, answer_extraction, load_data +from lmflow.models.auto_model import AutoModel +from lmflow.args import ModelArguments, DatasetArguments, AutoArguments + +os.environ["TOKENIZERS_PARALLELISM"] = "false" # To avoid warnings about parallelism in tokenizers + +# copied from evaluate.py, with small changes. +pipeline_name = "test_rougel" +PipelineArguments = AutoArguments.get_pipeline_args_class(pipeline_name) + +parser = HfArgumentParser((ModelArguments, DatasetArguments, PipelineArguments)) +model_args, data_args, pipeline_args = parser.parse_args_into_dataclasses() + +with open (pipeline_args.deepspeed, "r") as f: + ds_config = json.load(f) + +model = AutoModel.get_model(model_args, tune_strategy='none', ds_config=ds_config) +dataset = Dataset(data_args) + +evaluator = AutoPipeline.get_pipeline( + pipeline_name=pipeline_name, + model_args=model_args, + data_args=data_args, + pipeline_args=pipeline_args, +) +evaluator.evaluate(model=model, dataset=dataset, metric=pipeline_args.metric) + +#copied from evaluator.py +class Test_rougel(BasePipeline): + """ + Initializes the `Evaluator` class with given arguments. + + Parameters + ------------ + model_args : ModelArguments object. + Contains the arguments required to load the model. + + data_args : DatasetArguments object. + Contains the arguments required to load the dataset. + + evaluator_args : EvaluatorArguments object. + Contains the arguments required to perform evaluation. + + + """ + + def __init__(self, model_args, data_args, evaluator_args): + # our method + self.data_args = data_args + self.evaluator_args = evaluator_args + self.model_args = model_args + print("--------Begin Evaluator Arguments----------") + print(f"model_args : {self.model_args}") + print(f"data_args : {self.data_args}") + print(f"evaluator_args : {self.evaluator_args}") + print("--------End Evaluator Arguments----------") + # logger + if (self.evaluator_args.use_wandb == True): + wandb.init(project="lmflow_evaluation") + # random seed + set_random_seed(self.evaluator_args.random_seed) + self.local_rank = int(os.getenv("LOCAL_RANK", "0")) + self.world_size = int(os.getenv("WORLD_SIZE", "1")) + print("\nself.world_size是:", self.world_size, "\n") + torch.cuda.set_device(self.local_rank) # NOTE: cpu-only machine will have error + deepspeed.init_distributed() + + self.config = AutoConfig.from_pretrained(model_args.model_name_or_path) + try: + self.model_hidden_size = self.config.hidden_size + except: + print("Error in setting hidden size, use the default size 1024") + self.model_hidden_size = 1024 # gpt2 seems do not have hidden_size in config + + print(f"model_hidden_size = {self.model_hidden_size}") + # batch size has to be divisible by world_size, but can be bigger than world_size + train_batch_size = 1 * self.world_size + self.evaluator_args.minibatch_size = train_batch_size + self.block_size = evaluator_args.evaluate_block_size + # dataloader, data_size = create_dataloader(args) # load dataset + + # First use the method in self-instruct to get the ROUGE-L scores for the dataset, then use the method in LMFlow and compare the two scores, + # The metric is tested to be valid if all scores are the same. + def get_rougel_score_list(self, predicted_data: str): + scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=False) + + dataset = load_data(predicted_data) + data_dict = dataset.to_dict() + inputs = [instance["input"] for instance in data_dict["instances"]] + outputs = [instance["output"] for instance in data_dict["instances"]] + dataset_size = len(outputs) + + dataset_buf = [] + for idx in range(dataset_size): + dataset_buf.append({ + "input": inputs[idx], + "output": outputs[idx], + "input_idx": idx + }) + + dataloader = batchlize( # 相当于每minibatch_size大小切一段,dataloader = [[{}, {}, ... ], [{}, {}, ... ], ... ] + dataset_buf, + self.evaluator_args.minibatch_size, # = self.world_size + self.evaluator_args.random_shuffle + ) + print(f"Successfully create dataloader with size {len(dataloader)}.") + + score_list = [] # store the maximum ROUGE-L score in each batch + + for batch in dataloader: + input_ = [data["input"] for data in batch] + output_ = [data["output"] for data in batch] + with Pool(4) as p: # 4 processes + rouge_scores = p.map(partial(scorer.score, input_), [output_]) + rouge_scores = [score["rougeL"].fmeasure for score in rouge_scores] # score["rougeL"].fmeasure 是对应的pair的得分 + max_rl_score = max(rouge_scores) + score_list.append(max_rl_score) + + return score_list + + + def create_dataloader(self, dataset: Dataset): + data_dict = dataset.to_dict() + inputs = [instance["input"] for instance in data_dict["instances"]] + outputs = [instance["output"] for instance in data_dict["instances"]] + dataset_size = len(outputs) + dataset_buf = [] + for idx in range(dataset_size): + dataset_buf.append({ + "input": inputs[idx], + "output": outputs[idx], + "input_idx": idx + }) + + dataloader = batchlize( + dataset_buf, + self.evaluator_args.minibatch_size, # = self.world_size + self.evaluator_args.random_shuffle + ) + print(f"Successfully create dataloader with size {len(dataloader)}.") + return dataloader, dataset_size + + # TODO: Split for better unittest + def _calculate_rouge_l(self, predicted_answer, groundtruth, scorer: rouge_scorer.RougeScorer, answer_type=None): + case_insensitive_types = [ + "strategyqa", + "coin_flip", + "pubmedqa", + "binary_choice", + "medmcqa", + "usmle", + ] + if answer_type in case_insensitive_types: + rouge_score = scorer.score(groundtruth.lower(), predicted_answer.lower())["rougeL"].fmeasure + else: + rouge_score = scorer.score(groundtruth, predicted_answer)["rougeL"].fmeasure + return rouge_score + + def evaluate(self, model, dataset: Dataset, metric="rougel"): + """ + Perform Evaluation for a model + + Parameters + ------------ + model : TunableModel object. + TunableModel to perform inference + + dataset : Dataset object. + + + """ + if metric in ["rl", "rouge-l", "ROUGE-L"]: + dataloader, data_size = self.create_dataloader(dataset) # data_size = number of mini-batches + + if not dist.is_initialized() or dist.get_rank() == 0: + if not os.path.exists(self.evaluator_args.output_dir): + os.makedirs(self.evaluator_args.output_dir) + output_writer = open(f"{self.evaluator_args.output_dir}/evaluation.json", "w") + + pred_score_list = [] # list to record the ROUGE-L scores of all batches from LMFlow method + + # ds_engine = deepspeed.initialize(model=model.get_model(), config_params=self.ds_config)[0] + # ds_engine.module.eval() + for batch_index, batch in enumerate(dataloader): + if batch_index * self.world_size >= self.data_args.max_eval_samples: + break + if self.local_rank >= len(batch): + current_batch = batch[0] + else: + # the batch in current process + current_batch = batch[self.local_rank] + + prompt_structure = self.evaluator_args.prompt_structure + input = prompt_structure.format(input=current_batch['input']) + output = current_batch['output'] + input_idx = current_batch['input_idx'] + + inputs = model.encode(input, return_tensors="pt").to(device=self.local_rank) + + # with torch.no_grad(): + # outputs = ds_engine.module.generate(inputs, synced_gpus=True, pad_token_id=model.get_tokenizer().eos_token_id, min_length=5, max_length=100,temperature=0.0, do_sample=False) + outputs = model.inference(inputs, max_new_tokens=100, temperature=0.0) + text_out = model.decode(outputs[0], skip_special_tokens=True) + + # # only return the generation, truncating the input + prompt_length = len(model.decode(inputs[0], skip_special_tokens=True, )) + text_out = text_out[prompt_length:] + answer_type = self.evaluator_args.answer_type + pred_answer = answer_extraction( + text_out, + answer_type=answer_type, + ) + print( + f"batch_index{batch_index} rank{self.local_rank}:\n question={input}\n prediction={text_out}\n") + print(f"predicted answer: {pred_answer} \n") + print(f"groundtruth answer: {output} \n") + + + scorer = rouge_scorer.RougeScorer(["rougeL"], + use_stemmer=False) # stemmer: stem the words to their root form + + if self.local_rank >= len( + batch): # for last batch, the padding examples are ignored and do not contribute to the ROUGE-L + rl_ = 0 + total_ = 0 + else: + rl_ = max(0, self._calculate_rouge_l(pred_answer, output, scorer, answer_type)) + total_ = 1 + score = rl_ + + # collect rouge-l from all gpus + all_process = torch.tensor([rl_, total_], dtype=torch.float32, device=self.local_rank) + dist.all_reduce(all_process, dist.ReduceOp.MAX, async_op=False) + max_, total_ = all_process.tolist() + print("max_: ", max_) + print("total_: ", total_) + # avg = max_ / total_ + avg = max_ + pred_score_list.append(avg) + # total += total_ + + # collect predictions from all gpus + output_dict = {"question": input, + "prediction": text_out, + "pred_answer": pred_answer, + "answer": output} + all_process_list = [{}] * self.world_size + + dist.gather_object(output_dict, all_process_list if dist.get_rank() == 0 else None, + dst=0) + print("all_process_list: ", all_process_list) + if not dist.is_initialized() or dist.get_rank() == 0: + current_rouge_l = np.mean(pred_score_list) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "{}/ {} has been finished, current ROUGE-L = {}".format(int(pred_score_list), data_size, + current_rouge_l)) + + if (self.evaluator_args.use_wandb == True): + wandb.log({"rougeL_fmeasure": current_rouge_l}) + + for index, output in enumerate(all_process_list): + output_json = json.dumps(output) + output_writer.write(output_json + '\n') + + if not dist.is_initialized() or dist.get_rank() == 0: + current_rouge_l = np.mean(pred_score_list) + print("Final ROUGE-L = ", current_rouge_l) + output_writer.close() + + else: + raise NotImplementedError(f"{metric} is not implemented or not match with our defined metrics") + + # load the dataset with predicted answers and apply the self-instruct method to get the answer score list. + ans_score_list = self.get_rougel_score_list(f"{self.evaluator_args.output_dir}/evaluation.json") + + # Start compare the two ROUGE-L scores lists we get + matched = True + for pred, ans in zip(pred_score_list, ans_score_list): + print("LMFlow ROUGE-L: ", pred, " -- self-instruct ROUGE-L: ", ans) + if pred != ans: + matched = False + print("scores not matched!") + return + print("scores matched. Tested to be valid.") + + + diff --git a/scripts/rougel_test_case.sh b/scripts/rougel_test_case.sh new file mode 100644 index 000000000..7d38696ac --- /dev/null +++ b/scripts/rougel_test_case.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# 比较self-instruct和LMFlow对相同instructions的ROUGE-L得分是否相同 +# A test case for the ROUGE-L metric used in evaluation. +# Compares the ROUGE-L scores from self-intruct and LMFlow. Valid if all scores in each pair are the same. + +CUDA_VISIBLE_DEVICES=0 \ + deepspeed examples/test.py \ + --answer_type text \ + --model_name_or_path gpt2-large \ + --dataset_path data/alpaca/test \ + --deepspeed examples/ds_config.json \ + --inference_batch_size_per_device 1 \ + --metric rouge-l diff --git a/scripts/run_evaluation_with_rougel.sh b/scripts/run_evaluation_with_rougel.sh new file mode 100644 index 000000000..90f5f25d7 --- /dev/null +++ b/scripts/run_evaluation_with_rougel.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# --model_name_or_path specifies the original huggingface model +# --lora_model_path specifies the model difference introduced by finetuning, +# i.e. the one saved by ./scripts/run_finetune_with_lora.sh +CUDA_VISIBLE_DEVICES=0 \ + deepspeed examples/evaluate.py \ + --answer_type text \ + --model_name_or_path gpt2-large \ + --dataset_path data/alpaca/test \ + --deepspeed examples/ds_config.json \ + --inference_batch_size_per_device 1 \ + --metric rouge-l diff --git a/scripts/run_tmp.sh b/scripts/run_tmp.sh new file mode 100644 index 000000000..f666b7ad4 --- /dev/null +++ b/scripts/run_tmp.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +CUDA_VISIBLE_DEVICES=0 \ + deepspeed examples/evaluate.py \ + --answer_type text \ + --model_name_or_path gpt2-large \ + --dataset_path data/alpaca/test \ + --deepspeed examples/ds_config.json \ + --metric accuracy \ No newline at end of file diff --git a/src/lmflow/args.py b/src/lmflow/args.py index 00ec3ffa2..76c0429c5 100644 --- a/src/lmflow/args.py +++ b/src/lmflow/args.py @@ -13,7 +13,7 @@ """ from dataclasses import dataclass, field -from typing import Optional, List +from typing import Optional from transformers.utils.versions import require_version @@ -99,10 +99,6 @@ class ModelArguments: default=None, metadata={"help": "If training from scratch, pass a model type from the list: " + ", ".join(MODEL_TYPES)}, ) - arch_type: Optional[str] = field( - default="decoder_only", - metadata={"help": "The architecture type of the model. Currently supported decoder_only or encoder_decoder"} - ) config_overrides: Optional[str] = field( default=None, metadata={ @@ -171,10 +167,6 @@ class ModelArguments: default=32, metadata={"help": "Merging ratio between the fine-tuned model and the original. This is controlled by a parameter called alpha in the paper."}, ) - lora_target_modules: List[str] = field( - default=None, metadata={"help": "Pretrained config name or path if not the same as model_name", - } - ) lora_dropout: float = field( default=0.1, metadata={"help": "The dropout rate in lora.linear."}, @@ -250,9 +242,6 @@ class DatasetArguments: dataset_path: Optional[str] = field( default=None, metadata={"help": "The path of the dataset to use."} ) - eval_dataset_path: Optional[str] = field( - default=None, metadata={"help": "The path of the eval dataset to use."} - ) dataset_name: Optional[str] = field( default="customized", metadata={"help": "Should be \"customized\""} ) @@ -313,16 +302,6 @@ class DatasetArguments: default=None, metadata={"help": "The number of processes to use for the preprocessing."}, ) - group_texts_batch_size: int = field( - default=1000, - metadata={ - "help": ( - "Number of samples that will be grouped together to go though" - " `group_texts` operation. See `--disable_group_texts` for" - " detailed explanation of this operation." - ) - } - ) disable_group_texts: bool = field( default=False, metadata={ @@ -372,7 +351,7 @@ class FinetunerArguments(TrainingArguments): class EvaluatorArguments: """ Define a class EvaluatorArguments using the dataclass decorator. The class contains several optional - parameters that can be used to configure a evaluator. + parameters that can be used to configure an evaluator. local_rank : str For distributed training: local_rank @@ -491,7 +470,7 @@ class EvaluatorArguments: default="accuracy", metadata={ "help": "the metric the model will be evaluated on", - "choices": ["ppl", "perplexity", "acc", "accuracy", "nll", "neg_log_likelihood"], + "choices": ["ppl", "perplexity", "acc", "accuracy", "nll", "neg_log_likelihood", "rl", "rouge-l", "ROUGE-L"], }, ) inference_batch_size_per_device: Optional[int] = field( @@ -499,12 +478,13 @@ class EvaluatorArguments: metadata={ "help": ( "every device will infer {inference_batch_size_per_device}" - " samples in parallel. The inferred results will be concatenaed" + " samples in parallel. The inferred results will be concatenated" " with inputs and attach a reward." ), }, ) + @dataclass class InferencerArguments: """ @@ -621,27 +601,147 @@ class RaftAlignerArguments(TrainingArguments): metadata={ "help": ( "every device will infer {inference_batch_size_per_device}" - " samples in parallel. The inferred results will be concatenaed" + " samples in parallel. The inferred results will be concatenated" " with inputs and attach a reward." ), }, ) @dataclass -class BenchmarkingArguments: - dataset_name: Optional[str] = field( +class TesterArguments: + """ + Define a class TesterArguments using the dataclass decorator. The class contains several optional + parameters that can be used to configure an tester for ROUGE-L. + + local_rank : str + For distributed training: local_rank + + random_shuffle : bool + + use_wandb : bool + + random_seed : int, default = 1 + + output_dir : str, default = './output_dir', + + mixed_precision : str, choice from ["bf16","fp16"]. + mixed precision mode, whether to use bf16 or fp16 + + deepspeed : + Enable deepspeed and pass the path to deepspeed json config file (e.g. ds_config.json) or an already + loaded json file as a dict + """ + local_rank: int = field( + default=-1, + metadata={"help": "For distributed training: local_rank" + } + ) + + random_shuffle: Optional[bool] = field( + default=False, + metadata={"help": "" + } + ) + + use_wandb: Optional[bool] = field( + default=False, + metadata={ + "help": ( + "When this flag is True, wandb will be enabled" + ) + }, + ) + random_seed: Optional[int] = field( + default=1, + metadata={ + "help": ( + "used to set random seed" + ) + }, + ) + output_dir: Optional[str] = field( + default="./output_dir", + metadata={"help": "Output path for the inferenced results"}, + ) + mixed_precision: Optional[str] = field( + default="bf16", + metadata={ + "help": ( + "mixed precision mode, whether to use bf16 or fp16" + ), + "choices": ["bf16", "fp16"], + }, + ) + deepspeed: Optional[str] = field( default=None, metadata={ - "help": "benchmark dataset name provided by lmflow" + "help": ( + "Enable deepspeed and pass the path to deepspeed json config file (e.g. ds_config.json) or an already" + " loaded json file as a dict" + ) }, ) - lm_evaluation_metric: Optional[str] = field( - default="accuracy", + answer_type: Optional[str] = field( + default="text", + metadata={ + "help": ( + 'Question type for answer extraction from the decoder output.' + ' Supported types: \n' + ' 1) "multiple_choice", e.g. A, B, C, D, ...\n' + ' 2) "binary_choice", e.g. yes, no, maybe\n' + ' 3) "math", e.g. 1.0, -3.52\n' + ' 4) "text", e.g. "I think that it is okay"\n' + ' 5) Special treatment for several datasets\n' + ' - "gsm8k"\n' + ' - "svamp"\n' + ' - "asdiv"\n' + ' - "addsub"\n' + ' - "singleeq"\n' + ' - "multiarith"\n' + ' - "aqua"\n' + ' - "csqa"\n' + ' - "strategyqa"\n' + ' - "pubmedqa"\n' + ' - "medmcqa"\n' + ' - "usmle"\n' + ) + }, + ) + prompt_structure: Optional[str] = field( + default="{input}", + metadata={ + "help": ( + 'Prompt structure to facilitate prompt engineering during' + ' inference. The model will receive' + ' `prompt_structure.format(input=input)` as its input.' + ) + }, + ) + evaluate_block_size: Optional[int] = field( + default=512, + metadata={ + "help": ( + "the model will have at least block_size tokens for context when calculating the conditional likelihood of any one token" + " (provided there are block_size preceding tokens available to condition on)" + ) + }, + ) + metric: Optional[str] = field( + default="rougel", metadata={ "help": "the metric the model will be evaluated on", - "choices": ["acc", "acc_norm", "bleu", "chrf", "em", "f1", "ppl", \ - "ter", "r@1", "r@2", "mrr", "mc1", "mc2", "word_perplexity", \ - "byte_perplexity", "bits_per_byte"], + "choices": ["ppl", "perplexity", "acc", "accuracy", "nll", "neg_log_likelihood", "rl", "rouge-l", + "ROUGE-L"], + }, + ) + inference_batch_size_per_device: Optional[int] = field( + default=1, + metadata={ + "help": ( + "every device will infer {inference_batch_size_per_device}" + " samples in parallel. The inferred results will be concatenated" + " with inputs and attach a reward." + ), }, ) @@ -650,6 +750,7 @@ class BenchmarkingArguments: "evaluator": EvaluatorArguments, "inferencer": InferencerArguments, "raft_aligner": RaftAlignerArguments, + "test_rougel": TesterArguments, } diff --git a/src/lmflow/pipeline/auto_pipeline.py b/src/lmflow/pipeline/auto_pipeline.py index 0a1c3fcdb..5067600b5 100644 --- a/src/lmflow/pipeline/auto_pipeline.py +++ b/src/lmflow/pipeline/auto_pipeline.py @@ -7,6 +7,7 @@ from lmflow.pipeline.finetuner import Finetuner from lmflow.pipeline.inferencer import Inferencer from lmflow.pipeline.raft_aligner import RaftAligner +from lmflow.pipeline.test_rougel import Test_rougel PIPELINE_MAPPING = { @@ -14,6 +15,7 @@ "finetuner": Finetuner, "inferencer": Inferencer, "raft_aligner": RaftAligner, + "test_rougel": Test_rougel, } diff --git a/src/lmflow/pipeline/evaluator.py b/src/lmflow/pipeline/evaluator.py index 217355b38..fb574755a 100644 --- a/src/lmflow/pipeline/evaluator.py +++ b/src/lmflow/pipeline/evaluator.py @@ -1,15 +1,17 @@ -"""The Evaluator class simplifies the process of running evaluation on a language model provided by a HFDecoderModel instance imported from the lmflow package. The class constructor takes three dictionaries as arguments: model_args containing arguments related to the language model, data_args containing arguments related to the data used for evaluation, and evaluator_args containing other arguments for the evaluation process. +"""The Evaluator class simplifies the process of running evaluation on a language model provided by a HFDecoderModel instance imported from the LMFlow-main package. The class constructor takes three dictionaries as arguments: model_args containing arguments related to the language model, data_args containing arguments related to the data used for evaluation, and evaluator_args containing other arguments for the evaluation process. The class has two methods: create_dataloader() that loads the data from the test file, creates a data loader, and returns it with the size of the data, and evaluate(model) that generates output text given input text. It uses the create_dataloader() method to load the data, iterates over the data in mini-batches, and encodes the input text with the encode() method of the HFDecoderModel class. Then, it generates output text using the evaluate() method of the HFDecoderModel class, decodes the generated output text using the decode() method of the HFDecoderModel class, and writes the output to a file in the output directory. The method also logs some information to the console and Weights and Biases if the use_wandb argument is True. """ import os +import deepspeed import torch import wandb -import deepspeed import sys import numpy as np import datetime import json +from rouge_score import rouge_scorer +from multiprocessing import Pool # TODO: remove later from transformers import AutoConfig import torch.distributed as dist @@ -42,7 +44,11 @@ def __init__(self, model_args, data_args, evaluator_args): self.data_args = data_args self.evaluator_args = evaluator_args self.model_args = model_args - + print("--------Begin Evaluator Arguments----------") + print(f"model_args : {self.model_args}") + print(f"data_args : {self.data_args}") + print(f"evaluator_args : {self.evaluator_args}") + print("--------End Evaluator Arguments----------") # logger if(self.evaluator_args.use_wandb == True): wandb.init(project="lmflow_evaluation") @@ -50,6 +56,7 @@ def __init__(self, model_args, data_args, evaluator_args): set_random_seed(self.evaluator_args.random_seed) self.local_rank = int(os.getenv("LOCAL_RANK", "0")) self.world_size = int(os.getenv("WORLD_SIZE", "1")) + print("self.world_size是:", self.world_size) torch.cuda.set_device(self.local_rank) # NOTE: cpu-only machine will have error deepspeed.init_distributed() @@ -62,7 +69,7 @@ def __init__(self, model_args, data_args, evaluator_args): print(f"model_hidden_size = {self.model_hidden_size}") # batch size has to be divisible by world_size, but can be bigger than world_size - train_batch_size = self.evaluator_args.inference_batch_size_per_device * self.world_size + train_batch_size = 1 * self.world_size self.evaluator_args.minibatch_size = train_batch_size self.block_size = evaluator_args.evaluate_block_size # dataloader, data_size = create_dataloader(args) # load dataset @@ -73,7 +80,7 @@ def create_dataloader(self, dataset: Dataset): inputs = [ instance["input"] for instance in data_dict["instances"] ] outputs = [ instance["output"] for instance in data_dict["instances"] ] dataset_size = len(outputs) - dataset_buf = [] + dataset_buf = [] # [{input: in1, output:out1, input_idx: 1}, { ... } , ... ] for idx in range(dataset_size): dataset_buf.append({ "input": inputs[idx], @@ -81,17 +88,31 @@ def create_dataloader(self, dataset: Dataset): "input_idx": idx }) - dataloader = batchlize( + dataloader = batchlize( # 相当于每minibatch_size大小切一段,dataloader = [[{}, {}, ... ], [{}, {}, ... ], ... ] dataset_buf, - self.evaluator_args.minibatch_size, + self.evaluator_args.minibatch_size, # = self.world_size self.evaluator_args.random_shuffle ) - print(f"Successfully create dataloader with size {len(dataloader)},batch_size {self.evaluator_args.minibatch_size}.") - + print(f"Successfully create dataloader with size {len(dataloader)}.") return dataloader, dataset_size # TODO: Split for better unittest + def _calculate_rouge_l(self, predicted_answer, groundtruth, scorer: rouge_scorer.RougeScorer, answer_type=None): + case_insensitive_types = [ + "strategyqa", + "coin_flip", + "pubmedqa", + "binary_choice", + "medmcqa", + "usmle", + ] + if answer_type in case_insensitive_types: + rouge_score = scorer.score(groundtruth.lower(), predicted_answer.lower())["rougeL"].fmeasure + else: + rouge_score = scorer.score(groundtruth, predicted_answer)["rougeL"].fmeasure + return rouge_score + def _match(self, predicted_answer, groundtruth, answer_type=None): case_insensitive_types = [ @@ -109,13 +130,7 @@ def _match(self, predicted_answer, groundtruth, answer_type=None): return False - def evaluate( - self, - model, - dataset: Dataset, - metric = "accuracy", - verbose=True, - ): + def evaluate(self, model, dataset: Dataset, metric = "accuracy"): """ Perform Evaluation for a model @@ -128,126 +143,150 @@ def evaluate( """ - if metric in ["acc", "accuracy"]: - acc = self._evaluate_acc(model, dataset, verbose=verbose) - print(f"Evaluating final accuracy: {acc}") - return acc - elif metric in ["ppl", "perplexity"]: - ppl = self._evaluate_ppl(model, dataset, verbose=verbose) - print(f"Evaluating final perplexity: {ppl}") - return ppl - elif metric in ["nll", "neg_log_likelihood"]: - nll = self._evaluate_nll(model, dataset, verbose=verbose) - print(f"Evaluating final negative log likelihood: {nll}") - return nll - else: - raise NotImplementedError(f"metric {metric} is not supported") - - - def _evaluate_acc(self, model, dataset, verbose=True): - dataloader, data_size = self.create_dataloader(dataset) - - if not dist.is_initialized() or dist.get_rank() == 0: - if not os.path.exists(self.evaluator_args.output_dir): - os.makedirs(self.evaluator_args.output_dir) - output_writer = open(f"{self.evaluator_args.output_dir}/evaluation.json", "w") + if metric in ["acc", "accuracy", "rl", "rouge-l", "ROUGE-L"]: + dataloader, data_size = self.create_dataloader(dataset) # data_size = number of mini-batches - acc_list = [] - total = 0 - for batch_index, batch in enumerate(dataloader): - if batch_index * self.world_size >= self.data_args.max_eval_samples: - break - if batch_index * self.world_size >= self.data_args.max_eval_samples: - break - if self.local_rank*self.evaluator_args.inference_batch_size_per_device >= len(batch): - current_batch = batch[:self.evaluator_args.inference_batch_size_per_device] - else: - current_batch = batch[self.local_rank*self.evaluator_args.inference_batch_size_per_device:(self.local_rank+1)*self.evaluator_args.inference_batch_size_per_device] - prompt_structure = self.evaluator_args.prompt_structure - input = [prompt_structure.format(input=i['input']) for i in current_batch] - output = [i['output'] for i in current_batch] - input_idx = [i['input_idx'] for i in current_batch] - batch_input = model.encode(input, return_tensors="pt",padding=True).to(device=self.local_rank) - inputs = batch_input['input_ids'] - mask = batch_input['attention_mask'] - outputs = model.inference(inputs, max_new_tokens=100,attention_mask=mask,temperature=0.0) - text_out = model.decode(outputs, skip_special_tokens=True) - # # only return the generation, trucating the input - decoded_input = model.decode(inputs, skip_special_tokens=True,) - prompt_length = [len(i) for i in decoded_input] - text_out = [text_out[i][prompt_length[i]:] for i in range(len(text_out))] - answer_type = self.evaluator_args.answer_type - pred_answer = [] - for i in text_out: - pred_answer.append(answer_extraction( - i, + if not dist.is_initialized() or dist.get_rank() == 0: + if not os.path.exists(self.evaluator_args.output_dir): + os.makedirs(self.evaluator_args.output_dir) + output_writer = open(f"{self.evaluator_args.output_dir}/evaluation.json", "w") + + all_list = [] # list to replace both acc_list and rl_list + total = 0 + # ds_engine = deepspeed.initialize(model=model.get_model(), config_params=self.ds_config)[0] + # ds_engine.module.eval() + for batch_index, batch in enumerate(dataloader): + if batch_index * self.world_size >= self.data_args.max_eval_samples: + break + if self.local_rank >= len(batch): + current_batch = batch[0] + else: + # the batch in current process + current_batch = batch[self.local_rank] + + prompt_structure = self.evaluator_args.prompt_structure + input = prompt_structure.format(input=current_batch['input']) + output = current_batch['output'] + input_idx = current_batch['input_idx'] + + inputs = model.encode(input, return_tensors="pt").to(device=self.local_rank) + + + # with torch.no_grad(): + # outputs = ds_engine.module.generate(inputs, synced_gpus=True, pad_token_id=model.get_tokenizer().eos_token_id, min_length=5, max_length=100,temperature=0.0, do_sample=False) + outputs = model.inference(inputs, max_new_tokens=100, temperature=0.0) + text_out = model.decode(outputs[0], skip_special_tokens=True) + + # # only return the generation, truncating the input + prompt_length = len(model.decode(inputs[0], skip_special_tokens=True,)) + text_out = text_out[prompt_length:] + answer_type = self.evaluator_args.answer_type + pred_answer = answer_extraction( + text_out, answer_type=answer_type, - )) - if verbose: + ) print(f"batch_index{batch_index} rank{self.local_rank}:\n question={input}\n prediction={text_out}\n") print(f"predicted answer: {pred_answer} \n") print(f"groundtruth answer: {output} \n") - if self.local_rank >= len(batch): # for last batch, the padding examples are ignored and donot contribute to the accuracy - correct_ = 0 - total_ = 1 - total -= 1 - else: - correct_ = 0 - total_ = 0 - for i in range(len(pred_answer)): - total_ += 1 - if self._match(pred_answer[i], output[i], answer_type): - correct_ += 1 - - # collect accuracy from all gpus - all_process = torch.tensor([correct_, total_], dtype=torch.float32, device=self.local_rank) - dist.all_reduce(all_process, dist.ReduceOp.SUM, async_op=False) - correct_, total_ = all_process.tolist() - avg = correct_ / total_ - acc_list.append(avg) - total += total_ + if metric in ["acc", "accuracy"]: + if self.local_rank >= len(batch): # for last batch, the padding examples are ignored and do not contribute to the accuracy + correct_ = 0 + total_ = 0 + else: + correct_ = 0 + total_ = 1 + if self._match(pred_answer, output, answer_type): + correct_ = 1 + score = correct_ + + else: + scorer = rouge_scorer.RougeScorer(["rougeL"], + use_stemmer=False) # stemmer: stem the words to their root form + + if self.local_rank >= len( + batch): # for last batch, the padding examples are ignored and do not contribute to the ROUGE-L + rl_ = 0 + total_ = 0 + else: + rl_ = max(0, self._calculate_rouge_l(pred_answer, output, scorer, answer_type)) + total_ = 1 + score = rl_ + + # collect rouge-l from all gpus + all_process = torch.tensor([rl_, total_], dtype=torch.float32, device=self.local_rank) + if metric in ["acc", "accuracy"]: + dist.all_reduce(all_process, dist.ReduceOp.SUM, async_op=False) + else: + dist.all_reduce(all_process, dist.ReduceOp.MAX, async_op=False) # sum 还是 max好? + sum_or_max, total_ = all_process.tolist() + if metric in ["acc", "accuracy"]: + avg = sum_or_max / total_ + else: + avg = sum_or_max + all_list.append(avg) + total += total_ + + # collect predictions from all gpus + output_dict = {"question": input, + "prediction": text_out, + "pred_answer": pred_answer, + "answer": output} + all_process_list = [{}] * self.world_size + + dist.gather_object(output_dict, all_process_list if dist.get_rank() == 0 else None, dst=0) # 只收集process 0?? + if not dist.is_initialized() or dist.get_rank() == 0: + if metric in ["acc", "accuracy"]: + current_accuracy = np.mean(all_list) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "{}/ {} has been finished, current accuracy = {}".format(int(total), data_size, current_accuracy)) + + if(self.evaluator_args.use_wandb == True): + wandb.log({"Accuracy": current_accuracy}) - # collect predictions from all gpus - output_dict = {"question": input, - "prediction": text_out, - "pred_answer": pred_answer, - "answer": output} - all_process_list = [{}] * self.world_size + else: + current_rouge_l = np.mean(all_list) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "{}/ {} has been finished, current ROUGE-L = {}".format(int(total), data_size, current_rouge_l)) - dist.gather_object(output_dict, all_process_list if dist.get_rank() == 0 else None, dst=0) - if not dist.is_initialized() or dist.get_rank() == 0: - current_accuracy = np.mean(acc_list) - print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "{}/ {} has been finished, current accuracy = {}".format(int(total), data_size, current_accuracy)) + if (self.evaluator_args.use_wandb == True): + wandb.log({"rougeL_fmeasure": current_rouge_l}) - if(self.evaluator_args.use_wandb == True): - wandb.log({"Accuracy": current_accuracy}) + for index, output in enumerate(all_process_list): + output_json = json.dumps(output) + output_writer.write(output_json + '\n') - for index, output in enumerate(all_process_list): - output_json = json.dumps(output) - output_writer.write(output_json + '\n') - if not dist.is_initialized() or dist.get_rank() == 0: - current_accuracy = np.mean(acc_list) - print("Final accuracy = ", current_accuracy) - output_writer.close() - return current_accuracy + if not dist.is_initialized() or dist.get_rank() == 0: # 此刻已经处理完dataset + if metric in ["acc", "accuracy"]: + current_accuracy = np.mean(all_list) + print("Final accuracy = ", current_accuracy) + else: + current_rouge_l = np.mean(all_list) + print("Final ROUGE-L = ", current_rouge_l) + output_writer.close() + elif metric in ["ppl", "perplexity"]: + ppl = self._evaluate_ppl(model, dataset) + print(f"Evaluating final ppl: {ppl}") + elif metric in ["nll", "neg_log_likelihood"]: + neg_log_likelihood = self._evaluate_neg_log_likelihood(model, dataset) + print(f"Evaluating final negative log likelihood: {neg_log_likelihood}") + else: + raise NotImplementedError(f"{metric} is not implemented or not match with our defined metrics") - def _evaluate_ppl(self, model, dataset: Dataset, verbose=True): + + def _evaluate_ppl(self, model, dataset: Dataset): data_dict = dataset.to_dict() if data_dict['type'] == 'text2text': raise NotImplementedError("ppl evaluation is currently not supported for text2text dataset, please use text_only dataset.") texts = [ instance["text"] for instance in data_dict["instances"] ] - encodings = model.get_tokenizer()("\n\n".join(texts), return_tensors="pt") + encodings = model.get_tokenizer()("\n\n".join(texts), return_tensors="pt") # seems no need for rouge-L # Define some constant try: max_length = min(model.get_backend_model().config.n_positions, model.get_max_length()) except: max_length = min(1024, model.get_max_length()) - if verbose: - print(f"The maximum sequence length : {max_length}") + print(f"The maximum sequence length : {max_length}") seq_len = encodings.input_ids.size(1) nlls = [] @@ -268,20 +307,99 @@ def _evaluate_ppl(self, model, dataset: Dataset, verbose=True): nlls.append(neg_log_likelihood) prev_end_loc = end_loc - if verbose: - print(f"Evaluating PPL: {int(begin_loc/self.block_size) + 1} / {int(seq_len/self.block_size)} Complete, current ppl : {torch.exp(torch.stack(nlls).mean())}") + print(f"Evaluating PPL: {int(begin_loc/self.block_size) + 1} / {int(seq_len/self.block_size)} Complete, current ppl : {torch.exp(torch.stack(nlls).mean())}") if end_loc == seq_len: break ppl = torch.exp(torch.stack(nlls).mean()) return ppl + # Added for ROUGE-L evaluation + def _evaluate_rouge_l(self, model, dataset: Dataset): # alpaca dataset: [{instruction: "...", input: "...", output: "...", text: "..."}, ....] + dataloader, data_size = self.create_dataloader(dataset) - def _evaluate_nll( - self, - model, - dataset: Dataset, - verbose=True, - ): + if not dist.is_initialized() or dist.get_rank() == 0: + if not os.path.exists(self.evaluator_args.output_dir): + os.makedirs(self.evaluator_args.output_dir) + output_writer = open(f"{self.evaluator_args.output_dir}/evaluation.json", "w") + + rl_list = [] + total = 0 + for batch_index, batch in enumerate(dataloader): + if batch_index * self.world_size >= self.data_args.max_eval_samples: + break + if self.local_rank >= len(batch): + current_batch = batch[0] + else: + # the batch in current process + current_batch = batch[self.local_rank] + + prompt_structure = self.evaluator_args.prompt_structure + input = prompt_structure.format(input=current_batch['input']) + output = current_batch['output'] + input_idx = current_batch['input_idx'] + + inputs = model.encode(input, return_tensors="pt").to(device=self.local_rank) + outputs = model.inference(inputs, max_new_tokens=100, temperature=0.0) + text_out = model.decode(outputs[0], skip_special_tokens=True) + + # only return the generation, truncating the input + prompt_length = len(model.decode(inputs[0], skip_special_tokens=True, )) + text_out = text_out[prompt_length:] + answer_type = self.evaluator_args.answer_type + pred_answer = answer_extraction( + text_out, + answer_type=answer_type, + ) + print(f"batch_index{batch_index} rank{self.local_rank}:\n question={input}\n prediction={text_out}\n") + print(f"predicted answer: {pred_answer} \n") + print(f"groundtruth answer: {output} \n") + + scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=False) # stemmer: stem the words to their root form + + if self.local_rank >= len( + batch): # for last batch, the padding examples are ignored and do not contribute to the ROUGE-L + rl_ = 0 + total_ = 0 + else: + rl_ = max(0, self._calculate_rouge_l(pred_answer, output, scorer, answer_type)) + total_ = 1 + + # collect rouge-l from all gpus + all_process = torch.tensor([rl_, total_], dtype=torch.float32, device=self.local_rank) + dist.all_reduce(all_process, dist.ReduceOp.MAX, async_op=False) # sum 还是 max好? + rl_sum, total_ = all_process.tolist() + avg = rl_sum / total_ + rl_list.append(avg) + total += total_ + + # collect predictions from all gpus + output_dict = {"question": input, + "prediction": text_out, + "pred_answer": pred_answer, + "answer": output} + all_process_list = [{}] * self.world_size + + dist.gather_object(output_dict, all_process_list if dist.get_rank() == 0 else None, dst=0) # 只收集process 0?? + if not dist.is_initialized() or dist.get_rank() == 0: + current_rouge_l = np.mean(rl_list) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "{}/ {} has been finished, current ROUGE-L = {}".format(int(total), data_size, current_rouge_l)) + + if (self.evaluator_args.use_wandb == True): + wandb.log({"rougeL_fmeasure": current_rouge_l}) + + for index, output in enumerate(all_process_list): + output_json = json.dumps(output) + output_writer.write(output_json + '\n') + + if not dist.is_initialized() or dist.get_rank() == 0: # 此刻已经处理完dataset + current_rouge_l = np.mean(rl_list) + print("Final ROUGE-L = ", current_rouge_l) + output_writer.close() + return current_rouge_l + + + def _evaluate_neg_log_likelihood(self, model, dataset: Dataset): """ Evaluates negative log likelihood of the model over a dataset. @@ -296,29 +414,14 @@ def _evaluate_nll( A float which represents the negative log likelihood. """ data_dict = dataset.to_dict() - - # Handles prompt structure - if dataset.get_type() == "text2text": - prompt = self.evaluator_args.prompt_structure - data_dict["instances"] = [ - { - "input": prompt.format(input=instance["input"]), - "output": instance["output"] - } - for instance in data_dict["instances"] - ] - - dataset = dataset.from_dict(data_dict) - tokenized_dataset = model.tokenize(dataset, add_special_tokens=False) - tokenized_dataset = tokenized_dataset.get_backend_dataset() - encoding_list = [ - { - "input_ids": torch.tensor([input_ids]), - "labels": torch.tensor([labels]), - } - for input_ids, labels in zip(tokenized_dataset["input_ids"], - tokenized_dataset["labels"]) - ] + if data_dict['type'] == 'text2text': + raise NotImplementedError( + "negative log likelihood evaluation is currently not supported" + " for text2text dataset, please use text_only dataset." + ) + texts = [ instance["text"] for instance in data_dict["instances"] ] + encoding_list = [ model.get_tokenizer()(text, return_tensors="pt") + for text in texts ] # Gets context window length try: @@ -328,10 +431,9 @@ def _evaluate_nll( max_length = min(1024, model.get_max_length()) nlls = [] - full_nlls = [] - num_samples = len(encoding_list) + num_samples = len(texts) for sample_idx, encodings in enumerate(encoding_list): - seq_len = encodings["input_ids"].size(1) + seq_len = encodings.input_ids.size(1) prev_end_loc = 0 for begin_loc in range(0, seq_len, self.block_size): @@ -339,68 +441,28 @@ def _evaluate_nll( # may be different from block_size on last loop trg_len = end_loc - prev_end_loc - input_ids = encodings["input_ids"][:, begin_loc:end_loc] + input_ids = encodings.input_ids[:, begin_loc:end_loc] input_ids = input_ids.to(device=self.local_rank) - labels = encodings["labels"][:, begin_loc:end_loc] - target_ids = labels.clone() - full_target_ids = input_ids.clone() - - def get_nll(label_ids, nll_list): - label_ids[:, :-trg_len] = -100 - label_ids = label_ids.to(device=self.local_rank) - - # Valid labels are from 0 to `vocab_size` - num_valid_labels = torch.count_nonzero(label_ids >= 0) - if label_ids[0, 0] != -100: - num_valid_labels -= 1 - - if not torch.all(label_ids == -100): - with torch.no_grad(): - outputs = model.get_backend_model()( - input_ids, labels=label_ids - ) - # loss is calculated using CrossEntropyLoss which - # sums over valid labels N.B. the model only - # calculates loss over trg_len - 1 labels, because - # it internally shifts the labels to the left by 1. - neg_log_likelihood = outputs.loss * num_valid_labels - else: - neg_log_likelihood = torch.zeros([]).to( - device=self.local_rank - ) - - nll_list.append(neg_log_likelihood) + target_ids = input_ids.clone() + target_ids[:, :-trg_len] = -100 - get_nll(target_ids, nlls) - get_nll(full_target_ids, full_nlls) - - current_output_nll = torch.stack(nlls).sum() / (sample_idx + 1) - current_full_nll = torch.stack(full_nlls).sum() / (sample_idx + 1) + with torch.no_grad(): + outputs = model.get_backend_model()(input_ids, + labels=target_ids) + # loss is calculated using CrossEntropyLoss which averages + # over valid labels N.B. the model only calculates loss + # over trg_len - 1 labels, because it internally shifts the + # labels to the left by 1. + neg_log_likelihood = outputs.loss + nlls.append(neg_log_likelihood) prev_end_loc = end_loc - if verbose: - if dataset.get_type() == "text_only": - print( - f"Evaluating negative log likelihood:" - f" {sample_idx + 1} / {num_samples} Complete," - f" current nll: {current_full_nll}" - ) - elif dataset.get_type() == "text2text": - print( - f"Evaluating negative log likelihood:" - f" {sample_idx + 1} / {num_samples} Complete," - f" current full nll / input nll / output nll:" - f" {current_full_nll} /" - f" {current_full_nll - current_output_nll} /" - f" {current_output_nll}" - ) - else: - raise NotImplementedError( - "f{dataset.get_type()} typed datasets are not" - " supported" - ) - + print( + f"Evaluating negative log likelihood:" + f" {sample_idx + 1} / {num_samples} Complete, current nll:" + f" {torch.stack(nlls).sum() / (sample_idx + 1)}" + ) if end_loc == seq_len: break diff --git a/src/lmflow/pipeline/evaluator_tmp.py b/src/lmflow/pipeline/evaluator_tmp.py new file mode 100644 index 000000000..b8b2e0a78 --- /dev/null +++ b/src/lmflow/pipeline/evaluator_tmp.py @@ -0,0 +1,437 @@ +"""The Evaluator class simplifies the process of running evaluation on a language model provided by a HFDecoderModel instance imported from the LMFlow-main package. The class constructor takes three dictionaries as arguments: model_args containing arguments related to the language model, data_args containing arguments related to the data used for evaluation, and evaluator_args containing other arguments for the evaluation process. + +The class has two methods: create_dataloader() that loads the data from the test file, creates a data loader, and returns it with the size of the data, and evaluate(model) that generates output text given input text. It uses the create_dataloader() method to load the data, iterates over the data in mini-batches, and encodes the input text with the encode() method of the HFDecoderModel class. Then, it generates output text using the evaluate() method of the HFDecoderModel class, decodes the generated output text using the decode() method of the HFDecoderModel class, and writes the output to a file in the output directory. The method also logs some information to the console and Weights and Biases if the use_wandb argument is True. +""" +import os +import deepspeed +import torch +import wandb +import sys +import numpy as np +import datetime +import json +from rouge_score import rouge_scorer +from multiprocessing import Pool +# TODO: remove later +from transformers import AutoConfig +import torch.distributed as dist + +from lmflow.datasets.dataset import Dataset +from lmflow.pipeline.base_pipeline import BasePipeline +from lmflow.models.hf_decoder_model import HFDecoderModel +from lmflow.utils.data_utils import set_random_seed, batchlize, answer_extraction +os.environ["TOKENIZERS_PARALLELISM"] = "false" # To avoid warnings about parallelism in tokenizers + +class Evaluator(BasePipeline): + """ + Initializes the `Evaluator` class with given arguments. + + Parameters + ------------ + model_args : ModelArguments object. + Contains the arguments required to load the model. + + data_args : DatasetArguments object. + Contains the arguments required to load the dataset. + + evaluator_args : EvaluatorArguments object. + Contains the arguments required to perform evaluation. + + + """ + def __init__(self, model_args, data_args, evaluator_args): + # our method + self.data_args = data_args + self.evaluator_args = evaluator_args + self.model_args = model_args + print("--------Begin Evaluator Arguments----------") + print(f"model_args : {self.model_args}") + print(f"data_args : {self.data_args}") + print(f"evaluator_args : {self.evaluator_args}") + print("--------End Evaluator Arguments----------") + # logger + if(self.evaluator_args.use_wandb == True): + wandb.init(project="lmflow_evaluation") + # random seed + set_random_seed(self.evaluator_args.random_seed) + self.local_rank = int(os.getenv("LOCAL_RANK", "0")) + self.world_size = int(os.getenv("WORLD_SIZE", "1")) + torch.cuda.set_device(self.local_rank) # NOTE: cpu-only machine will have error + deepspeed.init_distributed() + + self.config = AutoConfig.from_pretrained(model_args.model_name_or_path) + try: + self.model_hidden_size = self.config.hidden_size + except: + print("Error in setting hidden size, use the default size 1024") + self.model_hidden_size = 1024 # gpt2 seems do not have hidden_size in config + + print(f"model_hidden_size = {self.model_hidden_size}") + # batch size has to be divisible by world_size, but can be bigger than world_size + train_batch_size = 1 * self.world_size + self.evaluator_args.minibatch_size = train_batch_size + self.block_size = evaluator_args.evaluate_block_size + # dataloader, data_size = create_dataloader(args) # load dataset + + + def create_dataloader(self, dataset: Dataset): + data_dict = dataset.to_dict() + inputs = [ instance["input"] for instance in data_dict["instances"] ] + outputs = [ instance["output"] for instance in data_dict["instances"] ] + dataset_size = len(outputs) + dataset_buf = [] + for idx in range(dataset_size): + dataset_buf.append({ + "input": inputs[idx], + "output": outputs[idx], + "input_idx": idx + }) + + dataloader = batchlize( + dataset_buf, + self.evaluator_args.minibatch_size, + self.evaluator_args.random_shuffle + ) + print(f"Successfully create dataloader with size {len(dataloader)}.") + return dataloader, dataset_size + + + # TODO: Split for better unittest + def _calculate_rouge_l(self, predicted_answer, groundtruth, scorer: rouge_scorer.RougeScorer, answer_type=None): + case_insensitive_types = [ + "strategyqa", + "coin_flip", + "pubmedqa", + "binary_choice", + "medmcqa", + "usmle", + ] + if answer_type in case_insensitive_types: + rouge_score = scorer.score(groundtruth.lower(), predicted_answer.lower())["rougeL"].fmeasure + else: + rouge_score = scorer.score(groundtruth, predicted_answer)["rougeL"].fmeasure + return rouge_score + + + def _match(self, predicted_answer, groundtruth, answer_type=None): + case_insensitive_types = [ + "strategyqa", + "coin_flip", + "pubmedqa", + "binary_choice", + "medmcqa", + "usmle", + ] + if answer_type in case_insensitive_types: + return predicted_answer.lower() == groundtruth.lower() + else: + return predicted_answer == groundtruth + return False + + + def evaluate(self, model, dataset: Dataset, metric = "accuracy"): + """ + Perform Evaluation for a model + + Parameters + ------------ + model : TunableModel object. + TunableModel to perform inference + + dataset : Dataset object. + + + """ + if metric in ["acc", "accuracy"]: + dataloader, data_size = self.create_dataloader(dataset) # data_size = number of mini-batches + + if not dist.is_initialized() or dist.get_rank() == 0: + if not os.path.exists(self.evaluator_args.output_dir): + os.makedirs(self.evaluator_args.output_dir) + output_writer = open(f"{self.evaluator_args.output_dir}/evaluation.json", "w") + + acc_list = [] + total = 0 + # ds_engine = deepspeed.initialize(model=model.get_model(), config_params=self.ds_config)[0] + # ds_engine.module.eval() + for batch_index, batch in enumerate(dataloader): + if batch_index * self.world_size >= self.data_args.max_eval_samples: + break + if self.local_rank >= len(batch): + current_batch = batch[0] + else: + # the batch in current process + current_batch = batch[self.local_rank] + + prompt_structure = self.evaluator_args.prompt_structure + input = prompt_structure.format(input=current_batch['input']) + output = current_batch['output'] + input_idx = current_batch['input_idx'] + + inputs = model.encode(input, return_tensors="pt").to(device=self.local_rank) + + + # with torch.no_grad(): + # outputs = ds_engine.module.generate(inputs, synced_gpus=True, pad_token_id=model.get_tokenizer().eos_token_id, min_length=5, max_length=100,temperature=0.0, do_sample=False) + outputs = model.inference(inputs, max_new_tokens=100, temperature=0.0) + text_out = model.decode(outputs[0], skip_special_tokens=True) + + # # only return the generation, truncating the input + prompt_length = len(model.decode(inputs[0], skip_special_tokens=True,)) + text_out = text_out[prompt_length:] + answer_type = self.evaluator_args.answer_type + pred_answer = answer_extraction( + text_out, + answer_type=answer_type, + ) + print(f"batch_index{batch_index} rank{self.local_rank}:\n question={input}\n prediction={text_out}\n") + print(f"predicted answer: {pred_answer} \n") + print(f"groundtruth answer: {output} \n") + + if self.local_rank >= len(batch): # for last batch, the padding examples are ignored and do not contribute to the accuracy + correct_ = 0 + total_ = 0 + else: + correct_ = 0 + total_ = 1 + if self._match(pred_answer, output, answer_type): + correct_ = 1 + + # collect accuracy from all gpus + all_process = torch.tensor([correct_, total_], dtype=torch.float32, device=self.local_rank) + dist.all_reduce(all_process, dist.ReduceOp.SUM, async_op=False) + correct_, total_ = all_process.tolist() + avg = correct_ / total_ + acc_list.append(avg) + total += total_ + + # collect predictions from all gpus + output_dict = {"question": input, + "prediction": text_out, + "pred_answer": pred_answer, + "answer": output} + all_process_list = [{}] * self.world_size + + dist.gather_object(output_dict, all_process_list if dist.get_rank() == 0 else None, dst=0) # 只收集process 0?? + if not dist.is_initialized() or dist.get_rank() == 0: + current_accuracy = np.mean(acc_list) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "{}/ {} has been finished, current accuracy = {}".format(int(total), data_size, current_accuracy)) + + if(self.evaluator_args.use_wandb == True): + wandb.log({"Accuracy": current_accuracy}) + + for index, output in enumerate(all_process_list): + output_json = json.dumps(output) + output_writer.write(output_json + '\n') + + if not dist.is_initialized() or dist.get_rank() == 0: # 此刻已经处理完dataset + current_accuracy = np.mean(acc_list) + print("Final accuracy = ", current_accuracy) + output_writer.close() + elif metric in ["ppl", "perplexity"]: + ppl = self._evaluate_ppl(model, dataset) + print(f"Evaluating final ppl: {ppl}") + elif metric in ["nll", "neg_log_likelihood"]: + neg_log_likelihood = self._evaluate_neg_log_likelihood(model, dataset) + print(f"Evaluating final negative log likelihood: {neg_log_likelihood}") + elif metric in ["rl", "rouge-l", "ROUGE-L"]: + rl = self._evaluate_rouge_l(model, dataset) + print(f"Evaluating final ROUGE-L: {rl}") + else: + raise NotImplementedError(f"{metric} is not implemented or not match with our defined metrics") + + + def _evaluate_ppl(self, model, dataset: Dataset): + data_dict = dataset.to_dict() + if data_dict['type'] == 'text2text': + raise NotImplementedError("ppl evaluation is currently not supported for text2text dataset, please use text_only dataset.") + texts = [ instance["text"] for instance in data_dict["instances"] ] + encodings = model.get_tokenizer()("\n\n".join(texts), return_tensors="pt") # seems no need for rouge-L + # Define some constant + try: + max_length = min(model.get_backend_model().config.n_positions, model.get_max_length()) + except: + max_length = min(1024, model.get_max_length()) + + print(f"The maximum sequence length : {max_length}") + seq_len = encodings.input_ids.size(1) + + nlls = [] + prev_end_loc = 0 + for begin_loc in range(0, seq_len, self.block_size): + end_loc = min(begin_loc + max_length, seq_len) + trg_len = end_loc - prev_end_loc # may be different from block_size on last loop + input_ids = encodings.input_ids[:, begin_loc:end_loc].to(device=self.local_rank) + target_ids = input_ids.clone() + target_ids[:, :-trg_len] = -100 + + with torch.no_grad(): + outputs = model.get_backend_model()(input_ids, labels=target_ids) + # loss is calculated using CrossEntropyLoss which averages over valid labels + # N.B. the model only calculates loss over trg_len - 1 labels, because it internally shifts the labels + # to the left by 1. + neg_log_likelihood = outputs.loss + + nlls.append(neg_log_likelihood) + prev_end_loc = end_loc + print(f"Evaluating PPL: {int(begin_loc/self.block_size) + 1} / {int(seq_len/self.block_size)} Complete, current ppl : {torch.exp(torch.stack(nlls).mean())}") + if end_loc == seq_len: + break + ppl = torch.exp(torch.stack(nlls).mean()) + return ppl + + # Added for ROUGE-L evaluation + def _evaluate_rouge_l(self, model, dataset: Dataset): # alpaca dataset: [{instruction: "...", input: "...", output: "...", text: "..."}, ....] + dataloader, data_size = self.create_dataloader(dataset) + + if not dist.is_initialized() or dist.get_rank() == 0: + if not os.path.exists(self.evaluator_args.output_dir): + os.makedirs(self.evaluator_args.output_dir) + output_writer = open(f"{self.evaluator_args.output_dir}/evaluation.json", "w") + + rl_list = [] + total = 0 + for batch_index, batch in enumerate(dataloader): + if batch_index * self.world_size >= self.data_args.max_eval_samples: + break + if self.local_rank >= len(batch): + current_batch = batch[0] + else: + # the batch in current process + current_batch = batch[self.local_rank] + + prompt_structure = self.evaluator_args.prompt_structure + input = prompt_structure.format(input=current_batch['input']) + output = current_batch['output'] + input_idx = current_batch['input_idx'] + + inputs = model.encode(input, return_tensors="pt").to(device=self.local_rank) + outputs = model.inference(inputs, max_new_tokens=100, temperature=0.0) + text_out = model.decode(outputs[0], skip_special_tokens=True) + + # only return the generation, truncating the input + prompt_length = len(model.decode(inputs[0], skip_special_tokens=True, )) + text_out = text_out[prompt_length:] + answer_type = self.evaluator_args.answer_type + pred_answer = answer_extraction( + text_out, + answer_type=answer_type, + ) + print(f"batch_index{batch_index} rank{self.local_rank}:\n question={input}\n prediction={text_out}\n") + print(f"predicted answer: {pred_answer} \n") + print(f"groundtruth answer: {output} \n") + + scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=False) # stemmer: stem the words to their root form + + if self.local_rank >= len( + batch): # for last batch, the padding examples are ignored and do not contribute to the ROUGE-L + rl_ = 0 + total_ = 0 + else: + rl_ = max(0, self._calculate_rouge_l(pred_answer, output, scorer, answer_type)) + total_ = 1 + + # collect rouge-l from all gpus + all_process = torch.tensor([rl_, total_], dtype=torch.float32, device=self.local_rank) + dist.all_reduce(all_process, dist.ReduceOp.SUM, async_op=False) + rl_sum, total_ = all_process.tolist() + avg = rl_sum / total_ + rl_list.append(avg) + total += total_ + + # collect predictions from all gpus + output_dict = {"question": input, + "prediction": text_out, + "pred_answer": pred_answer, + "answer": output} + all_process_list = [{}] * self.world_size + + dist.gather_object(output_dict, all_process_list if dist.get_rank() == 0 else None, dst=0) # 只收集process 0?? + if not dist.is_initialized() or dist.get_rank() == 0: + current_rouge_l = np.mean(rl_list) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "{}/ {} has been finished, current ROUGE-L = {}".format(int(total), data_size, current_rouge_l)) + + if (self.evaluator_args.use_wandb == True): + wandb.log({"rougeL_fmeasure": current_rouge_l}) + + for index, output in enumerate(all_process_list): + output_json = json.dumps(output) + output_writer.write(output_json + '\n') + + if not dist.is_initialized() or dist.get_rank() == 0: # 此刻已经处理完dataset + current_rouge_l = np.mean(rl_list) + print("Final ROUGE-L = ", current_rouge_l) + output_writer.close() + return current_rouge_l + + + def _evaluate_neg_log_likelihood(self, model, dataset: Dataset): + """ + Evaluates negative log likelihood of the model over a dataset. + + NLL = -1/N sum_{i=1}^N sum_{j=1}^|w_i| ln(p(w_{i,j}|context_window)), + + where N is the number of data samples, w_{i,j} is the j-th token in + i-th sample. Here "context_window" = p(w_{i,start}, w_{i,start+1}, ..., + p_{i,j-1} with start = max(0, j - window_length + 1). "window_length" + is normally the maximum length accepted by the model. + + Returns: + A float which represents the negative log likelihood. + """ + data_dict = dataset.to_dict() + if data_dict['type'] == 'text2text': + raise NotImplementedError( + "negative log likelihood evaluation is currently not supported" + " for text2text dataset, please use text_only dataset." + ) + texts = [ instance["text"] for instance in data_dict["instances"] ] + encoding_list = [ model.get_tokenizer()(text, return_tensors="pt") + for text in texts ] + + # Gets context window length + try: + max_length = min(model.get_backend_model().config.n_positions, + model.get_max_length()) + except: + max_length = min(1024, model.get_max_length()) + + nlls = [] + num_samples = len(texts) + for sample_idx, encodings in enumerate(encoding_list): + seq_len = encodings.input_ids.size(1) + + prev_end_loc = 0 + for begin_loc in range(0, seq_len, self.block_size): + end_loc = min(begin_loc + max_length, seq_len) + + # may be different from block_size on last loop + trg_len = end_loc - prev_end_loc + input_ids = encodings.input_ids[:, begin_loc:end_loc] + input_ids = input_ids.to(device=self.local_rank) + + target_ids = input_ids.clone() + target_ids[:, :-trg_len] = -100 + + with torch.no_grad(): + outputs = model.get_backend_model()(input_ids, + labels=target_ids) + # loss is calculated using CrossEntropyLoss which averages + # over valid labels N.B. the model only calculates loss + # over trg_len - 1 labels, because it internally shifts the + # labels to the left by 1. + neg_log_likelihood = outputs.loss + + nlls.append(neg_log_likelihood) + prev_end_loc = end_loc + print( + f"Evaluating negative log likelihood:" + f" {sample_idx + 1} / {num_samples} Complete, current nll:" + f" {torch.stack(nlls).sum() / (sample_idx + 1)}" + ) + if end_loc == seq_len: + break + + mean_nll = torch.stack(nlls).sum() / num_samples + return mean_nll diff --git a/src/lmflow/pipeline/test_rougel.py b/src/lmflow/pipeline/test_rougel.py new file mode 100644 index 000000000..e01f57085 --- /dev/null +++ b/src/lmflow/pipeline/test_rougel.py @@ -0,0 +1,292 @@ +import os +import deepspeed +import torch +import wandb +import sys +import numpy as np +import datetime +import json +from rouge_score import rouge_scorer +from multiprocessing import Pool +from functools import partial +from transformers import AutoConfig +# from lmflow.pipeline.auto_pipeline import AutoPipeline + +import torch.distributed as dist +from transformers import HfArgumentParser +from lmflow.datasets.dataset import Dataset +from lmflow.pipeline.base_pipeline import BasePipeline +from lmflow.utils.data_utils import set_random_seed, batchlize, answer_extraction, load_data +from lmflow.models.auto_model import AutoModel +from lmflow.args import ModelArguments, DatasetArguments, AutoArguments + +#copied from evaluator.py +class Test_rougel(BasePipeline): + """ + Initializes the `Test_rougel` class with given arguments. + + Parameters + ------------ + model_args : ModelArguments object. + Contains the arguments required to load the model. + + data_args : DatasetArguments object. + Contains the arguments required to load the dataset. + + evaluator_args : EvaluatorArguments object. + Contains the arguments required to perform evaluation. + + + """ + + def __init__(self, model_args, data_args, evaluator_args): + # our method + self.data_args = data_args + self.evaluator_args = evaluator_args + self.model_args = model_args + print("--------Begin Evaluator Arguments----------") + print(f"model_args : {self.model_args}") + print(f"data_args : {self.data_args}") + print(f"evaluator_args : {self.evaluator_args}") + print("--------End Evaluator Arguments----------") + # logger + if (self.evaluator_args.use_wandb == True): + wandb.init(project="lmflow_evaluation") + # random seed + set_random_seed(self.evaluator_args.random_seed) + self.local_rank = int(os.getenv("LOCAL_RANK", "0")) + self.world_size = int(os.getenv("WORLD_SIZE", "1")) + print("\nself.world_size是:", self.world_size, "\n") + torch.cuda.set_device(self.local_rank) # NOTE: cpu-only machine will have error + deepspeed.init_distributed() + + self.config = AutoConfig.from_pretrained(model_args.model_name_or_path) + try: + self.model_hidden_size = self.config.hidden_size + except: + print("Error in setting hidden size, use the default size 1024") + self.model_hidden_size = 1024 # gpt2 seems do not have hidden_size in config + + print(f"model_hidden_size = {self.model_hidden_size}") + # batch size has to be divisible by world_size, but can be bigger than world_size + train_batch_size = 1 * self.world_size + self.evaluator_args.minibatch_size = train_batch_size + self.block_size = evaluator_args.evaluate_block_size + # dataloader, data_size = create_dataloader(args) # load dataset + + # First use the method in self-instruct to get the ROUGE-L scores for the dataset, then use the method in LMFlow and compare the two scores, + # The metric is tested to be valid if all scores are the same. + def get_rougel_score_list(self, predicted_data_path: str): + scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=False) + + + with open(predicted_data_path, encoding='utf-8') as f: + contents = f.read() + objects = contents.strip().split('\n') + json_array = '[' + ','.join(objects) + ']' + json_data = json.loads(json_array) + + pred_answers = [instance["pred_answer"] for instance in json_data] + answers = [instance["answer"] for instance in json_data] + dataset_size = len(answers) + + pred_dataset_buf = [] + for idx in range(dataset_size): + pred_dataset_buf.append({ + "input": pred_answers[idx], + "output": answers[idx], + "idx": idx + }) + + dataloader = batchlize( + pred_dataset_buf, + self.evaluator_args.minibatch_size, # = self.world_size + self.evaluator_args.random_shuffle + ) + print(f"Successfully create dataloader of predictions and answers with size {len(dataloader)}.") + + score_list = [] # store the maximum ROUGE-L score in each batch + + for batch in dataloader: + input_ = [data["input"] for data in batch] + output_ = [data["output"] for data in batch] + with Pool(4) as p: # 4 processes + scores = [] + for idx in range(len(input_)): + rouge_scores = p.map(partial(scorer.score, input_[idx]), [output_[idx]]) + rouge_scores = [score["rougeL"].fmeasure for score in rouge_scores] + max_rl_score = max(rouge_scores) + scores.append(max_rl_score) + score_list.append(max(scores)) + + return score_list + + + def create_dataloader(self, dataset: Dataset): + data_dict = dataset.to_dict() + inputs = [instance["input"] for instance in data_dict["instances"]] + outputs = [instance["output"] for instance in data_dict["instances"]] + dataset_size = len(outputs) + dataset_buf = [] + for idx in range(dataset_size): + dataset_buf.append({ + "input": inputs[idx], + "output": outputs[idx], + "input_idx": idx + }) + if idx == 12: # to be removed later + break + + dataloader = batchlize( + dataset_buf, + self.evaluator_args.minibatch_size, # = self.world_size + self.evaluator_args.random_shuffle + ) + print(f"Successfully create dataloader with size {len(dataloader)}.") + return dataloader, dataset_size + + # TODO: Split for better unittest + def _calculate_rouge_l(self, predicted_answer, groundtruth, scorer: rouge_scorer.RougeScorer, answer_type=None): + case_insensitive_types = [ + "strategyqa", + "coin_flip", + "pubmedqa", + "binary_choice", + "medmcqa", + "usmle", + ] + if answer_type in case_insensitive_types: + rouge_score = scorer.score(groundtruth.lower(), predicted_answer.lower())["rougeL"].fmeasure + else: + rouge_score = scorer.score(groundtruth, predicted_answer)["rougeL"].fmeasure + return rouge_score + + def evaluate(self, model, dataset: Dataset, metric="rougel"): + """ + Perform Evaluation for a model + + Parameters + ------------ + model : TunableModel object. + TunableModel to perform inference + + dataset : Dataset object. + + + """ + if metric in ["rl", "rouge-l", "ROUGE-L"]: + dataloader, data_size = self.create_dataloader(dataset) # data_size = number of mini-batches + + if not dist.is_initialized() or dist.get_rank() == 0: + if not os.path.exists(self.evaluator_args.output_dir): + os.makedirs(self.evaluator_args.output_dir) + output_writer = open(f"{self.evaluator_args.output_dir}/evaluation.json", "w") # ./output_dir/evaluation.json + + pred_score_list = [] # list to record the ROUGE-L scores of all batches from LMFlow method + total = 0 + # ds_engine = deepspeed.initialize(model=model.get_model(), config_params=self.ds_config)[0] + # ds_engine.module.eval() + for batch_index, batch in enumerate(dataloader): + if batch_index * self.world_size >= self.data_args.max_eval_samples: + break + if self.local_rank >= len(batch): + current_batch = batch[0] + else: + # the batch in current process + current_batch = batch[self.local_rank] + + prompt_structure = self.evaluator_args.prompt_structure + input = prompt_structure.format(input=current_batch['input']) + output = current_batch['output'] + input_idx = current_batch['input_idx'] + + inputs = model.encode(input, return_tensors="pt").to(device=self.local_rank) + + # with torch.no_grad(): + # outputs = ds_engine.module.generate(inputs, synced_gpus=True, pad_token_id=model.get_tokenizer().eos_token_id, min_length=5, max_length=100,temperature=0.0, do_sample=False) + outputs = model.inference(inputs, max_new_tokens=100, temperature=0.0) + text_out = model.decode(outputs[0], skip_special_tokens=True) + + # # only return the generation, truncating the input + prompt_length = len(model.decode(inputs[0], skip_special_tokens=True, )) + text_out = text_out[prompt_length:] + answer_type = self.evaluator_args.answer_type + pred_answer = answer_extraction( + text_out, + answer_type=answer_type, + ) + print( + f"batch_index{batch_index} rank{self.local_rank}:\n question={input}\n prediction={text_out}\n") + print(f"predicted answer: {pred_answer} \n") + print(f"groundtruth answer: {output} \n") + + + scorer = rouge_scorer.RougeScorer(["rougeL"], + use_stemmer=False) # stemmer: stem the words to their root form + + if self.local_rank >= len( + batch): # for last batch, the padding examples are ignored and do not contribute to the ROUGE-L + rl_ = 0 + total_ = 0 + else: + rl_ = max(0, self._calculate_rouge_l(pred_answer, output, scorer, answer_type)) + total_ = 1 + score = rl_ + + # collect rouge-l from all gpus + all_process = torch.tensor([rl_, total_], dtype=torch.float32, device=self.local_rank) + dist.all_reduce(all_process, dist.ReduceOp.MAX, async_op=False) + max_, total_ = all_process.tolist() + print("max_: ", max_) + print("total_: ", total_) + # avg = max_ / total_ + avg = max_ + pred_score_list.append(avg) + total += total_ + + # collect predictions from all gpus + output_dict = {"question": input, + "prediction": text_out, + "pred_answer": pred_answer, + "answer": output} + all_process_list = [{}] * self.world_size + + dist.gather_object(output_dict, all_process_list if dist.get_rank() == 0 else None, + dst=0) + print("all_process_list: ", all_process_list) + if not dist.is_initialized() or dist.get_rank() == 0: + current_rouge_l = np.mean(pred_score_list) + print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "{}/ {} has been finished, current ROUGE-L = {}".format(int(total), data_size, + current_rouge_l)) + + if (self.evaluator_args.use_wandb == True): + wandb.log({"rougeL_fmeasure": current_rouge_l}) + + for index, output in enumerate(all_process_list): + output_json = json.dumps(output) + output_writer.write(output_json + '\n') + + if not dist.is_initialized() or dist.get_rank() == 0: + current_rouge_l = np.mean(pred_score_list) + print("Final ROUGE-L = ", current_rouge_l) + output_writer.close() + print("__________________________\n__________________________") + + else: + raise NotImplementedError(f"{metric} is not implemented or not match with our defined metrics") + + # load the dataset with predicted answers and apply the self-instruct method to get the answer score list. + ans_score_list = self.get_rougel_score_list(f"{self.evaluator_args.output_dir}/evaluation.json") + + # Start compare the two ROUGE-L scores lists we get + matched = True + for pred, ans in zip(pred_score_list, ans_score_list): + print("LMFlow ROUGE-L: ", pred, " -- self-instruct ROUGE-L: ", ans) + if round(pred, 5) != round(ans, 5): + matched = False + print("scores not matched!") + return + print("scores matched. Tested to be valid.") + +