forked from intel-analytics/ipex-llm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
llama3.py
129 lines (114 loc) · 5.08 KB
/
llama3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import torch
import time
import argparse
from ipex_llm.transformers.npu_model import AutoModelForCausalLM
from transformers import AutoTokenizer, TextStreamer
from transformers.utils import logging
logger = logging.get_logger(__name__)
def get_prompt(user_input: str, chat_history: list[tuple[str, str]],
system_prompt: str) -> str:
prompt_texts = [f'<|begin_of_text|>']
if system_prompt != '':
prompt_texts.append(f'<|start_header_id|>system<|end_header_id|>\n\n{system_prompt}<|eot_id|>')
for history_input, history_response in chat_history:
prompt_texts.append(f'<|start_header_id|>user<|end_header_id|>\n\n{history_input.strip()}<|eot_id|>')
prompt_texts.append(f'<|start_header_id|>assistant<|end_header_id|>\n\n{history_response.strip()}<|eot_id|>')
prompt_texts.append(f'<|start_header_id|>user<|end_header_id|>\n\n{user_input.strip()}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n')
return ''.join(prompt_texts)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Predict Tokens using `generate()` API for npu model"
)
parser.add_argument(
"--repo-id-or-model-path",
type=str,
default="meta-llama/Meta-Llama-3-8B-Instruct",
help="The huggingface repo id for the Llama3 model to be downloaded"
", or the path to the huggingface checkpoint folder",
)
parser.add_argument('--prompt', type=str, default="What is AI?",
help='Prompt to infer')
parser.add_argument("--n-predict", type=int, default=32, help="Max tokens to predict")
parser.add_argument("--max-context-len", type=int, default=1024)
parser.add_argument("--max-prompt-len", type=int, default=512)
parser.add_argument("--disable-transpose-value-cache", action="store_true", default=False)
parser.add_argument("--disable-streaming", action="store_true", default=False)
parser.add_argument("--save-directory", type=str,
required=True,
help="The path of folder to save converted model, "
"If path not exists, lowbit model will be saved there. "
"Else, lowbit model will be loaded.",
)
args = parser.parse_args()
model_path = args.repo_id_or_model_path
if not os.path.exists(args.save_directory):
model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.float16,
trust_remote_code=True,
attn_implementation="eager",
load_in_low_bit="sym_int4",
optimize_model=True,
max_context_len=args.max_context_len,
max_prompt_len=args.max_prompt_len,
transpose_value_cache=not args.disable_transpose_value_cache,
save_directory=args.save_directory
)
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
tokenizer.save_pretrained(args.save_directory)
else:
model = AutoModelForCausalLM.load_low_bit(
args.save_directory,
attn_implementation="eager",
torch_dtype=torch.float16,
optimize_model=True,
max_context_len=args.max_context_len,
max_prompt_len=args.max_prompt_len,
transpose_value_cache=not args.disable_transpose_value_cache,
)
tokenizer = AutoTokenizer.from_pretrained(args.save_directory, trust_remote_code=True)
if args.disable_streaming:
streamer = None
else:
streamer = TextStreamer(tokenizer=tokenizer, skip_special_tokens=True)
DEFAULT_SYSTEM_PROMPT = """\
"""
print("-" * 80)
print("done")
with torch.inference_mode():
print("finish to load")
for i in range(5):
prompt = get_prompt(args.prompt, [], system_prompt=DEFAULT_SYSTEM_PROMPT)
_input_ids = tokenizer.encode(prompt, return_tensors="pt")
print("-" * 20, "Input", "-" * 20)
print("input length:", len(_input_ids[0]))
print(prompt)
print("-" * 20, "Output", "-" * 20)
st = time.time()
output = model.generate(
_input_ids, num_beams=1, do_sample=False, max_new_tokens=args.n_predict, streamer=streamer
)
end = time.time()
if args.disable_streaming:
output_str = tokenizer.decode(output[0], skip_special_tokens=False)
print(output_str)
print(f"Inference time: {end-st} s")
print("-" * 80)
print("done")
print("success shut down")